Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save bizrockman/704bb402f2acfca6fedca4a5536eb2cb to your computer and use it in GitHub Desktop.

Select an option

Save bizrockman/704bb402f2acfca6fedca4a5536eb2cb to your computer and use it in GitHub Desktop.
Vibe Coding und Sicherheit
"""
Supabase RLS (Row Level Security) Audit Script
================================================
Testet ob die RLS-Policies einer Supabase-Instanz korrekt konfiguriert sind.
Verwendet nur den öffentlich sichtbaren anon-Key (kein Service-Key nötig).
"""
import json
import os
import sys
from datetime import datetime

import httpx
# ─── Konfiguration ───────────────────────────────────────────────────────────
# Target Supabase project and its public anon key. Both may be supplied via
# the SUPABASE_URL / SUPABASE_ANON_KEY environment variables; otherwise the
# quoted placeholders below must be replaced by hand before running.
# A trailing slash is stripped so the derived URLs never contain "//".
SUPABASE_URL = os.environ.get("SUPABASE_URL", "<Deine URL>").rstrip("/")
ANON_KEY = os.environ.get("SUPABASE_ANON_KEY", "<Dein Key>")

# Base URLs of the three HTTP services probed by this script.
REST_URL = f"{SUPABASE_URL}/rest/v1"
STORAGE_URL = f"{SUPABASE_URL}/storage/v1"
AUTH_URL = f"{SUPABASE_URL}/auth/v1"

# Default headers sent with REST and storage requests: the anon key is used
# both as API key and as bearer token, and the "public" schema profile is
# requested for reads and writes.
HEADERS = {
    "apikey": ANON_KEY,
    "Authorization": f"Bearer {ANON_KEY}",
    "Content-Type": "application/json",
    "Accept": "application/json",
    "accept-profile": "public",
    "content-profile": "public",
}
# Table names that are probed in addition to whatever the OpenAPI schema
# reveals. Grouped by theme; the order has no functional meaning.
COMMON_TABLE_NAMES = [
    # Auth / user related
    "users", "profiles", "accounts", "user_profiles", "user_roles",
    "user_settings", "user_data", "customers", "members",
    # App specific (PropStage = real estate / staging)
    "properties", "images", "staging_results", "orders", "transactions",
    "payments", "subscriptions", "plans", "pricing", "credits",
    "invoices", "billing", "designs", "rooms", "listings",
    "projects", "tasks", "results", "generations", "renders",
    # General
    "site_settings", "settings", "config", "configurations",
    "api_keys", "secrets", "tokens", "sessions",
    "notifications", "emails", "messages", "logs",
    "analytics", "events", "audit_logs", "activity_logs",
    "categories", "tags", "files", "uploads", "media",
    "comments", "reviews", "feedback", "support_tickets",
    # Admin
    "admin_settings", "admin_users", "roles", "permissions",
    "feature_flags", "features", "waitlist", "beta_users",
    # Stripe / payments
    "stripe_customers", "stripe_subscriptions", "stripe_events",
    "payment_methods", "coupons", "discounts",
]
# Farben für Terminal-Output
class C:
    """ANSI escape sequences used to style the terminal output."""

    # Foreground colors.
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    MAGENTA = "\033[95m"
    CYAN = "\033[96m"

    # Text attributes: BOLD switches emphasis on, END is emitted after every
    # styled span to switch styling off again.
    BOLD = "\033[1m"
    END = "\033[0m"
def severity_color(sev: str) -> str:
    """Return the ANSI color prefix for a severity label.

    Labels that are not in the table yield an empty string, so they are
    printed without color.
    """
    palette = {
        "KRITISCH": C.RED,
        "HOCH": C.MAGENTA,
        "MITTEL": C.YELLOW,
        "NIEDRIG": C.CYAN,
        "INFO": C.BLUE,
        "OK": C.GREEN,
    }
    if sev in palette:
        return palette[sev]
    return ""
# Every finding reported during the run, in reporting order.
findings: list[dict] = []


def record(severity: str, category: str, detail: str, data: object = None) -> None:
    """Store one finding and echo it to the terminal.

    Args:
        severity: Severity label; also selects the output color.
        category: Short tag naming the probe that produced the finding.
        detail: Human-readable description.
        data: Optional payload. Its ``str()`` form, truncated to 500
            characters, is stored under ``data_preview``.
    """
    entry = {"severity": severity, "category": category, "detail": detail}
    if data is not None:
        entry["data_preview"] = str(data)[:500]
    findings.append(entry)
    print(f" {severity_color(severity)}[{severity}]{C.END} {detail}")
# ─── 1. Tabellen-Enumeration via OpenAPI-Schema ─────────────────────────────
def test_openapi_schema(client: httpx.Client) -> tuple[list[str], list[str]]:
    """Fetch the REST root document and list the tables and RPCs it exposes.

    Args:
        client: HTTP client used for the request.

    Returns:
        ``(tables, rpc_funcs)`` as sorted name lists. Both lists are empty
        when the root is not readable, is not JSON, or has neither a
        ``paths`` nor a ``definitions`` mapping.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("1. OpenAPI-Schema-Enumeration")
    print(f"{'='*70}{C.END}")
    resp = client.get(f"{REST_URL}/", headers=HEADERS)

    schema = None
    if resp.status_code == 200:
        try:
            schema = resp.json()
        except ValueError:
            # Body is not valid JSON; handled by the INFO fallback below.
            schema = None

    if isinstance(schema, dict):
        if "paths" in schema:
            paths = schema["paths"]
            if isinstance(paths, dict):
                # Everything outside /rpc/ is treated as a table endpoint;
                # the bare root path "/" strips to "" and is dropped.
                tables = sorted(
                    p.strip("/") for p in paths
                    if p.strip("/") and not p.startswith("/rpc/")
                )
                # Only the leading "/rpc/" is removed from function paths.
                rpc_funcs = sorted(
                    p.removeprefix("/rpc/") for p in paths
                    if p.startswith("/rpc/")
                )
                record(
                    "HOCH", "Schema",
                    f"OpenAPI-Schema exponiert {len(tables)} Tabellen und "
                    f"{len(rpc_funcs)} RPC-Funktionen",
                )
                if tables:
                    print(f" Tabellen: {', '.join(tables)}")
                if rpc_funcs:
                    print(f" RPC-Funktionen: {', '.join(rpc_funcs)}")
                return tables, rpc_funcs
        elif "definitions" in schema:
            definitions = schema["definitions"]
            if isinstance(definitions, dict):
                tables = sorted(definitions.keys())
                record(
                    "HOCH", "Schema",
                    f"OpenAPI-Schema exponiert {len(tables)} Definitionen",
                )
                print(f" Definitionen: {', '.join(tables)}")
                return tables, []

    record("INFO", "Schema", f"OpenAPI-Root liefert Status {resp.status_code}")
    return [], []
# ─── 2. Tabellen lesen (SELECT) ─────────────────────────────────────────────
def test_table_read(client: httpx.Client, tables: list[str]) -> list[tuple]:
    """Try an anonymous SELECT on every known and every guessed table.

    Args:
        client: HTTP client used for the requests.
        tables: Table names discovered from the schema ("bekannt"); the
            names in ``COMMON_TABLE_NAMES`` are probed as well ("geraten").

    Returns:
        A list of ``(table, rows)`` tuples for every table that answered
        with 200 and a JSON list; ``rows`` holds at most five rows and is
        empty when no row was returned.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("2. Tabellen-Lesezugriff (SELECT)")
    print(f"{'='*70}{C.END}")
    known = set(tables)
    accessible = []
    for table in sorted(known | set(COMMON_TABLE_NAMES)):
        resp = client.get(
            f"{REST_URL}/{table}?select=*&limit=5",
            headers=HEADERS,
        )
        status = resp.status_code
        if status == 200:
            try:
                data = resp.json()
            except ValueError:
                # A 200 without a JSON body cannot be interpreted; report
                # it instead of aborting the whole scan.
                record(
                    "INFO", "SELECT",
                    f"Tabelle '{table}': Status 200, aber keine JSON-Antwort",
                )
                continue
            if not isinstance(data, list):
                continue
            marker = "bekannt" if table in known else "geraten"
            if data:
                first = data[0]
                cols = list(first.keys()) if isinstance(first, dict) else []
                record(
                    "KRITISCH", "SELECT",
                    f"Tabelle '{table}' ({marker}): {len(data)} Zeilen lesbar | "
                    f"Spalten: {cols}",
                )
                accessible.append((table, data))
            else:
                record(
                    "MITTEL", "SELECT",
                    f"Tabelle '{table}' ({marker}): existiert, 0 Zeilen "
                    f"(RLS blockiert oder leer)",
                )
                accessible.append((table, []))
        elif status == 404:
            continue  # table does not exist
        elif status in (401, 403):
            record("OK", "SELECT", f"Tabelle '{table}': Zugriff verweigert ({status})")
        else:
            record("INFO", "SELECT", f"Tabelle '{table}': Status {status}")
    return accessible
# ─── 3. Schreibzugriff testen (INSERT / UPDATE / DELETE) ────────────────────
def test_table_write(client: httpx.Client, accessible_tables: list[tuple]):
    """Probe INSERT, UPDATE and DELETE on every readable table.

    Args:
        client: HTTP client used for the requests.
        accessible_tables: ``(table, rows)`` tuples; only the table name
            is used here.

    UPDATE and DELETE are filtered on an all-zero UUID ``id`` so that no
    existing row is meant to match.

    NOTE(review): the payload uses the column ``__rls_test_col``, which
    presumably exists in no table, so INSERT/UPDATE are likely rejected
    before RLS is evaluated. Confirm against the target API.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("3. Schreibzugriff (INSERT / UPDATE / DELETE)")
    print(f"{'='*70}{C.END}")
    denied = (401, 403)
    for table, _rows in accessible_tables:
        table_url = f"{REST_URL}/{table}"
        no_match_url = f"{table_url}?id=eq.00000000-0000-0000-0000-000000000000"

        # --- INSERT ---------------------------------------------------
        insert_resp = client.post(
            table_url,
            headers={**HEADERS, "Prefer": "return=minimal"},
            json={"__rls_test_col": "__rls_test_value"},
        )
        code = insert_resp.status_code
        if code in denied:
            record("OK", "INSERT", f"INSERT in '{table}' blockiert ({code})")
        elif code == 409:
            record("HOCH", "INSERT", f"INSERT in '{table}': Constraint-Fehler (aber RLS erlaubt es)")
        elif code in (200, 201):
            record("KRITISCH", "INSERT", f"INSERT in '{table}' erfolgreich!")
        else:
            body = insert_resp.text
            snippet = body[:200] if body else ""
            record("INFO", "INSERT", f"INSERT in '{table}': Status {code} — {snippet}")

        # --- UPDATE (filter is meant to match no row) -------------------
        update_resp = client.patch(
            no_match_url,
            headers={**HEADERS, "Prefer": "return=minimal"},
            json={"__rls_test_col": "__rls_test_value"},
        )
        code = update_resp.status_code
        if code in denied:
            record("OK", "UPDATE", f"UPDATE auf '{table}' blockiert ({code})")
        elif code in (200, 204):
            record("HOCH", "UPDATE", f"UPDATE auf '{table}' nicht blockiert (RLS prüft evtl. nicht)")
        else:
            record("INFO", "UPDATE", f"UPDATE auf '{table}': Status {code}")

        # --- DELETE (filter is meant to match no row) -------------------
        delete_resp = client.delete(no_match_url, headers=HEADERS)
        code = delete_resp.status_code
        if code in denied:
            record("OK", "DELETE", f"DELETE auf '{table}' blockiert ({code})")
        elif code in (200, 204):
            record("HOCH", "DELETE", f"DELETE auf '{table}' nicht blockiert")
        else:
            record("INFO", "DELETE", f"DELETE auf '{table}': Status {code}")
# ─── 4. RPC-Funktionen testen ───────────────────────────────────────────────
def test_rpc_functions(client: httpx.Client, rpc_funcs: list[str]):
    """Call every discovered RPC function with an empty argument object.

    Args:
        client: HTTP client used for the requests.
        rpc_funcs: Function names taken from the schema.

    A function counts as callable on 200 and also on 204, so functions
    that answer without a body are not misreported as merely "INFO".
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("4. RPC-Funktionen")
    print(f"{'='*70}{C.END}")
    if not rpc_funcs:
        print(" Keine RPC-Funktionen im Schema gefunden.")
        return
    for func in rpc_funcs:
        resp = client.post(
            f"{REST_URL}/rpc/{func}",
            headers=HEADERS,
            json={},
        )
        status = resp.status_code
        if status in (200, 204):
            preview = "(leer)"
            if resp.text:
                try:
                    data = resp.json()
                except ValueError:
                    # Non-JSON body: show the raw text instead of crashing.
                    data = resp.text
                # Only a missing/null result is "empty"; 0, False or []
                # are real return values and are shown as such.
                if data is not None:
                    preview = str(data)[:300]
            record("HOCH", "RPC", f"RPC '{func}' aufrufbar: {preview}")
        elif status in (401, 403):
            record("OK", "RPC", f"RPC '{func}' blockiert ({status})")
        else:
            record("INFO", "RPC", f"RPC '{func}': Status {status}")
# ─── 5. Storage Buckets ─────────────────────────────────────────────────────
def test_storage(client: httpx.Client):
    """List storage buckets and, for each bucket, try to list its objects.

    Args:
        client: HTTP client used for the requests.

    Visible buckets, public buckets and listable objects are reported as
    findings; a denied or empty bucket listing is reported as "OK".
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("5. Storage Buckets")
    print(f"{'='*70}{C.END}")
    bucket_resp = client.get(f"{STORAGE_URL}/bucket", headers=HEADERS)
    status = bucket_resp.status_code

    # Handle the non-success outcomes first.
    if status in (401, 403):
        record("OK", "Storage", f"Bucket-Auflistung blockiert ({status})")
        return
    if status != 200:
        record("INFO", "Storage", f"Bucket-Endpunkt: Status {status}")
        return

    buckets = bucket_resp.json()
    if not isinstance(buckets, list) or not buckets:
        record("OK", "Storage", "Keine Buckets sichtbar oder leere Liste")
        return

    visible = [b.get("name", "?") for b in buckets]
    record("HOCH", "Storage", f"{len(buckets)} Buckets sichtbar: {visible}")

    for bucket in buckets:
        name = bucket.get("name", "")
        if bucket.get("public", False):
            record("MITTEL", "Storage", f"Bucket '{name}' ist öffentlich")

        # Try to list the objects stored in the bucket.
        listing = client.post(
            f"{STORAGE_URL}/object/list/{name}",
            headers=HEADERS,
            json={"prefix": "", "limit": 20, "offset": 0},
        )
        if listing.status_code != 200:
            continue
        files = listing.json()
        if not isinstance(files, list) or not files:
            continue
        file_names = [f.get("name", "?") for f in files[:10]]
        record(
            "HOCH", "Storage",
            f"Bucket '{name}': {len(files)} Dateien auflistbar — {file_names}",
        )
# ─── 6. Auth-Endpunkte ──────────────────────────────────────────────────────
def test_auth_endpoints(client: httpx.Client):
    """Probe the auth service: public settings and the admin user list.

    Args:
        client: HTTP client used for the requests.

    The settings probe is informational only. The admin user list is
    reported as critical when it answers 200 with the anon key, and as
    blocked for any other status.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("6. Auth-Endpunkte")
    print(f"{'='*70}{C.END}")

    # Settings
    settings_resp = client.get(f"{AUTH_URL}/settings", headers={"apikey": ANON_KEY})
    if settings_resp.status_code != 200:
        record("INFO", "Auth", f"Auth-Settings: Status {settings_resp.status_code}")
    else:
        providers = settings_resp.json().get("external", {})
        enabled = []
        for provider, value in providers.items():
            # A provider counts as enabled when its value is literally True
            # or a dict with a truthy "enabled" entry.
            if value is True or (isinstance(value, dict) and value.get("enabled")):
                enabled.append(provider)
        # Fall back to the first ten provider names when none is enabled.
        shown = enabled or list(providers.keys())[:10]
        record("INFO", "Auth", f"Auth-Settings erreichbar. Provider: {shown}")

    # User list (admin only, expected to fail)
    admin_headers = {"apikey": ANON_KEY, "Authorization": f"Bearer {ANON_KEY}"}
    admin_resp = client.get(f"{AUTH_URL}/admin/users", headers=admin_headers)
    if admin_resp.status_code != 200:
        record("OK", "Auth", f"Admin-User-Endpunkt blockiert ({admin_resp.status_code})")
    else:
        record("KRITISCH", "Auth", "Admin-User-Liste mit anon-Key abrufbar!")
# ─── 7. Sensible Daten in lesbaren Tabellen ─────────────────────────────────
def check_sensitive_data(accessible_tables: list[tuple]):
    """Flag readable tables whose column names look sensitive.

    Args:
        accessible_tables: ``(table, rows)`` tuples; tables without rows
            are skipped.

    Column names are taken from the first row only and matched
    case-insensitively by substring against a fixed keyword list. For
    each flagged table the flagged values of up to two rows are printed.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("7. Analyse sensibler Daten in lesbaren Tabellen")
    print(f"{'='*70}{C.END}")
    keywords = (
        "email", "password", "hash", "secret", "token", "key", "api_key",
        "stripe", "payment", "card", "ssn", "phone", "address", "credit",
        "private", "salt", "otp", "mfa", "2fa", "refresh_token",
    )
    for table, rows in accessible_tables:
        if not rows:
            continue
        flagged = []
        for col in rows[0].keys():
            lowered = col.lower()
            if any(kw in lowered for kw in keywords):
                flagged.append(col)
        if not flagged:
            continue
        record(
            "KRITISCH", "Sensible Daten",
            f"Tabelle '{table}' enthält potenziell sensible Spalten: {flagged}",
        )
        for row in rows[:2]:
            sample = {name: row.get(name) for name in flagged}
            print(f" Beispiel: {json.dumps(sample, default=str)[:300]}")
# ─── 8. Row-Count Estimation ────────────────────────────────────────────────
def test_row_counts(client: httpx.Client, accessible_tables: list[tuple]):
    """Ask for the exact row count of every readable table.

    Args:
        client: HTTP client used for the requests.
        accessible_tables: ``(table, rows)`` tuples; only the table name
            is used here.

    A HEAD request with ``Prefer: count=exact`` is sent and the total is
    read from the part of the ``Content-Range`` header after the slash.
    Responses without a numeric total are skipped.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("8. Zeilenanzahl-Schätzung (Prefer: count=exact)")
    print(f"{'='*70}{C.END}")
    for table, _ in accessible_tables:
        resp = client.head(
            f"{REST_URL}/{table}?select=*",
            headers={**HEADERS, "Prefer": "count=exact"},
        )
        # 206 is accepted too: a partial response can still carry the total.
        if resp.status_code not in (200, 206):
            continue
        content_range = resp.headers.get("content-range", "")
        # Report only the total, not the whole "<range>/<total>" value.
        total = content_range.rpartition("/")[2]
        if total.isdigit():
            record("INFO", "Count", f"Tabelle '{table}': {total} Zeilen")
# ─── Zusammenfassung ─────────────────────────────────────────────────────────
def print_summary():
    """Print per-severity totals and a verdict, then write the JSON report.

    The report contains all recorded findings and is written to
    ``rls_audit_<timestamp>.json`` in the current working directory.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("ZUSAMMENFASSUNG")
    print(f"{'='*70}{C.END}")

    # Totals in fixed severity order; levels without findings are omitted.
    for level in ("KRITISCH", "HOCH", "MITTEL", "NIEDRIG", "INFO", "OK"):
        hits = sum(1 for entry in findings if entry["severity"] == level)
        if hits:
            print(f" {severity_color(level)}{level}: {hits}{C.END}")

    serious = any(entry["severity"] in ("KRITISCH", "HOCH") for entry in findings)
    if not serious:
        print(f"\n{C.GREEN}Keine kritischen Probleme gefunden. "
              f"RLS scheint grundsätzlich konfiguriert zu sein.{C.END}")
    else:
        print(f"\n{C.RED}{C.BOLD}⚠ WARNUNG: Es wurden kritische oder hohe "
              f"Sicherheitsprobleme gefunden!{C.END}")
        print(f"{C.RED} Der Betreiber sollte dringend die RLS-Policies überprüfen.{C.END}")

    # Export as JSON
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    report_file = f"rls_audit_{stamp}.json"
    with open(report_file, "w", encoding="utf-8") as fp:
        json.dump(findings, fp, indent=2, ensure_ascii=False, default=str)
    print(f"\n Vollständiger Report gespeichert: {report_file}")
# ─── Main ────────────────────────────────────────────────────────────────────
def main():
    """Run all audit steps in order and always finish with the summary.

    The summary (and with it the JSON report) is produced in a ``finally``
    clause, so findings gathered before a failing request are not lost.
    The exception itself still propagates to the caller.
    """
    print(f"{C.BOLD}{C.CYAN}")
    print("╔══════════════════════════════════════════════════════════════════╗")
    print("║ Supabase RLS Audit — PropStage.ai ║")
    print("║ ║")
    print(f"║ Zeitpunkt: {datetime.now().strftime('%Y-%m-%d %H:%M:%S'):<43}║")
    print("╚══════════════════════════════════════════════════════════════════╝")
    print(f"{C.END}")
    try:
        with httpx.Client(timeout=15.0) as client:
            # 1) Schema
            schema_tables, rpc_funcs = test_openapi_schema(client)
            # 2) Read tables
            accessible = test_table_read(client, schema_tables)
            # 3) Write access
            test_table_write(client, accessible)
            # 4) RPC
            test_rpc_functions(client, rpc_funcs)
            # 5) Storage
            test_storage(client)
            # 6) Auth
            test_auth_endpoints(client)
            # 7) Sensitive data
            check_sensitive_data(accessible)
            # 8) Row counts
            test_row_counts(client, accessible)
    finally:
        # Report whatever was collected, even if a step raised.
        print_summary()
# Run the audit only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment