import runpod
import json
import urllib.request
import urllib.parse
import time
import os
import requests
import base64
from io import BytesIO
import websocket
import uuid
import tempfile
import socket
import traceback
import copy
import sys
import importlib
import importlib.abc

# ---------------------------------------------------------------------------
# ComfyUI Authentication Patch: Fix comfy_api_nodes authentication
# This patches the comfy_api_nodes module to use config-based auth instead
# of requiring a user session, fixing the "Unauthorized: Please login first" error
# ---------------------------------------------------------------------------

# Global variable to store API key from request payload
_request_api_key = None

def get_system_api_key():
    """Get API key from system sources (config file or environment variable)"""
    config_file = "/comfyui/user/default/config.json"
    try:
        if os.path.exists(config_file):
            with open(config_file, 'r') as f:
                config = json.load(f)
            api_key = config.get("user", {}).get("api_key", "")
            if api_key:
                print(f"worker-comfyui - Found API key in config['user']['api_key']")
                return api_key
            # Also check root level
            api_key = config.get("api_key", "")
            if api_key:
                print(f"worker-comfyui - Found API key in config['api_key']")
                return api_key
        # Also check environment variable
        api_key = os.environ.get("API_KEY_COMFY_ORG", "")
        if api_key:
            print(f"worker-comfyui - Found API key in environment variable")
            return api_key
        return None
    except Exception as e:
        print(f"worker-comfyui - Warning: Error reading config: {e}")
        return None

def _patch_comfy_api_nodes_auth():
    """Patch comfy_api_nodes to use API key from config file or request payload"""

    def get_api_key_from_config():
        """Read API key from ComfyUI config file, environment variable, or request payload"""
        # First check if API key was provided in request payload
        global _request_api_key
        if _request_api_key:
            print(f"worker-comfyui - Found API key in request payload")
            return _request_api_key
        # Fall back to system API key
        system_key = get_system_api_key()
        if system_key:
            return system_key
        print("worker-comfyui - Warning: No API key found in config, environment, or request payload")
        return None

    def patch_client_module(module):
        """Patch the _request_base function in the client module"""
        # Check if we have an API key (will check global, config, env)
        api_key = get_api_key_from_config()
        if not api_key:
            return False
        if not hasattr(module, '_request_base'):
            return False
        original_request = module._request_base
        # Check if it's async
        import inspect
        is_async = inspect.iscoroutinefunction(original_request)
        if is_async:
            async def patched_request_base(cfg, *args, **kwargs):
                # Get API key dynamically (may have been set in request payload)
                current_api_key = get_api_key_from_config()
                if current_api_key:
                    # Add API key to headers if not present
                    if hasattr(cfg, 'headers'):
                        if not isinstance(cfg.headers, dict):
                            cfg.headers = {}
                        if 'X-API-Key' not in cfg.headers:
                            cfg.headers['X-API-Key'] = current_api_key
                        if 'Authorization' not in cfg.headers:
                            cfg.headers['Authorization'] = f'Bearer {current_api_key}'
                # Call original function
                try:
                    return await original_request(cfg, *args, **kwargs)
                except Exception as e:
                    # If we get "Please login first" error, ensure headers are set and retry
                    error_str = str(e)
                    if 'login' in error_str.lower() or 'unauthorized' in error_str.lower():
                        if current_api_key and hasattr(cfg, 'headers'):
                            if not isinstance(cfg.headers, dict):
                                cfg.headers = {}
                            cfg.headers['X-API-Key'] = current_api_key
                            cfg.headers['Authorization'] = f'Bearer {current_api_key}'
                            return await original_request(cfg, *args, **kwargs)
                    raise
        else:
            def patched_request_base(cfg, *args, **kwargs):
                # Get API key dynamically (may have been set in request payload)
                current_api_key = get_api_key_from_config()
                if current_api_key:
                    # Add API key to headers if not present
                    if hasattr(cfg, 'headers'):
                        if not isinstance(cfg.headers, dict):
                            cfg.headers = {}
                        if 'X-API-Key' not in cfg.headers:
                            cfg.headers['X-API-Key'] = current_api_key
                        if 'Authorization' not in cfg.headers:
                            cfg.headers['Authorization'] = f'Bearer {current_api_key}'
                # Call original function
                try:
                    return original_request(cfg, *args, **kwargs)
                except Exception as e:
                    # If we get "Please login first" error, ensure headers are set and retry
                    error_str = str(e)
                    if 'login' in error_str.lower() or 'unauthorized' in error_str.lower():
                        if current_api_key and hasattr(cfg, 'headers'):
                            if not isinstance(cfg.headers, dict):
                                cfg.headers = {}
                            cfg.headers['X-API-Key'] = current_api_key
                            cfg.headers['Authorization'] = f'Bearer {current_api_key}'
                            return original_request(cfg, *args, **kwargs)
                    raise
        # Apply the patch
        module._request_base = patched_request_base
        print(f"worker-comfyui - ✓ Patched comfy_api_nodes authentication (async={is_async}, api_key={'***' + api_key[-4:] if api_key and len(api_key) > 4 else 'N/A'})")
        return True

    # Set up import hook to intercept comfy_api_nodes.util.client when imported
    # Use importlib's machinery for Python 3 compatibility
    class ComfyAPINodesPatcher(importlib.abc.MetaPathFinder):
        def find_spec(self, name, path, target=None):
            # Let normal import proceed
            return None

        def find_module(self, name, path=None):
            # Legacy method for Python < 3.4
            return None

    # Add our finder to meta_path
    patcher_finder = ComfyAPINodesPatcher()
    if patcher_finder not in sys.meta_path:
        sys.meta_path.insert(0, patcher_finder)

    # Also set up a simpler hook using __import__
    original_import = __import__

    def patched_import(name, globals=None, locals=None, fromlist=(), level=0):
        module = original_import(name, globals, locals, fromlist, level)
        # Check if this is the client module we want to patch
        if name == 'comfy_api_nodes.util.client' or (
            'comfy_api_nodes' in name and 'client' in name and
            hasattr(module, '__file__') and
            module.__file__ and
            'client.py' in module.__file__
        ):
            patch_client_module(module)
        return module

    # Replace the built-in __import__ in builtins module
    import builtins
    builtins.__import__ = patched_import

    # Also try to patch already-loaded modules
    for module_name in list(sys.modules.keys()):
        if 'comfy_api_nodes' in module_name and 'client' in module_name:
            try:
                module = sys.modules[module_name]
                if hasattr(module, '_request_base'):
                    patch_client_module(module)
            except Exception:
                pass

    # Set up a periodic check to patch modules that load later
    def periodic_patch_check():
        """Periodically check for and patch comfy_api_nodes"""
        import threading

        def check_and_patch():
            time.sleep(5)  # Wait for ComfyUI to start loading custom nodes
            for _ in range(10):  # Check up to 10 times
                try:
                    # Try to import and patch
                    if 'comfy_api_nodes.util.client' in sys.modules:
                        module = sys.modules['comfy_api_nodes.util.client']
                        if hasattr(module, '_request_base'):
                            patch_client_module(module)
                            break
                except Exception:
                    pass
                time.sleep(2)

        thread = threading.Thread(target=check_and_patch, daemon=True)
        thread.start()

    periodic_patch_check()


# Apply the patch early
_patch_comfy_api_nodes_auth()

# Time to wait between API check attempts in milliseconds
COMFY_API_AVAILABLE_INTERVAL_MS = 50
# Maximum number of API check attempts
COMFY_API_AVAILABLE_MAX_RETRIES = 500

# Websocket reconnection behaviour (can be overridden through environment variables)
# NOTE: more attempts and diagnostics improve debuggability whenever ComfyUI crashes mid-job.
# • WEBSOCKET_RECONNECT_ATTEMPTS sets how many times we will try to reconnect.
# • WEBSOCKET_RECONNECT_DELAY_S sets the sleep in seconds between attempts.
#
# If the respective env-vars are not supplied we fall back to sensible defaults ("5" and "3").
WEBSOCKET_RECONNECT_ATTEMPTS = int(os.environ.get("WEBSOCKET_RECONNECT_ATTEMPTS", 5))
WEBSOCKET_RECONNECT_DELAY_S = int(os.environ.get("WEBSOCKET_RECONNECT_DELAY_S", 3))

# Retry configuration for 504 Gateway Timeout errors (e.g., from Gemini API with large images)
# • GEMINI_504_RETRY_ATTEMPTS sets how many times to retry on 504 errors (default: 3)
# • GEMINI_504_RETRY_DELAY_S sets the initial delay in seconds (default: 5, uses exponential backoff)
GEMINI_504_RETRY_ATTEMPTS = int(os.environ.get("GEMINI_504_RETRY_ATTEMPTS", 3))
GEMINI_504_RETRY_DELAY_S = int(os.environ.get("GEMINI_504_RETRY_DELAY_S", 5))

# Timeout configuration for large images and Gemini API calls
# • WEBSOCKET_RECV_TIMEOUT_S sets websocket receive timeout in seconds (default: 300 = 5 minutes, set higher for 4K images)
# • WORKFLOW_EXECUTION_TIMEOUT_S sets maximum time to wait for workflow completion (default: 600 = 10 minutes, set higher for large images)
# • For 4K images with Gemini, consider setting WEBSOCKET_RECV_TIMEOUT_S=600 and WORKFLOW_EXECUTION_TIMEOUT_S=1200
WEBSOCKET_RECV_TIMEOUT_S = int(os.environ.get("WEBSOCKET_RECV_TIMEOUT_S", 300))  # 5 minutes default
WORKFLOW_EXECUTION_TIMEOUT_S = int(os.environ.get("WORKFLOW_EXECUTION_TIMEOUT_S", 600))  # 10 minutes default

# Extra verbose websocket trace logs (set WEBSOCKET_TRACE=true to enable)
if os.environ.get("WEBSOCKET_TRACE", "false").lower() == "true":
    # This prints low-level frame information to stdout which is invaluable for diagnosing
    # protocol errors but can be noisy in production – therefore gated behind an env-var.
    websocket.enableTrace(True)
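
# Illustrative endpoint configuration (not part of the original file): a deployment that
# runs 4K Gemini workflows might export the overrides described above, for example
#   WEBSOCKET_RECONNECT_ATTEMPTS=10
#   WEBSOCKET_RECONNECT_DELAY_S=5
#   WEBSOCKET_RECV_TIMEOUT_S=600
#   WORKFLOW_EXECUTION_TIMEOUT_S=1200
#   WEBSOCKET_TRACE=false
# Only the variable names come from the code above; the values are assumptions.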

# Host where ComfyUI is running
COMFY_HOST = "127.0.0.1:8188"
# Enforce a clean state after each job is done
# see https://docs.runpod.io/docs/handler-additional-controls#refresh-worker
REFRESH_WORKER = os.environ.get("REFRESH_WORKER", "false").lower() == "true"

# Maximum response size in bytes (RunPod typically has a limit around 1-6MB for JSON responses)
# Base64 encoding increases size by ~33%, so we check the JSON size
MAX_RESPONSE_SIZE_BYTES = int(os.environ.get("MAX_RESPONSE_SIZE_BYTES", 5242880))  # Default 5MB

# Threshold for requiring S3 upload (files larger than this should use S3 to avoid response size issues)
# 4K images/videos can easily exceed this, so we set it conservatively
REQUIRE_S3_SIZE_THRESHOLD = int(os.environ.get("REQUIRE_S3_SIZE_THRESHOLD", 1048576))  # Default 1MB (raw file size)

# ---------------------------------------------------------------------------
# S3 Storage Detection: Check if Cloudflare R2 storage is configured
# ---------------------------------------------------------------------------
def is_s3_configured():
    """Check if Cloudflare R2 storage is configured."""
    return os.environ.get("BUCKET_ENDPOINT_URL") is not None and os.environ.get("BUCKET_NAME") is not None


def log_storage_configuration():
    """Log the current S3 storage configuration at startup."""
    if is_s3_configured():
        endpoint_url = os.environ.get("BUCKET_ENDPOINT_URL")
        bucket_name = os.environ.get("BUCKET_NAME")
        region = os.environ.get("S3_REGION", "auto")
        public_url = os.environ.get("BUCKET_PUBLIC_URL")
        # Detect storage provider
        storage_provider = "S3-compatible storage"
        if endpoint_url:
            if 'storage.supabase.co' in endpoint_url:
                storage_provider = "Supabase Storage"
            elif 'r2.cloudflarestorage.com' in endpoint_url:
                storage_provider = "Cloudflare R2"
        print(f"worker-comfyui - S3 storage: {storage_provider} configured")
        print(f"worker-comfyui - S3 storage: Endpoint: {endpoint_url}")
        print(f"worker-comfyui - S3 storage: Bucket: {bucket_name}")
        print(f"worker-comfyui - S3 storage: Region: {region}")
        if public_url:
            print(f"worker-comfyui - S3 storage: Public URL: {public_url}")
        else:
            print("worker-comfyui - S3 storage: Public URL: Not configured (will construct from endpoint)")
        print(f"worker-comfyui - S3 storage: Using {storage_provider} S3-compatible API")
        if 'r2.cloudflarestorage.com' in (endpoint_url or ""):
            print("worker-comfyui - S3 storage: Tip: Configure lifecycle policy to auto-delete files after 1 day (see docs/s3_setup_guide.md)")
    else:
        print("worker-comfyui - S3 storage: Not configured (files will be returned as base64)")
        print("worker-comfyui - S3 storage: Large files (>1MB) will fail. Configure BUCKET_ENDPOINT_URL, BUCKET_NAME, BUCKET_ACCESS_KEY_ID, and BUCKET_SECRET_ACCESS_KEY to enable S3 upload.")


# Log storage configuration at module load time
log_storage_configuration()
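
# Illustrative example (placeholder values, not from the original file) of the environment
# variables this worker reads for S3-compatible storage:
#   BUCKET_ENDPOINT_URL=https://<account-id>.r2.cloudflarestorage.com
#   BUCKET_NAME=comfyui-outputs
#   BUCKET_ACCESS_KEY_ID=...
#   BUCKET_SECRET_ACCESS_KEY=...
#   S3_REGION=auto
#   BUCKET_PUBLIC_URL=https://pub-xxxx.r2.dev
# Only the variable names are taken from the code; every value here is a placeholder.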

# ---------------------------------------------------------------------------
# Helper: quick reachability probe of ComfyUI HTTP endpoint (port 8188)
# ---------------------------------------------------------------------------
def _comfy_server_status():
    """Return a dictionary with basic reachability info for the ComfyUI HTTP server."""
    try:
        resp = requests.get(f"http://{COMFY_HOST}/", timeout=5)
        return {
            "reachable": resp.status_code == 200,
            "status_code": resp.status_code,
        }
    except Exception as exc:
        return {"reachable": False, "error": str(exc)}

def _attempt_websocket_reconnect(ws_url, max_attempts, delay_s, initial_error):
    """
    Attempts to reconnect to the WebSocket server after a disconnect.

    Args:
        ws_url (str): The WebSocket URL (including client_id).
        max_attempts (int): Maximum number of reconnection attempts.
        delay_s (int): Delay in seconds between attempts.
        initial_error (Exception): The error that triggered the reconnect attempt.

    Returns:
        websocket.WebSocket: The newly connected WebSocket object.

    Raises:
        websocket.WebSocketConnectionClosedException: If reconnection fails after all attempts.
    """
    print(
        f"worker-comfyui - Websocket connection closed unexpectedly: {initial_error}. Attempting to reconnect..."
    )
    last_reconnect_error = initial_error
    for attempt in range(max_attempts):
        # Log current server status before each reconnect attempt so that we can
        # see whether ComfyUI is still alive (HTTP port 8188 responding) even if
        # the websocket dropped. This is extremely useful to differentiate
        # between a network glitch and an outright ComfyUI crash/OOM-kill.
        srv_status = _comfy_server_status()
        if not srv_status["reachable"]:
            # If ComfyUI itself is down there is no point in retrying the websocket –
            # bail out immediately so the caller gets a clear "ComfyUI crashed" error.
            print(
                f"worker-comfyui - ComfyUI HTTP unreachable – aborting websocket reconnect: {srv_status.get('error', 'status ' + str(srv_status.get('status_code')))}"
            )
            raise websocket.WebSocketConnectionClosedException(
                "ComfyUI HTTP unreachable during websocket reconnect"
            )
        # Otherwise we proceed with reconnect attempts while server is up
        print(
            f"worker-comfyui - Reconnect attempt {attempt + 1}/{max_attempts}... (ComfyUI HTTP reachable, status {srv_status.get('status_code')})"
        )
        try:
            # Need to create a new socket object for reconnect
            new_ws = websocket.WebSocket()
            new_ws.connect(ws_url, timeout=10)  # Use existing ws_url
            print(f"worker-comfyui - Websocket reconnected successfully.")
            return new_ws  # Return the new connected socket
        except (
            websocket.WebSocketException,
            ConnectionRefusedError,
            socket.timeout,
            OSError,
        ) as reconn_err:
            last_reconnect_error = reconn_err
            print(
                f"worker-comfyui - Reconnect attempt {attempt + 1} failed: {reconn_err}"
            )
            if attempt < max_attempts - 1:
                print(
                    f"worker-comfyui - Waiting {delay_s} seconds before next attempt..."
                )
                time.sleep(delay_s)
            else:
                print(f"worker-comfyui - Max reconnection attempts reached.")

    # If loop completes without returning, raise an exception
    print("worker-comfyui - Failed to reconnect websocket after connection closed.")
    raise websocket.WebSocketConnectionClosedException(
        f"Connection closed and failed to reconnect. Last error: {last_reconnect_error}"
    )

def validate_input(job_input):
    """
    Validates the input for the handler function.

    Args:
        job_input (dict): The input data to validate.

    Returns:
        tuple: A tuple containing the validated data and an error message, if any.
               The structure is (validated_data, error_message).
    """
    # Validate if job_input is provided
    if job_input is None:
        return None, "Please provide input"

    # Check if input is a string and try to parse it as JSON
    if isinstance(job_input, str):
        try:
            job_input = json.loads(job_input)
        except json.JSONDecodeError:
            return None, "Invalid JSON format in input"

    # Validate 'workflow' in input
    workflow = job_input.get("workflow")
    if workflow is None:
        return None, "Missing 'workflow' parameter"

    # Validate 'images' in input, if provided
    images = job_input.get("images")
    if images is not None:
        if not isinstance(images, list) or not all(
            "name" in image and "image" in image for image in images
        ):
            return (
                None,
                "'images' must be a list of objects with 'name' and 'image' keys",
            )

    # Return validated data and no error
    return {"workflow": workflow, "images": images}, None

def check_server(url, retries=500, delay=50):
    """
    Check if a server is reachable via HTTP GET request

    Args:
    - url (str): The URL to check
    - retries (int, optional): The number of times to attempt connecting to the server. Default is 500
    - delay (int, optional): The time in milliseconds to wait between retries. Default is 50

    Returns:
        bool: True if the server is reachable within the given number of retries, otherwise False
    """
    print(f"worker-comfyui - Checking API server at {url}...")
    for i in range(retries):
        try:
            response = requests.get(url, timeout=5)
            # If the response status code is 200, the server is up and running
            if response.status_code == 200:
                print(f"worker-comfyui - API is reachable")
                return True
        except requests.Timeout:
            pass
        except requests.RequestException:
            pass
        # Wait for the specified delay before retrying
        time.sleep(delay / 1000)
    print(
        f"worker-comfyui - Failed to connect to server at {url} after {retries} attempts."
    )
    return False

def download_and_upload_image_from_url(url, filename=None):
    """
    Download an image from a URL and upload it to ComfyUI.

    Args:
        url (str): The URL of the image to download
        filename (str, optional): The filename to use. If None, extracts from URL.

    Returns:
        tuple: (success: bool, filename: str or error_message: str)
    """
    try:
        # Extract filename from URL if not provided
        if not filename:
            # Get filename from URL path
            parsed_url = urllib.parse.urlparse(url)
            filename = os.path.basename(parsed_url.path)
            # URL decode the filename
            filename = urllib.parse.unquote(filename)
            if not filename or '.' not in filename:
                # Fallback to a default name
                filename = f"image_{hash(url) % 10000}.png"

        print(f"worker-comfyui - Downloading image from URL: {url}")
        response = requests.get(url, timeout=60, stream=True)
        response.raise_for_status()

        # Get content type to determine file extension if needed
        content_type = response.headers.get('content-type', '')
        if not filename.endswith(('.png', '.jpg', '.jpeg', '.webp', '.gif')):
            if 'image/png' in content_type:
                filename = filename.rsplit('.', 1)[0] + '.png'
            elif 'image/jpeg' in content_type:
                filename = filename.rsplit('.', 1)[0] + '.jpg'
            elif 'image/webp' in content_type:
                filename = filename.rsplit('.', 1)[0] + '.webp'

        # Read image data
        image_data = response.content

        # Upload to ComfyUI
        files = {
            "image": (filename, BytesIO(image_data), content_type or "image/png"),
            "overwrite": (None, "true"),
        }
        upload_response = requests.post(
            f"http://{COMFY_HOST}/upload/image", files=files, timeout=60
        )
        upload_response.raise_for_status()

        print(f"worker-comfyui - Successfully downloaded and uploaded {filename}")
        return True, filename
    except requests.Timeout:
        error_msg = f"Timeout downloading image from {url}"
        print(f"worker-comfyui - {error_msg}")
        return False, error_msg
    except requests.RequestException as e:
        error_msg = f"Error downloading/uploading image from {url}: {e}"
        print(f"worker-comfyui - {error_msg}")
        return False, error_msg
    except Exception as e:
        error_msg = f"Unexpected error downloading image from {url}: {e}"
        print(f"worker-comfyui - {error_msg}")
        return False, error_msg

def extract_and_process_urls_from_workflow(workflow):
    """
    Extract image URLs from LoadImage nodes in the workflow, download them,
    upload to ComfyUI, and replace URLs with local filenames.

    Args:
        workflow (dict): The workflow dictionary

    Returns:
        tuple: (modified_workflow: dict, errors: list)
    """
    errors = []
    url_to_filename = {}  # Cache to avoid downloading same URL twice

    # Deep copy the workflow to avoid modifying the original
    modified_workflow = copy.deepcopy(workflow)

    # Iterate through all nodes in the workflow
    for node_id, node_data in modified_workflow.items():
        if not isinstance(node_data, dict):
            continue
        inputs = node_data.get("inputs", {})
        class_type = node_data.get("class_type", "")
        # Check if this is a LoadImage node
        if class_type == "LoadImage" and "image" in inputs:
            image_value = inputs["image"]
            # Check if it's a URL (starts with http:// or https://)
            if isinstance(image_value, str) and (
                image_value.startswith("http://") or image_value.startswith("https://")
            ):
                url = image_value
                # Check if we've already processed this URL
                if url in url_to_filename:
                    inputs["image"] = url_to_filename[url]
                    print(
                        f"worker-comfyui - Reusing cached filename for URL: {url} -> {url_to_filename[url]}"
                    )
                    continue
                # Download and upload the image
                success, result = download_and_upload_image_from_url(url)
                if success:
                    filename = result
                    url_to_filename[url] = filename
                    inputs["image"] = filename
                    print(
                        f"worker-comfyui - Replaced URL with filename in node {node_id}: {url} -> {filename}"
                    )
                else:
                    error_msg = f"Failed to process image URL in node {node_id}: {result}"
                    errors.append(error_msg)
                    print(f"worker-comfyui - {error_msg}")

    return modified_workflow, errors
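
# Illustrative LoadImage node (made-up node id and URL) of the kind this helper rewrites:
#   "10": {"class_type": "LoadImage", "inputs": {"image": "https://example.com/cat.png"}}
# After processing, inputs["image"] holds the filename returned by the upload step above.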

def upload_images(images):
    """
    Upload a list of base64 encoded images to the ComfyUI server using the /upload/image endpoint.

    Args:
        images (list): A list of dictionaries, each containing the 'name' of the image and the 'image' as a base64 encoded string.

    Returns:
        dict: A dictionary indicating success or error.
    """
    if not images:
        return {"status": "success", "message": "No images to upload", "details": []}

    responses = []
    upload_errors = []

    print(f"worker-comfyui - Uploading {len(images)} image(s)...")

    for image in images:
        try:
            name = image["name"]
            image_data_uri = image["image"]  # Get the full string (might have prefix)

            # --- Strip Data URI prefix if present ---
            if "," in image_data_uri:
                # Find the comma and take everything after it
                base64_data = image_data_uri.split(",", 1)[1]
            else:
                # Assume it's already pure base64
                base64_data = image_data_uri
            # --- End strip ---

            blob = base64.b64decode(base64_data)  # Decode the cleaned data

            # Prepare the form data
            files = {
                "image": (name, BytesIO(blob), "image/png"),
                "overwrite": (None, "true"),
            }

            # POST request to upload the image
            response = requests.post(
                f"http://{COMFY_HOST}/upload/image", files=files, timeout=30
            )
            response.raise_for_status()

            responses.append(f"Successfully uploaded {name}")
            print(f"worker-comfyui - Successfully uploaded {name}")
        except base64.binascii.Error as e:
            error_msg = f"Error decoding base64 for {image.get('name', 'unknown')}: {e}"
            print(f"worker-comfyui - {error_msg}")
            upload_errors.append(error_msg)
        except requests.Timeout:
            error_msg = f"Timeout uploading {image.get('name', 'unknown')}"
            print(f"worker-comfyui - {error_msg}")
            upload_errors.append(error_msg)
        except requests.RequestException as e:
            error_msg = f"Error uploading {image.get('name', 'unknown')}: {e}"
            print(f"worker-comfyui - {error_msg}")
            upload_errors.append(error_msg)
        except Exception as e:
            error_msg = (
                f"Unexpected error uploading {image.get('name', 'unknown')}: {e}"
            )
            print(f"worker-comfyui - {error_msg}")
            upload_errors.append(error_msg)

    if upload_errors:
        print(f"worker-comfyui - image(s) upload finished with errors")
        return {
            "status": "error",
            "message": "Some images failed to upload",
            "details": upload_errors,
        }

    print(f"worker-comfyui - image(s) upload complete")
    return {
        "status": "success",
        "message": "All images uploaded successfully",
        "details": responses,
    }

def get_available_models():
    """
    Get list of available models from ComfyUI

    Returns:
        dict: Dictionary containing available models by type
    """
    try:
        response = requests.get(f"http://{COMFY_HOST}/object_info", timeout=10)
        response.raise_for_status()
        object_info = response.json()

        # Extract available checkpoints from CheckpointLoaderSimple
        available_models = {}
        if "CheckpointLoaderSimple" in object_info:
            checkpoint_info = object_info["CheckpointLoaderSimple"]
            if "input" in checkpoint_info and "required" in checkpoint_info["input"]:
                ckpt_options = checkpoint_info["input"]["required"].get("ckpt_name")
                if ckpt_options and len(ckpt_options) > 0:
                    available_models["checkpoints"] = (
                        ckpt_options[0] if isinstance(ckpt_options[0], list) else []
                    )
        return available_models
    except Exception as e:
        print(f"worker-comfyui - Warning: Could not fetch available models: {e}")
        return {}

def queue_workflow(workflow, client_id, extra_data=None):
    """
    Queue a workflow to be processed by ComfyUI

    Args:
        workflow (dict): A dictionary containing the workflow to be processed
        client_id (str): The client ID for the websocket connection
        extra_data (dict, optional): Extra data to include in the payload (e.g., API keys)

    Returns:
        dict: The JSON response from ComfyUI after processing the workflow

    Raises:
        ValueError: If the workflow validation fails with detailed error information
    """
    # Include client_id and extra_data in the prompt payload
    payload = {"prompt": workflow, "client_id": client_id}
    if extra_data:
        payload["extra_data"] = extra_data
    data = json.dumps(payload).encode("utf-8")

    # Use requests for consistency and timeout
    headers = {"Content-Type": "application/json"}
    response = requests.post(
        f"http://{COMFY_HOST}/prompt", data=data, headers=headers, timeout=30
    )

    # Handle validation errors with detailed information
    if response.status_code == 400:
        print(f"worker-comfyui - ComfyUI returned 400. Response body: {response.text}")
        try:
            error_data = response.json()
            print(f"worker-comfyui - Parsed error data: {error_data}")

            # Try to extract meaningful error information
            error_message = "Workflow validation failed"
            error_details = []

            # ComfyUI seems to return different error formats, let's handle them all
            if "error" in error_data:
                error_info = error_data["error"]
                if isinstance(error_info, dict):
                    error_message = error_info.get("message", error_message)
                    if error_info.get("type") == "prompt_outputs_failed_validation":
                        error_message = "Workflow validation failed"
                else:
                    error_message = str(error_info)

            # Check for node validation errors in the response
            if "node_errors" in error_data:
                for node_id, node_error in error_data["node_errors"].items():
                    if isinstance(node_error, dict):
                        for error_type, error_msg in node_error.items():
                            error_details.append(
                                f"Node {node_id} ({error_type}): {error_msg}"
                            )
                    else:
                        error_details.append(f"Node {node_id}: {node_error}")

            # Check if the error data itself contains validation info
            if error_data.get("type") == "prompt_outputs_failed_validation":
                error_message = error_data.get("message", "Workflow validation failed")
                # For this type of error, we need to parse the validation details from logs
                # Since ComfyUI doesn't seem to include detailed validation errors in the response
                # Let's provide a more helpful generic message
                available_models = get_available_models()
                if available_models.get("checkpoints"):
                    error_message += f"\n\nThis usually means a required model or parameter is not available."
                    error_message += f"\nAvailable checkpoint models: {', '.join(available_models['checkpoints'])}"
                else:
                    error_message += "\n\nThis usually means a required model or parameter is not available."
                    error_message += "\nNo checkpoint models appear to be available. Please check your model installation."
                raise ValueError(error_message)

            # If we have specific validation errors, format them nicely
            if error_details:
                detailed_message = f"{error_message}:\n" + "\n".join(
                    f"• {detail}" for detail in error_details
                )
                # Try to provide helpful suggestions for common errors
                if any(
                    "not in list" in detail and "ckpt_name" in detail
                    for detail in error_details
                ):
                    available_models = get_available_models()
                    if available_models.get("checkpoints"):
                        detailed_message += f"\n\nAvailable checkpoint models: {', '.join(available_models['checkpoints'])}"
                    else:
                        detailed_message += "\n\nNo checkpoint models appear to be available. Please check your model installation."
                raise ValueError(detailed_message)
            else:
                # Fallback to the raw response if we can't parse specific errors
                raise ValueError(f"{error_message}. Raw response: {response.text}")
        except (json.JSONDecodeError, KeyError) as e:
            # If we can't parse the error response, fall back to the raw text
            raise ValueError(
                f"ComfyUI validation failed (could not parse error response): {response.text}"
            )

    # For other HTTP errors, raise them normally
    response.raise_for_status()
    return response.json()

def get_history(prompt_id):
    """
    Retrieve the history of a given prompt using its ID

    Args:
        prompt_id (str): The ID of the prompt whose history is to be retrieved

    Returns:
        dict: The history of the prompt, containing all the processing steps and results
    """
    # Use requests for consistency and timeout
    response = requests.get(f"http://{COMFY_HOST}/history/{prompt_id}", timeout=30)
    response.raise_for_status()
    return response.json()

def get_image_data(filename, subfolder, image_type):
    """
    Fetch image bytes from the ComfyUI /view endpoint.

    Args:
        filename (str): The filename of the image.
        subfolder (str): The subfolder where the image is stored.
        image_type (str): The type of the image (e.g., 'output').

    Returns:
        bytes: The raw image data, or None if an error occurs.
    """
    print(
        f"worker-comfyui - Fetching image data: type={image_type}, subfolder={subfolder}, filename={filename}"
    )
    data = {"filename": filename, "subfolder": subfolder, "type": image_type}
    url_values = urllib.parse.urlencode(data)
    try:
        # Use requests for consistency and timeout
        response = requests.get(f"http://{COMFY_HOST}/view?{url_values}", timeout=60)
        response.raise_for_status()
        print(f"worker-comfyui - Successfully fetched image data for {filename}")
        return response.content
    except requests.Timeout:
        print(f"worker-comfyui - Timeout fetching image data for {filename}")
        return None
    except requests.RequestException as e:
        print(f"worker-comfyui - Error fetching image data for {filename}: {e}")
        return None
    except Exception as e:
        print(
            f"worker-comfyui - Unexpected error fetching image data for {filename}: {e}"
        )
        return None

def has_gemini_nodes(workflow):
    """
    Check if workflow contains GeminiImage2Node or GeminiImageDirectNode nodes
    (which may need longer timeouts for large images).

    Args:
        workflow (dict): The workflow dictionary

    Returns:
        bool: True if workflow contains Gemini nodes
    """
    for node_id, node_data in workflow.items():
        if isinstance(node_data, dict):
            class_type = node_data.get("class_type", "")
            if class_type in ("GeminiImage2Node", "GeminiImageDirectNode"):
                return True
    return False


def calculate_response_size(response_dict):
    """
    Calculate the approximate size of a response dictionary when serialized to JSON.

    Args:
        response_dict (dict): The response dictionary to measure.

    Returns:
        int: The approximate size in bytes.
    """
    try:
        json_str = json.dumps(response_dict, ensure_ascii=False)
        return len(json_str.encode('utf-8'))
    except Exception as e:
        print(f"worker-comfyui - Warning: Could not calculate response size: {e}")
        return 0

def handler(job):
    """
    Handles a job using ComfyUI via websockets for status and image retrieval.

    Args:
        job (dict): A dictionary containing job details and input parameters.

    Returns:
        dict: A dictionary containing either an error message or a success status with generated images.
    """
    job_input = job["input"]
    job_id = job["id"]

    # Always ensure extra_data exists and has API key (from request or system)
    global _request_api_key
    extra_data = {}
    s3_config = {}

    # Extract extra_data and s3_config from request if provided
    if isinstance(job_input, dict):
        request_extra_data = job_input.get("extra_data")
        if isinstance(request_extra_data, dict):
            extra_data = request_extra_data.copy()
        request_s3_config = job_input.get("s3_config")
        if isinstance(request_s3_config, dict):
            s3_config = request_s3_config.copy()
            print(f"worker-comfyui - Extracted s3_config from request: {s3_config}")
        else:
            print(f"worker-comfyui - No s3_config found in request (type: {type(request_s3_config)})")

    # Check if API key is provided in request
    request_api_key = extra_data.get("api_key_comfy_org")
    if request_api_key:
        _request_api_key = request_api_key
        print(f"worker-comfyui - Found API key in request extra_data")
        # Also update the patch to use this API key
        _patch_comfy_api_nodes_auth()
    else:
        # No API key in request, always use system API key
        system_api_key = get_system_api_key()
        if system_api_key:
            extra_data["api_key_comfy_org"] = system_api_key
            print(f"worker-comfyui - Using system API key (from config/env)")
            # Set environment variable for nodes
            os.environ["API_KEY_COMFY_ORG"] = system_api_key
            # Update the patch to use this API key
            _patch_comfy_api_nodes_auth()
        else:
            print(f"worker-comfyui - Warning: No API key found in request or system")

    # Make sure that the input is valid
    validated_data, error_message = validate_input(job_input)
    if error_message:
        return {"error": error_message}

    # Extract validated data
    workflow = validated_data["workflow"]
    input_images = validated_data.get("images")

    # Check if workflow contains Gemini nodes - if so, use longer timeouts for large images
    contains_gemini = has_gemini_nodes(workflow)

    # Use workflow-specific timeouts (can be overridden by environment variables)
    ws_recv_timeout = WEBSOCKET_RECV_TIMEOUT_S
    workflow_exec_timeout = WORKFLOW_EXECUTION_TIMEOUT_S
    if contains_gemini:
        # Use longer timeouts for Gemini workflows (especially with 4K images)
        # Check for Gemini-specific timeout env vars first, then use 2x defaults if not set
        gemini_ws_timeout = int(os.environ.get("GEMINI_WEBSOCKET_RECV_TIMEOUT_S", WEBSOCKET_RECV_TIMEOUT_S * 2))
        gemini_workflow_timeout = int(os.environ.get("GEMINI_WORKFLOW_EXECUTION_TIMEOUT_S", WORKFLOW_EXECUTION_TIMEOUT_S * 2))
        # Only use Gemini-specific timeouts if base timeouts weren't explicitly set
        if os.environ.get("WEBSOCKET_RECV_TIMEOUT_S") is None:
            ws_recv_timeout = gemini_ws_timeout
        if os.environ.get("WORKFLOW_EXECUTION_TIMEOUT_S") is None:
            workflow_exec_timeout = gemini_workflow_timeout
        print(f"worker-comfyui - Detected Gemini node (GeminiImage2Node or GeminiImageDirectNode) in workflow - using extended timeouts:")
        print(f"worker-comfyui - Websocket receive timeout: {ws_recv_timeout}s")
        print(f"worker-comfyui - Workflow execution timeout: {workflow_exec_timeout}s")

    # Make sure that the ComfyUI HTTP API is available before proceeding
    if not check_server(
        f"http://{COMFY_HOST}/",
        COMFY_API_AVAILABLE_MAX_RETRIES,
        COMFY_API_AVAILABLE_INTERVAL_MS,
    ):
        return {
            "error": f"ComfyUI server ({COMFY_HOST}) not reachable after multiple retries."
        }

    # Extract and process image URLs from workflow
    print("worker-comfyui - Scanning workflow for image URLs...")
    workflow, url_errors = extract_and_process_urls_from_workflow(workflow)
    if url_errors:
        print(f"worker-comfyui - Warning: {len(url_errors)} error(s) processing image URLs")
        # Continue anyway, but log the errors

    # Upload input images if they exist
    if input_images:
        upload_result = upload_images(input_images)
        if upload_result["status"] == "error":
            # Return upload errors
            return {
                "error": "Failed to upload one or more input images",
                "details": upload_result["details"],
            }

    ws = None
    client_id = str(uuid.uuid4())
    prompt_id = None
    output_data = []
    errors = []
    try:
        # Establish WebSocket connection
        ws_url = f"ws://{COMFY_HOST}/ws?clientId={client_id}"
        print(f"worker-comfyui - Connecting to websocket: {ws_url}")
        ws = websocket.WebSocket()
        ws.connect(ws_url, timeout=10)
        print(f"worker-comfyui - Websocket connected")

        # Queue the workflow
        try:
            queued_workflow = queue_workflow(workflow, client_id, extra_data)
            prompt_id = queued_workflow.get("prompt_id")
            if not prompt_id:
                raise ValueError(
                    f"Missing 'prompt_id' in queue response: {queued_workflow}"
                )
            print(f"worker-comfyui - Queued workflow with ID: {prompt_id}")
        except requests.RequestException as e:
            print(f"worker-comfyui - Error queuing workflow: {e}")
            raise ValueError(f"Error queuing workflow: {e}")
        except Exception as e:
            print(f"worker-comfyui - Unexpected error queuing workflow: {e}")
            # For ValueError exceptions from queue_workflow, pass through the original message
            if isinstance(e, ValueError):
                raise e
            else:
                raise ValueError(f"Unexpected error queuing workflow: {e}")

        # Wait for execution completion via WebSocket
        print(f"worker-comfyui - Waiting for workflow execution ({prompt_id})...")
        print(f"worker-comfyui - Using websocket receive timeout: {ws_recv_timeout}s, workflow execution timeout: {workflow_exec_timeout}s")
        # Set websocket receive timeout
        ws.settimeout(ws_recv_timeout)
        execution_done = False
        start_time = time.time()
        while True:
            # Check if we've exceeded the workflow execution timeout
            elapsed_time = time.time() - start_time
            if elapsed_time > workflow_exec_timeout:
                timeout_msg = f"Workflow execution exceeded timeout of {workflow_exec_timeout}s (elapsed: {elapsed_time:.1f}s)"
                print(f"worker-comfyui - ERROR: {timeout_msg}")
                errors.append(timeout_msg)
                break
            try:
                out = ws.recv()
                if isinstance(out, str):
                    message = json.loads(out)
                    if message.get("type") == "status":
                        status_data = message.get("data", {}).get("status", {})
                        print(
                            f"worker-comfyui - Status update: {status_data.get('exec_info', {}).get('queue_remaining', 'N/A')} items remaining in queue"
                        )
                    elif message.get("type") == "executing":
                        data = message.get("data", {})
                        if (
                            data.get("node") is None
                            and data.get("prompt_id") == prompt_id
                        ):
                            print(
                                f"worker-comfyui - Execution finished for prompt {prompt_id}"
                            )
                            execution_done = True
                            break
                    elif message.get("type") == "execution_error":
                        data = message.get("data", {})
                        if data.get("prompt_id") == prompt_id:
                            error_message = data.get('exception_message', '')
                            error_details = f"Node Type: {data.get('node_type')}, Node ID: {data.get('node_id')}, Message: {error_message}"
                            print(
                                f"worker-comfyui - Execution error received: {error_details}"
                            )
                            # Check if this is a 504 Gateway Timeout error (common with Gemini API and large images)
                            is_504_error = (
                                '504' in str(error_message) or
                                'gateway timeout' in str(error_message).lower() or
                                ('timeout' in str(error_message).lower() and 'gemini' in str(error_message).lower())
                            )
                            if is_504_error:
                                print(
                                    f"worker-comfyui - Detected 504 Gateway Timeout error (likely from Gemini API with large images)"
                                )
                                # Store error info for retry logic
                                errors.append({
                                    "type": "504_timeout",
                                    "details": error_details,
                                    "node_type": data.get('node_type'),
                                    "node_id": data.get('node_id'),
                                    "message": error_message
                                })
                            else:
                                errors.append(f"Workflow execution error: {error_details}")
                            break
                else:
                    continue
            except websocket.WebSocketTimeoutException:
                elapsed_time = time.time() - start_time
                if elapsed_time > workflow_exec_timeout:
                    timeout_msg = f"Workflow execution exceeded timeout of {workflow_exec_timeout}s (elapsed: {elapsed_time:.1f}s)"
                    print(f"worker-comfyui - ERROR: {timeout_msg}")
                    errors.append(timeout_msg)
                    break
                print(f"worker-comfyui - Websocket receive timed out (elapsed: {elapsed_time:.1f}s / {workflow_exec_timeout}s). Still waiting...")
                continue
            except websocket.WebSocketConnectionClosedException as closed_err:
                try:
                    # Attempt to reconnect
                    ws = _attempt_websocket_reconnect(
                        ws_url,
                        WEBSOCKET_RECONNECT_ATTEMPTS,
                        WEBSOCKET_RECONNECT_DELAY_S,
                        closed_err,
                    )
                    print(
                        "worker-comfyui - Resuming message listening after successful reconnect."
                    )
                    continue
                except (
                    websocket.WebSocketConnectionClosedException
                ) as reconn_failed_err:
                    # If _attempt_websocket_reconnect fails, it raises this exception
                    # Let this exception propagate to the outer handler's except block
                    raise reconn_failed_err
            except json.JSONDecodeError:
                print(f"worker-comfyui - Received invalid JSON message via websocket.")
        # Check if we have 504 timeout errors that should be retried
        retry_504_errors = [e for e in errors if isinstance(e, dict) and e.get("type") == "504_timeout"]
        if retry_504_errors and GEMINI_504_RETRY_ATTEMPTS > 0:
            print(f"worker-comfyui - Found {len(retry_504_errors)} 504 timeout error(s), attempting retry...")
            # Close current websocket before retry
            if ws and ws.connected:
                ws.close()
            # Retry the workflow execution with exponential backoff
            for retry_attempt in range(GEMINI_504_RETRY_ATTEMPTS):
                delay = GEMINI_504_RETRY_DELAY_S * (2 ** retry_attempt)  # Exponential backoff: 5s, 10s, 20s
                print(f"worker-comfyui - Retry attempt {retry_attempt + 1}/{GEMINI_504_RETRY_ATTEMPTS} after {delay}s delay...")
                time.sleep(delay)
                # Reconnect websocket
                try:
                    ws = websocket.WebSocket()
                    ws.connect(ws_url, timeout=10)
                    print(f"worker-comfyui - Websocket reconnected for retry")
                    # Re-queue the workflow
                    try:
                        queued_workflow = queue_workflow(workflow, client_id, extra_data)
                        retry_prompt_id = queued_workflow.get("prompt_id")
                        if not retry_prompt_id:
                            print(f"worker-comfyui - Retry failed: Missing prompt_id")
                            break
                        print(f"worker-comfyui - Retry workflow queued with ID: {retry_prompt_id}")
                        # Wait for execution completion
                        retry_execution_done = False
                        retry_errors = []
                        retry_start_time = time.time()
                        ws.settimeout(ws_recv_timeout)  # Set timeout for retry
                        while True:
                            # Check timeout for retry
                            retry_elapsed = time.time() - retry_start_time
                            if retry_elapsed > workflow_exec_timeout:
                                timeout_msg = f"Retry workflow execution exceeded timeout of {workflow_exec_timeout}s"
                                print(f"worker-comfyui - ERROR: {timeout_msg}")
                                retry_errors.append(timeout_msg)
                                break
                            try:
                                out = ws.recv()
                                if isinstance(out, str):
                                    message = json.loads(out)
                                    if message.get("type") == "executing":
                                        data = message.get("data", {})
                                        if (
                                            data.get("node") is None
                                            and data.get("prompt_id") == retry_prompt_id
                                        ):
                                            print(f"worker-comfyui - Retry execution finished for prompt {retry_prompt_id}")
                                            retry_execution_done = True
                                            prompt_id = retry_prompt_id  # Update prompt_id for history fetch
                                            break
                                    elif message.get("type") == "execution_error":
                                        data = message.get("data", {})
                                        if data.get("prompt_id") == retry_prompt_id:
                                            error_message = data.get('exception_message', '')
                                            error_details = f"Node Type: {data.get('node_type')}, Node ID: {data.get('node_id')}, Message: {error_message}"
                                            retry_errors.append(error_details)
                                            # Check if still a 504 error
                                            is_504_error = (
                                                '504' in str(error_message) or
                                                'gateway timeout' in str(error_message).lower()
                                            )
                                            if not is_504_error:
                                                # Different error, break retry loop
                                                print(f"worker-comfyui - Retry failed with different error: {error_details}")
                                                retry_execution_done = False
                                                break
                                            else:
                                                print(f"worker-comfyui - Retry still got 504 error, will retry again if attempts remain")
                                                retry_execution_done = False
                                                break
                            except websocket.WebSocketTimeoutException:
                                retry_elapsed = time.time() - retry_start_time
                                if retry_elapsed > workflow_exec_timeout:
                                    timeout_msg = f"Retry workflow execution exceeded timeout of {workflow_exec_timeout}s"
                                    print(f"worker-comfyui - ERROR: {timeout_msg}")
                                    retry_errors.append(timeout_msg)
                                    break
                                continue
                            except websocket.WebSocketConnectionClosedException:
                                print(f"worker-comfyui - Websocket closed during retry, attempting reconnect...")
                                ws = _attempt_websocket_reconnect(
                                    ws_url,
                                    WEBSOCKET_RECONNECT_ATTEMPTS,
                                    WEBSOCKET_RECONNECT_DELAY_S,
                                    websocket.WebSocketConnectionClosedException("Connection closed during retry")
                                )
                                continue
                            except json.JSONDecodeError:
                                continue
                        if retry_execution_done:
                            print(f"worker-comfyui - Retry succeeded!")
                            errors = []  # Clear 504 errors since retry succeeded
                            execution_done = True
                            break
                        elif retry_attempt < GEMINI_504_RETRY_ATTEMPTS - 1:
                            print(f"worker-comfyui - Retry attempt {retry_attempt + 1} failed, will retry again...")
                            if ws and ws.connected:
                                ws.close()
                            continue
                        else:
                            print(f"worker-comfyui - All retry attempts exhausted")
                            errors = [f"Workflow execution error (504 Gateway Timeout): {e.get('details', 'Unknown error')}" for e in retry_504_errors]
                            if retry_errors:
                                errors.extend([f"Retry error: {e}" for e in retry_errors])
                            break
                    except Exception as e:
                        print(f"worker-comfyui - Error during retry attempt {retry_attempt + 1}: {e}")
                        if retry_attempt < GEMINI_504_RETRY_ATTEMPTS - 1:
                            if ws and ws.connected:
                                ws.close()
                            continue
                        else:
                            errors = [f"Workflow execution error (504 Gateway Timeout): {err.get('details', 'Unknown error')}" for err in retry_504_errors]
                            errors.append(f"Retry failed: {str(e)}")
                            break
                except Exception as e:
                    print(f"worker-comfyui - Error reconnecting websocket for retry: {e}")
                    if retry_attempt < GEMINI_504_RETRY_ATTEMPTS - 1:
                        continue
                    else:
                        errors = [f"Workflow execution error (504 Gateway Timeout): {err.get('details', 'Unknown error')}" for err in retry_504_errors]
                        errors.append(f"Failed to reconnect for retry: {str(e)}")
                        break

        # Convert dict errors back to strings for consistency
        errors = [e if isinstance(e, str) else e.get('details', str(e)) for e in errors]

        if not execution_done and not errors:
            raise ValueError(
                "Workflow monitoring loop exited without confirmation of completion or error."
            )
        # Fetch history even if there were execution errors, some outputs might exist
        print(f"worker-comfyui - Fetching history for prompt {prompt_id}...")
        history = get_history(prompt_id)
        if prompt_id not in history:
            error_msg = f"Prompt ID {prompt_id} not found in history after execution."
            print(f"worker-comfyui - {error_msg}")
            if not errors:
                return {"error": error_msg}
            else:
                errors.append(error_msg)
                return {
                    "error": "Job processing failed, prompt ID not found in history.",
                    "details": errors,
                }

        prompt_history = history.get(prompt_id, {})
        outputs = prompt_history.get("outputs", {})
        if not outputs:
            warning_msg = f"No outputs found in history for prompt {prompt_id}."
            print(f"worker-comfyui - {warning_msg}")
            if not errors:
                errors.append(warning_msg)

        print(f"worker-comfyui - Processing {len(outputs)} output nodes...")
        for node_id, node_output in outputs.items():
            if "images" in node_output:
                print(
                    f"worker-comfyui - Node {node_id} contains {len(node_output['images'])} image(s)"
                )
                for image_info in node_output["images"]:
                    filename = image_info.get("filename")
                    subfolder = image_info.get("subfolder", "")
                    img_type = image_info.get("type")
                    # skip temp images
                    if img_type == "temp":
                        print(
                            f"worker-comfyui - Skipping image {filename} because type is 'temp'"
                        )
                        continue
                    if not filename:
                        warn_msg = f"Skipping image in node {node_id} due to missing filename: {image_info}"
                        print(f"worker-comfyui - {warn_msg}")
                        errors.append(warn_msg)
                        continue
                    image_bytes = get_image_data(filename, subfolder, img_type)
                    if image_bytes:
                        # Determine file extension - prioritize custom path extension if available, otherwise use filename
                        custom_path = s3_config.get("image_output_path") if s3_config else None
                        if custom_path:
                            # Extract extension from custom path (e.g., /collections/test123/test_out.png -> .png)
                            file_extension = os.path.splitext(custom_path)[1] or os.path.splitext(filename)[1] or ".png"
                        else:
                            # Use extension from ComfyUI filename
                            file_extension = os.path.splitext(filename)[1] or ".png"
                        image_size = len(image_bytes)
                        image_size_mb = image_size / (1024 * 1024)
                        # Check if this is a video file (common extensions)
                        is_video = file_extension.lower() in ('.mp4', '.webm', '.avi', '.mov', '.mkv', '.flv', '.m4v')
                        # Check if S3 upload is configured
                        use_s3 = is_s3_configured()
                        # Estimate base64 size (base64 increases size by ~33%)
                        estimated_base64_size = int(image_size * 1.34)
                        # For large files (especially 4K content), require S3 or fail
                        # 4K images/videos can easily be 10-50MB+ which would exceed response limits
                        if image_size > REQUIRE_S3_SIZE_THRESHOLD:
                            if not use_s3:
                                error_msg = (
                                    f"File {filename} is too large ({image_size_mb:.2f} MB raw, ~{estimated_base64_size / (1024*1024):.2f} MB base64) "
                                    f"to return in response. S3 upload is required for files larger than {REQUIRE_S3_SIZE_THRESHOLD / (1024*1024):.1f} MB. "
                                    f"Please configure BUCKET_ENDPOINT_URL, BUCKET_NAME, BUCKET_ACCESS_KEY_ID, and BUCKET_SECRET_ACCESS_KEY environment variables. "
                                    f"See docs/s3_setup_guide.md for Cloudflare R2 configuration instructions."
                                )
                                print(f"worker-comfyui - ERROR: {error_msg}")
                                errors.append(error_msg)
                                continue  # Skip this file
                            else:
                                print(
                                    f"worker-comfyui - File {filename} is large ({image_size_mb:.2f} MB), using S3 upload"
                                )
| if use_s3 or image_size > REQUIRE_S3_SIZE_THRESHOLD: | |
| # Use S3 upload when configured or required | |
| try: | |
| with tempfile.NamedTemporaryFile( | |
| suffix=file_extension, delete=False | |
| ) as temp_file: | |
| temp_file.write(image_bytes) | |
| temp_file_path = temp_file.name | |
| print( | |
| f"worker-comfyui - Wrote file bytes to temporary file: {temp_file_path}" | |
| ) | |
| print(f"worker-comfyui - Uploading {filename} to S3...") | |
| # Use boto3 to upload to Cloudflare R2 | |
| import boto3 | |
| from botocore.config import Config | |
| endpoint_url = os.environ.get("BUCKET_ENDPOINT_URL") | |
| bucket_name = os.environ.get("BUCKET_NAME") | |
| access_key = os.environ.get("BUCKET_ACCESS_KEY_ID") | |
| secret_key = os.environ.get("BUCKET_SECRET_ACCESS_KEY") | |
| region = os.environ.get("S3_REGION", "auto") | |
| print(f"worker-comfyui - DEBUG: bucket_name from env = {bucket_name}") | |
| print(f"worker-comfyui - DEBUG: job_id = {job_id}") | |
| # Create S3 client for S3-compatible storage | |
| boto3_config = Config(signature_version='s3v4') | |
| s3_client = boto3.client( | |
| 's3', | |
| endpoint_url=endpoint_url, | |
| aws_access_key_id=access_key, | |
| aws_secret_access_key=secret_key, | |
| region_name=region, | |
| config=boto3_config | |
| ) | |
| # Determine S3 key: use custom path from request s3_config if provided, otherwise use default | |
| # This s3_key will be used for both upload AND response URL to ensure consistency | |
| print(f"worker-comfyui - DEBUG: s3_config = {s3_config}, type = {type(s3_config)}") | |
| custom_path = s3_config.get("image_output_path") if s3_config else None | |
| print(f"worker-comfyui - DEBUG: custom_path = {custom_path}") | |
| if custom_path: | |
| # Use custom path from request, ensuring it doesn't start with a slash (S3 keys shouldn't) | |
| s3_key = custom_path.lstrip("/") | |
| print(f"worker-comfyui - Using custom S3 path from request: {s3_key}") | |
| else: | |
| # Default: use job_id/filename when no custom path provided | |
| s3_key = f"{job_id}/{filename}" | |
| print(f"worker-comfyui - Using default S3 path: {s3_key}") | |
| # Determine ContentType based on file extension (required for Supabase Storage) | |
| # Use the extension we determined earlier (from custom path if available, otherwise from filename) | |
| # Default to 'image/png' since PNG is the most commonly used format | |
| mime_types = { | |
| '.png': 'image/png', # Most common format - default fallback | |
| '.jpg': 'image/jpeg', | |
| '.jpeg': 'image/jpeg', | |
| '.webp': 'image/webp', | |
| '.gif': 'image/gif', | |
| '.mp4': 'video/mp4', | |
| '.webm': 'video/webm', | |
| '.avi': 'video/x-msvideo', | |
| '.mov': 'video/quicktime', | |
| '.mkv': 'video/x-matroska', | |
| '.flv': 'video/x-flv', | |
| '.m4v': 'video/x-m4v', | |
| } | |
| # Get ContentType from extension, defaulting to 'image/png' if extension not recognized | |
| content_type = mime_types.get(file_extension.lower(), 'image/png') | |
| print(f"worker-comfyui - ContentType set to: {content_type} (from extension: {file_extension})") | |
| # Upload with ContentType specified (required for Supabase Storage) | |
| # Use upload_file for large files - it automatically uses multipart upload for files > 8MB | |
| # This handles files larger than the PutObject limit (typically 5GB, but some services have lower limits) | |
| # Configure multipart threshold lower to ensure multipart is used for large files | |
| from boto3.s3.transfer import TransferConfig | |
| # Multipart settings belong to TransferConfig (botocore's Config does not accept them) | |
| # This ensures large files are uploaded in chunks rather than a single PutObject call | |
| transfer_config = TransferConfig( | |
| multipart_threshold=5 * 1024 * 1024, # 5MB threshold - use multipart for files > 5MB | |
| multipart_chunksize=64 * 1024 * 1024, # 64MB per part | |
| ) | |
| # upload_file automatically handles multipart uploads for large files | |
| # ExtraArgs allows us to set ContentType and other metadata | |
| file_size_mb = os.path.getsize(temp_file_path) / (1024 * 1024) | |
| print(f"worker-comfyui - Uploading {filename} ({file_size_mb:.2f} MB) using multipart upload...") | |
| s3_client.upload_file( | |
| Filename=temp_file_path, | |
| Bucket=bucket_name, | |
| Key=s3_key, | |
| ExtraArgs={'ContentType': content_type}, | |
| Config=transfer_config, | |
| ) | |
| print(f"worker-comfyui - Successfully uploaded {filename} to S3") | |
| # Construct the public URL using the same s3_key that was used for upload | |
| # This ensures the response URL matches the actual uploaded file location | |
| # Use BUCKET_PUBLIC_URL if configured, otherwise construct from endpoint URL | |
| public_url = os.environ.get("BUCKET_PUBLIC_URL") | |
| if public_url: | |
| # Use configured public URL (e.g., https://pub-xxx.r2.dev or Supabase public URL) | |
| # Ensure it doesn't end with a slash | |
| public_url = public_url.rstrip("/") | |
| # Include bucket name in the URL: publicendpoint/bucketname/path | |
| s3_url = f"{public_url}/{bucket_name}/{s3_key}" | |
| else: | |
| # Fallback: construct from endpoint URL | |
| parsed = urllib.parse.urlparse(endpoint_url) | |
| hostname = parsed.hostname or "" | |
| # Check if this is Supabase Storage | |
| if 'storage.supabase.co' in hostname: | |
| # Supabase public URL format: https://<project-ref>.supabase.co/storage/v1/object/public/<bucket-name>/<key> | |
| # Extract project ref from hostname (e.g., cpelxqvcjnbpnphttzsn.storage.supabase.co -> cpelxqvcjnbpnphttzsn) | |
| project_ref = hostname.split('.')[0] | |
| s3_url = f"https://{project_ref}.supabase.co/storage/v1/object/public/{bucket_name}/{s3_key}" | |
| elif 'r2.cloudflarestorage.com' in hostname: | |
| # Cloudflare R2 S3-API URL format: https://<account-id>.r2.cloudflarestorage.com/<bucket-name>/<key> | |
| # Note: this endpoint is not publicly readable by default; set BUCKET_PUBLIC_URL (e.g. an r2.dev domain) for shareable links | |
| account_id = hostname.split('.')[0] | |
| s3_url = f"https://{account_id}.r2.cloudflarestorage.com/{bucket_name}/{s3_key}" | |
| else: | |
| # Fallback: construct from endpoint URL | |
| base_url = endpoint_url.rstrip("/") | |
| s3_url = f"{base_url}/{bucket_name}/{s3_key}" | |
| print(f"worker-comfyui - Uploaded {filename} to S3: {s3_url}") | |
| os.remove(temp_file_path) # Clean up temp file | |
| # Use custom path filename if provided, otherwise use original ComfyUI filename | |
| response_filename = filename | |
| if custom_path: | |
| # Extract filename from custom path (e.g., /collections/test123/test_out.png -> test_out.png) | |
| response_filename = os.path.basename(custom_path) | |
| print(f"worker-comfyui - Using custom path filename in response: {response_filename}") | |
| # Append dictionary with filename and URL | |
| output_data.append( | |
| { | |
| "filename": response_filename, | |
| "type": "s3_url", | |
| "data": s3_url, | |
| } | |
| ) | |
| except Exception as e: | |
| error_msg = f"Error uploading {filename} to S3: {e}" | |
| print(f"worker-comfyui - {error_msg}") | |
| error_msg += " Verify your S3 credentials (Access Key ID, Secret Access Key) and bucket name are correct." | |
| errors.append(error_msg) | |
| if "temp_file_path" in locals() and os.path.exists( | |
| temp_file_path | |
| ): | |
| try: | |
| os.remove(temp_file_path) | |
| except OSError as rm_err: | |
| print( | |
| f"worker-comfyui - Error removing temp file {temp_file_path}: {rm_err}" | |
| ) | |
| else: | |
| # Return as base64 string (small file, S3 not configured) | |
| # Log size warning for approaching threshold | |
| if image_size > REQUIRE_S3_SIZE_THRESHOLD * 0.5: # > 50% of threshold | |
| print( | |
| f"worker-comfyui - WARNING: File {filename} is moderately large ({image_size_mb:.2f} MB). " | |
| f"Consider configuring S3 upload (BUCKET_ENDPOINT_URL, BUCKET_NAME) for better reliability with large files. " | |
| f"See docs/s3_setup_guide.md for setup instructions." | |
| ) | |
| try: | |
| base64_image = base64.b64encode(image_bytes).decode("utf-8") | |
| # Use custom path filename if provided, otherwise use original ComfyUI filename | |
| response_filename = filename | |
| if custom_path: | |
| # Extract filename from custom path (e.g., /collections/test123/test_out.png -> test_out.png) | |
| response_filename = os.path.basename(custom_path) | |
| print(f"worker-comfyui - Using custom path filename in response: {response_filename}") | |
| # Append dictionary with filename and base64 data | |
| output_data.append( | |
| { | |
| "filename": response_filename, | |
| "type": "base64", | |
| "data": base64_image, | |
| } | |
| ) | |
| file_type_str = "video" if is_video else "image" | |
| print(f"worker-comfyui - Encoded {response_filename} ({file_type_str}) as base64 ({image_size / 1024:.1f} KB)") | |
| except Exception as e: | |
| error_msg = f"Error encoding {filename} to base64: {e}" | |
| print(f"worker-comfyui - {error_msg}") | |
| errors.append(error_msg) | |
| else: | |
| error_msg = f"Failed to fetch image data for {filename} from /view endpoint." | |
| errors.append(error_msg) | |
| # Check for other output types | |
| other_keys = [k for k in node_output.keys() if k != "images"] | |
| if other_keys: | |
| warn_msg = ( | |
| f"Node {node_id} produced unhandled output keys: {other_keys}." | |
| ) | |
| print(f"worker-comfyui - WARNING: {warn_msg}") | |
| print( | |
| f"worker-comfyui - --> If this output is useful, please consider opening an issue on GitHub to discuss adding support." | |
| ) | |
| except websocket.WebSocketException as e: | |
| print(f"worker-comfyui - WebSocket Error: {e}") | |
| print(traceback.format_exc()) | |
| return {"error": f"WebSocket communication error: {e}"} | |
| except requests.RequestException as e: | |
| print(f"worker-comfyui - HTTP Request Error: {e}") | |
| print(traceback.format_exc()) | |
| return {"error": f"HTTP communication error with ComfyUI: {e}"} | |
| except ValueError as e: | |
| print(f"worker-comfyui - Value Error: {e}") | |
| print(traceback.format_exc()) | |
| return {"error": str(e)} | |
| except Exception as e: | |
| print(f"worker-comfyui - Unexpected Handler Error: {e}") | |
| print(traceback.format_exc()) | |
| return {"error": f"An unexpected error occurred: {e}"} | |
| finally: | |
| if ws and ws.connected: | |
| print(f"worker-comfyui - Closing websocket connection.") | |
| ws.close() | |
| final_result = {} | |
| if output_data: | |
| final_result["images"] = output_data | |
| if errors: | |
| final_result["errors"] = errors | |
| print(f"worker-comfyui - Job completed with errors/warnings: {errors}") | |
| if not output_data and errors: | |
| print(f"worker-comfyui - Job failed with no output images.") | |
| # Check if any errors are 504-related and provide helpful suggestions | |
| error_str = " ".join(errors).lower() | |
| if "504" in error_str or "gateway timeout" in error_str or ("timeout" in error_str and "gemini" in error_str): | |
| suggestion = ( | |
| "504 Gateway Timeout errors from Gemini API typically occur with large images (4K+). " | |
| "Suggestions:\n" | |
| "1. Increase timeouts for large images by setting environment variables:\n" | |
| " - WEBSOCKET_RECV_TIMEOUT_S=600 (10 minutes for websocket receive)\n" | |
| " - WORKFLOW_EXECUTION_TIMEOUT_S=1200 (20 minutes for workflow execution)\n" | |
| "2. Reduce image resolution before sending to Gemini (e.g., resize to 2K or lower)\n" | |
| "3. Compress images before sending (use prepare_request.py with --max-width and --max-height flags)\n" | |
| "4. Reduce the number of images sent to Gemini in a single request\n" | |
| "5. Use lower resolution settings in GeminiImage2Node (e.g., '2K' instead of '4K')\n" | |
| "6. The handler will automatically retry 504 errors up to 3 times (configurable via GEMINI_504_RETRY_ATTEMPTS)" | |
| ) | |
| return { | |
| "error": "Job processing failed (504 Gateway Timeout from Gemini API)", | |
| "details": errors, | |
| "suggestion": suggestion | |
| } | |
| return { | |
| "error": "Job processing failed", | |
| "details": errors, | |
| } | |
| elif not output_data and not errors: | |
| print( | |
| f"worker-comfyui - Job completed successfully, but the workflow produced no images." | |
| ) | |
| final_result["status"] = "success_no_images" | |
| final_result["images"] = [] | |
| # Calculate and log response size before returning | |
| response_size = calculate_response_size(final_result) | |
| response_size_mb = response_size / (1024 * 1024) | |
| print(f"worker-comfyui - Response size: {response_size:,} bytes ({response_size_mb:.2f} MB)") | |
| # Check if response is too large | |
| if response_size > MAX_RESPONSE_SIZE_BYTES: | |
| size_limit_mb = MAX_RESPONSE_SIZE_BYTES / (1024 * 1024) | |
| storage_help = ( | |
| "Configure S3-compatible storage (Cloudflare R2, Supabase Storage, AWS S3, etc.) by setting " | |
| "BUCKET_ENDPOINT_URL, BUCKET_NAME, BUCKET_ACCESS_KEY_ID, and BUCKET_SECRET_ACCESS_KEY. " | |
| "See docs/s3_setup_guide.md for instructions." | |
| ) | |
| error_msg = ( | |
| f"Response payload too large ({response_size_mb:.2f} MB) exceeds limit ({size_limit_mb:.2f} MB). " | |
| f"This typically happens when returning base64-encoded images. " | |
| f"To fix this, {storage_help} " | |
| f"Alternatively, reduce image size/quality in your workflow." | |
| ) | |
| print(f"worker-comfyui - ERROR: {error_msg}") | |
| return { | |
| "error": "Response payload too large", | |
| "details": error_msg, | |
| "response_size_bytes": response_size, | |
| "max_size_bytes": MAX_RESPONSE_SIZE_BYTES, | |
| "suggestion": "Configure S3 upload (see docs/s3_setup_guide.md) or reduce image size/quality" | |
| } | |
| elif response_size > MAX_RESPONSE_SIZE_BYTES * 0.8: | |
| # Warn if approaching the limit | |
| print(f"worker-comfyui - WARNING: Response size ({response_size_mb:.2f} MB) is approaching the limit. Consider using S3 upload for large images.") | |
| print(f"worker-comfyui - Job completed. Returning {len(output_data)} image(s).") | |
| return final_result | |
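| # --------------------------------------------------------------------------- | |
| # Illustrative response shapes (a sketch; filenames and URLs are hypothetical): | |
| #   with S3 configured:      {"images": [{"filename": "test_out.png", "type": "s3_url", "data": "https://<public-url>/<bucket>/collections/test123/test_out.png"}]} | |
| #   without S3 (small files): {"images": [{"filename": "<filename>", "type": "base64", "data": "<base64 string>"}]} | |
| #   failures add an "errors" list; a failed job with no images returns {"error": ..., "details": [...]} | |
| # --------------------------------------------------------------------------- | |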
| if __name__ == "__main__": | |
| print("worker-comfyui - Starting handler...") | |
| runpod.serverless.start({"handler": handler}) |
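| # --------------------------------------------------------------------------- | |
| # Example configuration (a sketch; the "s3_config"/"image_output_path" field and | |
| # the BUCKET_* / S3_REGION variable names come from this file, the rest is assumed): | |
| # | |
| #   BUCKET_ENDPOINT_URL=https://<account-id>.r2.cloudflarestorage.com | |
| #   BUCKET_NAME=<bucket-name> | |
| #   BUCKET_ACCESS_KEY_ID=<access-key-id> | |
| #   BUCKET_SECRET_ACCESS_KEY=<secret-access-key> | |
| #   BUCKET_PUBLIC_URL=https://pub-xxx.r2.dev   # optional, used to build response URLs | |
| #   S3_REGION=auto | |
| # | |
| # Request payload sketch (the top-level "input" wrapper is RunPod's convention; | |
| # the "workflow" key is an assumption for illustration): | |
| #   {"input": {"workflow": {...}, "s3_config": {"image_output_path": "/collections/test123/test_out.png"}}} | |
| # --------------------------------------------------------------------------- | |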