Last active
May 20, 2025 17:59
-
-
Save DraconicDragon/40b636554697237493e95c04cc0d68bd to your computer and use it in GitHub Desktop.
Python script to read stealth (and normal usercomment) metadata from a given WebP (and probably PNG) image like how reForge does it. Ref.: https://github.com/Panchovix/stable-diffusion-webui-reForge/pull/361 and https://github.com/Panchovix/stable-diffusion-webui-reForge/blob/main/modules/stealth_infotext.py as well as the gist which i use to co…
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import gzip | |
| import logging | |
| from pathlib import Path | |
| import piexif | |
| from PIL import Image | |
| # ─── Configuration ───────────────────────────────────────────────────────────── | |
| EXTRACT_STEALTH_METADATA = False # Extract stealth metadata from embedded bits | |
| EXTRACT_USERCOMMENT_METADATA = False # Extract EXIF UserComment metadata | |
| SHOW_IMAGE_PREVIEW = False # Opens image viewer | |
| SCRIPT_DIR = Path(__file__).resolve().parent | |
| # ─── Colorful Logging ────────────────────────────────────────────────────────── | |
| RESET = "\033[0m" | |
| COLOR_DEBUG = "\033[92m" # Green | |
| COLOR_INFO = "\033[94m" # Bright Blue | |
| COLOR_WARNING = "\033[93m" # Yellow | |
| COLOR_ERROR = "\033[91m" # Red | |
| COLOR_CRITICAL = "\033[95m" # Magenta | |
| class ColorFormatter(logging.Formatter): | |
| COLORS = { | |
| "DEBUG": COLOR_DEBUG, | |
| "INFO": COLOR_INFO, | |
| "WARNING": COLOR_WARNING, | |
| "ERROR": COLOR_ERROR, | |
| "CRITICAL": COLOR_CRITICAL, | |
| } | |
| def format(self, record): | |
| msg = super().format(record) | |
| color = self.COLORS.get(record.levelname, RESET) | |
| return f"{color}{msg}{RESET}" | |
| handler = logging.StreamHandler() | |
| formatter = ColorFormatter("%(levelname)s: %(message)s") | |
| handler.setFormatter(formatter) | |
| logger = logging.getLogger() | |
| logger.handlers = [] # Clear existing handlers | |
| logger.addHandler(handler) | |
| logger.setLevel(logging.INFO) | |
| # ─── Startup Settings Printout ────────────────────────────────────────────────── | |
| def print_settings(): | |
| CYAN = "\033[96m" | |
| # WHITE = "\033[97m" | |
| BRIGHT_GREEN = "\033[92m" | |
| RED = "\033[91m" | |
| settings = { | |
| "Extract Stealth?": EXTRACT_STEALTH_METADATA, | |
| "Extract UserComment?": EXTRACT_USERCOMMENT_METADATA, | |
| "Show Image Preview?": SHOW_IMAGE_PREVIEW, | |
| } | |
| for key, value in settings.items(): | |
| color = BRIGHT_GREEN if value else RED | |
| print(f"{CYAN}{key}:{RESET} {color}{value}{RESET}") | |
| print() # Blank line for spacing | |
| # ─── Stealth Metadata Extraction (New Logic) ──────────────────────────────── | |
| def read_info_from_image_stealth(image): | |
| width, height = image.size | |
| total_pixels = width * height | |
| pixels_processed = 0 | |
| logging.debug(f"Starting stealth metadata search in {width}x{height} image ({total_pixels} pixels)") | |
| # Use a more efficient approach for WebP files | |
| if hasattr(image, "format") and image.format == "WEBP": | |
| logging.debug("WebP image detected, optimizing processing method") | |
| # Convert to RGB/RGBA mode if not already to ensure consistent processing | |
| if image.mode not in ("RGB", "RGBA"): | |
| image = image.convert("RGBA" if "transparency" in image.info else "RGB") | |
| # Determine if image has alpha channel | |
| has_alpha = image.mode == "RGBA" | |
| pixels = image.load() | |
| # Initialize variables | |
| mode = None | |
| compressed = False | |
| binary_data = "" | |
| buffer_a = bytearray() | |
| buffer_rgb = bytearray() | |
| index_a = 0 | |
| index_rgb = 0 | |
| sig_confirmed = False | |
| confirming_signature = True | |
| reading_param_len = False | |
| reading_param = False | |
| read_end = False | |
| import time | |
| start_time = time.time() | |
| try: | |
| use_tqdm = False | |
| report_progress = False | |
| if total_pixels > 3500000: # 3.5 megapixels | |
| try: | |
| from tqdm import tqdm | |
| use_tqdm = True | |
| except ImportError: | |
| logging.warning("tqdm not found, falling back to time-based progress reporting.") | |
| report_progress = True | |
| last_report_time = 0 | |
| report_interval = 1.0 # Report progress every 1 second | |
| else: | |
| logging.debug("Image size is below 4MP, skipping tqdm and progress reporting.") | |
| if use_tqdm: | |
| pbar = tqdm(total=total_pixels, desc="Processing pixels", unit="px") | |
| for x in range(width): | |
| for y in range(height): | |
| pixels_processed += 1 | |
| if use_tqdm: | |
| pbar.update(1) | |
| elif report_progress: | |
| # Progress reporting | |
| current_time = time.time() | |
| if current_time - last_report_time >= report_interval: | |
| progress = pixels_processed / total_pixels * 100 | |
| elapsed = current_time - start_time | |
| logging.info( | |
| f"Processing: {progress:.1f}% complete ({pixels_processed}/{total_pixels} pixels, {elapsed:.1f}s elapsed)" | |
| ) | |
| last_report_time = current_time | |
| # Get pixel data | |
| if has_alpha: | |
| r, g, b, a = pixels[x, y] | |
| # Use more efficient bit manipulation | |
| buffer_a.append(a & 1) | |
| index_a += 1 | |
| else: | |
| r, g, b = pixels[x, y] | |
| # Use more efficient bit manipulation | |
| buffer_rgb.append(r & 1) | |
| buffer_rgb.append(g & 1) | |
| buffer_rgb.append(b & 1) | |
| index_rgb += 3 | |
| # Signature detection | |
| if confirming_signature: | |
| # Check alpha channel signature | |
| if has_alpha and index_a == len("stealth_pnginfo") * 8: | |
| # Convert bits to bytes more efficiently | |
| sig_bytes = bytearray() | |
| for i in range(0, len(buffer_a), 8): | |
| byte = 0 | |
| for bit_idx in range(8): | |
| if i + bit_idx < len(buffer_a): | |
| byte |= buffer_a[i + bit_idx] << (7 - bit_idx) | |
| sig_bytes.append(byte) | |
| try: | |
| decoded_sig = sig_bytes.decode("utf-8", errors="ignore") | |
| if decoded_sig in {"stealth_pnginfo", "stealth_pngcomp"}: | |
| logging.debug(f"Found signature in alpha channel: {decoded_sig}") | |
| confirming_signature = False | |
| sig_confirmed = True | |
| reading_param_len = True | |
| mode = "alpha" | |
| if decoded_sig == "stealth_pngcomp": | |
| compressed = True | |
| buffer_a = bytearray() | |
| index_a = 0 | |
| else: | |
| logging.debug(f"Invalid alpha signature: {decoded_sig}") | |
| except Exception as e: | |
| logging.debug(f"Error decoding potential alpha signature: {e}") | |
| # Check RGB channel signature | |
| elif index_rgb == len("stealth_pnginfo") * 8: | |
| # Convert bits to bytes more efficiently | |
| sig_bytes = bytearray() | |
| for i in range(0, len(buffer_rgb), 8): | |
| byte = 0 | |
| for bit_idx in range(8): | |
| if i + bit_idx < len(buffer_rgb): | |
| byte |= buffer_rgb[i + bit_idx] << (7 - bit_idx) | |
| sig_bytes.append(byte) | |
| try: | |
| decoded_sig = sig_bytes.decode("utf-8", errors="ignore") | |
| if decoded_sig in {"stealth_rgbinfo", "stealth_rgbcomp"}: | |
| logging.info(f"Found signature in RGB channels: {decoded_sig}") | |
| confirming_signature = False | |
| sig_confirmed = True | |
| reading_param_len = True | |
| mode = "rgb" | |
| if decoded_sig == "stealth_rgbcomp": | |
| compressed = True | |
| buffer_rgb = bytearray() | |
| index_rgb = 0 | |
| else: | |
| logging.debug(f"Invalid RGB signature: {decoded_sig}") | |
| except Exception as e: | |
| logging.debug(f"Error decoding potential RGB signature: {e}") | |
| # Process parameter length | |
| elif reading_param_len: | |
| if mode == "alpha" and index_a == 32: | |
| # More efficient binary to int conversion | |
| param_len = 0 | |
| for bit in buffer_a: | |
| param_len = (param_len << 1) | bit | |
| logging.debug(f"Alpha metadata length: {param_len} bits") | |
| reading_param_len = False | |
| reading_param = True | |
| buffer_a = bytearray() | |
| index_a = 0 | |
| elif mode == "rgb" and index_rgb == 33: | |
| # More efficient binary to int conversion | |
| pop = buffer_rgb[-1] | |
| buffer_rgb = buffer_rgb[:-1] | |
| param_len = 0 | |
| for bit in buffer_rgb: | |
| param_len = (param_len << 1) | bit | |
| logging.info(f"RGB metadata length: {param_len} bits") | |
| reading_param_len = False | |
| reading_param = True | |
| buffer_rgb = bytearray([pop]) | |
| index_rgb = 1 | |
| # Read the actual parameter data | |
| elif reading_param: | |
| if mode == "alpha" and index_a == param_len: | |
| binary_data = buffer_a | |
| logging.debug("Finished reading alpha channel metadata") | |
| read_end = True | |
| break | |
| elif mode == "rgb" and index_rgb >= param_len: | |
| diff = param_len - index_rgb | |
| if diff < 0: | |
| buffer_rgb = buffer_rgb[:diff] | |
| binary_data = buffer_rgb | |
| logging.debug("Finished reading RGB channel metadata") | |
| read_end = True | |
| break | |
| if read_end: | |
| break | |
| # Processing completed stats | |
| elapsed = time.time() - start_time | |
| logging.debug( | |
| f"Metadata extraction completed in {elapsed:.2f} seconds ({pixels_processed}/{total_pixels} pixels processed)" | |
| ) | |
| if use_tqdm: | |
| pbar.close() | |
| # Convert binary data to text if signature was found | |
| if sig_confirmed and len(binary_data) > 0: | |
| try: | |
| # Convert bits to bytes more efficiently | |
| byte_data = bytearray() | |
| for i in range(0, len(binary_data), 8): | |
| if i + 8 <= len(binary_data): | |
| byte = 0 | |
| for bit_idx in range(8): | |
| byte |= int(binary_data[i + bit_idx]) << (7 - bit_idx) | |
| byte_data.append(byte) | |
| # Decompress if needed and decode | |
| if compressed: | |
| logging.debug("Decompressing metadata...") | |
| decoded_data = gzip.decompress(bytes(byte_data)).decode("utf-8") | |
| else: | |
| decoded_data = byte_data.decode("utf-8", errors="ignore") | |
| return decoded_data | |
| except Exception as e: | |
| logging.error(f"Failed decoding stealth metadata: {e}") | |
| elif not sig_confirmed: | |
| logging.info("No stealth metadata signature detected") | |
| else: | |
| logging.warning("Signature found but no valid metadata extracted") | |
| except Exception as e: | |
| logging.error(f"Error during stealth metadata extraction: {e}") | |
| if use_tqdm: | |
| pbar.close() | |
| return None | |
| # ─── EXIF UserComment Extraction ─────────────────────────────────────────────── | |
| def extract_usercomment(image_path): | |
| """Extract UserComment from EXIF data.""" | |
| try: | |
| with Image.open(image_path) as img: | |
| if img.format == "WEBP": | |
| # Handle WebP using Pillow's Exif methods | |
| exif_data = img.getexif() | |
| if not exif_data: | |
| return None | |
| exif_ifd = exif_data.get_ifd(0x8769) # ExifIFD tag | |
| user_comment_tag = 0x9286 # UserComment tag | |
| if user_comment_tag not in exif_ifd: | |
| return None | |
| user_comment_bytes = exif_ifd[user_comment_tag] | |
| # Skip 8-byte encoding marker if present | |
| if len(user_comment_bytes) >= 8: | |
| # Try UTF-8 first, fall back to UTF-16 if needed | |
| try: | |
| return user_comment_bytes[8:].decode("utf-8") | |
| except UnicodeDecodeError: | |
| return user_comment_bytes[8:].decode("utf-16", errors="replace") | |
| else: | |
| return user_comment_bytes.decode("utf-8", errors="replace") | |
| else: | |
| # For non-WebP formats | |
| exif_dict = piexif.load(str(image_path)) | |
| if "Exif" in exif_dict and piexif.ExifIFD.UserComment in exif_dict["Exif"]: | |
| return piexif.helper.UserComment.load(exif_dict["Exif"][piexif.ExifIFD.UserComment]) | |
| except Exception: | |
| pass | |
| return None | |
| # ─── Process Single Image ───────────────────────────────────────────────────── | |
| def process_image(image_path): | |
| """Process a single image and extract metadata.""" | |
| try: | |
| logging.info(f"Processing: {image_path}") | |
| # Check file size first | |
| file_size = Path(image_path).stat().st_size / (1024 * 1024) # Size in MB | |
| if file_size > 10: # If file is larger than 10MB | |
| logging.warning(f"Large file detected ({file_size:.1f}MB). Processing may take longer.") | |
| # Open the image with better error handling for WebP | |
| try: | |
| with Image.open(image_path) as img: | |
| # Display image format and size info | |
| logging.debug(f"Image format: {img.format}, Size: {img.width}x{img.height}, Mode: {img.mode}") | |
| if SHOW_IMAGE_PREVIEW: | |
| img.show() | |
| # Create a copy of the image for extracting hidden metadata | |
| img_copy = img.copy() | |
| except Exception as e: | |
| logging.error(f"Failed to open image: {e}") | |
| return False | |
| metadata_found = False | |
| # Extract stealth metadata using the improved logic if enabled | |
| if EXTRACT_STEALTH_METADATA: | |
| stealth_text = read_info_from_image_stealth(img_copy) | |
| if stealth_text: | |
| print("=" * 80) | |
| print(f"{COLOR_INFO}🔍 STEALTH METADATA FOUND:{RESET}") | |
| print("-" * 80) | |
| print(stealth_text) | |
| print("=" * 80) | |
| metadata_found = True | |
| else: | |
| logging.info("No stealth metadata found in the image.") | |
| # Extract EXIF UserComment metadata if enabled | |
| if EXTRACT_USERCOMMENT_METADATA: | |
| usercomment = extract_usercomment(image_path) | |
| if usercomment: | |
| usercomment = usercomment.rstrip() | |
| print("=" * 80) | |
| print(f"{COLOR_INFO}📝 EXIF USERCOMMENT FOUND:{RESET}") | |
| print("-" * 80) | |
| print(usercomment) | |
| print("=" * 80) | |
| metadata_found = True | |
| else: | |
| logging.info("No EXIF UserComment metadata found.") | |
| # For PNG files, also print standard PNG text metadata if it exists | |
| if image_path.suffix.lower() == ".png": | |
| with Image.open(image_path) as img: | |
| if hasattr(img, "text") and img.text: # type: ignore | |
| print("=" * 80) | |
| print(f"{COLOR_INFO}📋 PNG tEXt METADATA:{RESET}") | |
| print("-" * 80) | |
| for key, value in img.text.items(): # type: ignore | |
| print(f"{key}: {value}") | |
| print("=" * 80) | |
| metadata_found = True | |
| else: | |
| logging.info("No PNG text metadata found.") | |
| if not metadata_found: | |
| print(f"\n{COLOR_WARNING}No metadata found in this image.{RESET}") | |
| return metadata_found | |
| except Exception as e: | |
| logging.error(f"Error processing {image_path}: {e}") | |
| return False | |
| # ─── Main ────────────────────────────────────────────────────────────────────── | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Extract stealth metadata and EXIF UserComment from images.") | |
| parser.add_argument("path", help="Image file or directory to process") | |
| parser.add_argument("--stealth", "-s", action="store_true", help="Extract stealth metadata") | |
| parser.add_argument("--comment", "-c", action="store_true", help="Extract EXIF UserComment") | |
| parser.add_argument("--preview", "-p", action="store_true", help="Show image preview") | |
| parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging") | |
| args = parser.parse_args() | |
| # Update global settings based on command-line arguments | |
| global EXTRACT_STEALTH_METADATA, EXTRACT_USERCOMMENT_METADATA, SHOW_IMAGE_PREVIEW | |
| if args.stealth: | |
| EXTRACT_STEALTH_METADATA = True | |
| if args.comment: | |
| EXTRACT_USERCOMMENT_METADATA = True | |
| if args.preview: | |
| SHOW_IMAGE_PREVIEW = True | |
| if args.verbose: | |
| logger.setLevel(logging.DEBUG) | |
| # Configure PIL logger to prevent plugin import messages | |
| logging.getLogger("PIL").setLevel(logging.INFO) | |
| print_settings() | |
| path = Path(args.path) | |
| if path.is_file(): | |
| if path.suffix.lower() in (".png", ".webp"): | |
| process_image(path) | |
| else: | |
| logging.error(f"Unsupported file format: {path.suffix}") | |
| elif path.is_dir(): | |
| image_count = 0 | |
| found_count = 0 | |
| for file_path in path.glob("**/*"): | |
| if file_path.suffix.lower() in (".png", ".webp"): | |
| image_count += 1 | |
| if process_image(file_path): | |
| found_count += 1 | |
| print() # Add spacing between files | |
| print(f"\n{COLOR_INFO}Processed {image_count} images, found metadata in {found_count} files.{RESET}") | |
| else: | |
| logging.error(f"Path does not exist: {path}") | |
| if __name__ == "__main__": | |
| main() | |
| input("\nPress Enter to exit...") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment