Created
March 7, 2026 15:08
-
-
Save bizrockman/704bb402f2acfca6fedca4a5536eb2cb to your computer and use it in GitHub Desktop.
Vibe Coding und Sicherheit
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Supabase RLS (Row Level Security) Audit Script
================================================
Tests whether the RLS policies of a Supabase instance are configured correctly.
Uses only the publicly visible anon key (no service key required).
"""
| import httpx | |
| import json | |
| import sys | |
| from datetime import datetime | |
# ─── Configuration ───────────────────────────────────────────────────────────
# Replace both placeholders with the values of the Supabase project under test.
# They are quoted strings so that the module is valid Python even before it has
# been configured (bare `<...>` placeholders are a syntax error).
SUPABASE_URL = "<Deine URL>"
ANON_KEY = "<Dein Key>"

# Tolerate a trailing slash in the project URL so that the derived endpoint
# URLs never contain a double slash.
_BASE_URL = SUPABASE_URL.rstrip("/")
REST_URL = f"{_BASE_URL}/rest/v1"
STORAGE_URL = f"{_BASE_URL}/storage/v1"
AUTH_URL = f"{_BASE_URL}/auth/v1"

# Default request headers: the anon key is sent both as `apikey` and as a
# bearer token; both *-profile headers name the `public` schema.
HEADERS = {
    "apikey": ANON_KEY,
    "Authorization": f"Bearer {ANON_KEY}",
    "Content-Type": "application/json",
    "Accept": "application/json",
    "accept-profile": "public",
    "content-profile": "public",
}
# Frequently used table names, grouped by topic. Each group is written as a
# whitespace-separated string and unpacked into one flat list.
COMMON_TABLE_NAMES = [
    # Auth / user-related
    *"users profiles accounts user_profiles user_roles".split(),
    *"user_settings user_data customers members".split(),
    # App-specific (PropStage = real estate / staging)
    *"properties images staging_results orders transactions".split(),
    *"payments subscriptions plans pricing credits".split(),
    *"invoices billing designs rooms listings".split(),
    *"projects tasks results generations renders".split(),
    # General
    *"site_settings settings config configurations".split(),
    *"api_keys secrets tokens sessions".split(),
    *"notifications emails messages logs".split(),
    *"analytics events audit_logs activity_logs".split(),
    *"categories tags files uploads media".split(),
    *"comments reviews feedback support_tickets".split(),
    # Admin
    *"admin_settings admin_users roles permissions".split(),
    *"feature_flags features waitlist beta_users".split(),
    # Stripe / payments
    *"stripe_customers stripe_subscriptions stripe_events".split(),
    *"payment_methods coupons discounts".split(),
]
# Colors for terminal output
class C:
    """Namespace of ANSI escape sequences used to style terminal output."""

    # Colors
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    MAGENTA = "\033[95m"
    CYAN = "\033[96m"

    # Text attributes; END is emitted after colored text throughout the script.
    BOLD = "\033[1m"
    END = "\033[0m"
def severity_color(sev: str) -> str:
    """Return the terminal color code that belongs to a severity label.

    Args:
        sev: One of the severity labels used by the audit
            ("KRITISCH", "HOCH", "MITTEL", "NIEDRIG", "INFO", "OK").

    Returns:
        The matching color constant from ``C``, or an empty string when the
        label is not known (so the text is printed uncolored).
    """
    palette = {
        "KRITISCH": C.RED,
        "HOCH": C.MAGENTA,
        "MITTEL": C.YELLOW,
        "NIEDRIG": C.CYAN,
        "INFO": C.BLUE,
        "OK": C.GREEN,
    }
    if sev in palette:
        return palette[sev]
    return ""
# All findings recorded during the run, in the order they were reported.
findings: list[dict] = []


def record(severity: str, category: str, detail: str, data: object = None) -> None:
    """Store a finding in ``findings`` and print it as a colored line.

    Args:
        severity: Severity label; it selects the color via ``severity_color``.
        category: Short name of the check that produced the finding.
        detail: Human-readable description, printed and stored as-is.
        data: Optional payload. When it is not ``None`` its string form,
            truncated to 500 characters, is stored under ``"data_preview"``.
    """
    entry = {"severity": severity, "category": category, "detail": detail}
    if data is not None:
        # Keep only a bounded preview so the stored finding stays small.
        entry["data_preview"] = str(data)[:500]
    findings.append(entry)
    color = severity_color(severity)
    print(f" {color}[{severity}]{C.END} {detail}")
# ─── 1. Table enumeration via the OpenAPI schema ────────────────────────────
def test_openapi_schema(client: httpx.Client) -> tuple[list[str], list[str]]:
    """Fetch the REST root and list the tables and RPC functions it exposes.

    Args:
        client: HTTP client used for the request.

    Returns:
        A ``(tables, rpc_funcs)`` pair of sorted name lists. Both lists are
        empty when the root does not answer with status 200, the body is not
        valid JSON, or the document has neither ``paths`` nor ``definitions``.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("1. OpenAPI-Schema-Enumeration")
    print(f"{'='*70}{C.END}")

    resp = client.get(f"{REST_URL}/", headers=HEADERS)
    if resp.status_code != 200:
        record("INFO", "Schema", f"OpenAPI-Root liefert Status {resp.status_code}")
        return [], []

    # Only JSON decoding is expected to fail here; report it explicitly
    # instead of silently swallowing every exception.
    try:
        schema = resp.json()
    except ValueError:
        record(
            "INFO", "Schema",
            "OpenAPI-Root liefert Status 200, Antwort ist aber kein gültiges JSON",
        )
        return [], []
    if not isinstance(schema, dict):
        schema = {}

    paths = schema.get("paths")
    if isinstance(paths, dict):
        # Every non-root path that is not under /rpc/ is treated as a table.
        tables = sorted(
            p.strip("/") for p in paths
            if p.strip("/") and not p.startswith("/rpc/")
        )
        rpc_funcs = sorted(
            p.removeprefix("/rpc/") for p in paths
            if p.startswith("/rpc/")
        )
        record(
            "HOCH", "Schema",
            f"OpenAPI-Schema exponiert {len(tables)} Tabellen und "
            f"{len(rpc_funcs)} RPC-Funktionen",
        )
        if tables:
            print(f" Tabellen: {', '.join(tables)}")
        if rpc_funcs:
            print(f" RPC-Funktionen: {', '.join(rpc_funcs)}")
        return tables, rpc_funcs

    definitions = schema.get("definitions")
    if isinstance(definitions, dict):
        tables = sorted(definitions)
        record(
            "HOCH", "Schema",
            f"OpenAPI-Schema exponiert {len(tables)} Definitionen",
        )
        print(f" Definitionen: {', '.join(tables)}")
        return tables, []

    # Status 200 but nothing recognizable in the document.
    record("INFO", "Schema", f"OpenAPI-Root liefert Status {resp.status_code}")
    return [], []
# ─── 2. Reading tables (SELECT) ─────────────────────────────────────────────
def test_table_read(client: httpx.Client, tables: list[str]):
    """Try to read up to five rows from every known and every guessed table.

    Args:
        client: HTTP client used for the requests.
        tables: Table names already known (labelled "bekannt" in the output);
            names that only come from ``COMMON_TABLE_NAMES`` are labelled
            "geraten".

    Returns:
        A list of ``(table, rows)`` tuples for every table that answered with
        status 200 and a JSON list; ``rows`` is empty when no rows came back.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("2. Tabellen-Lesezugriff (SELECT)")
    print(f"{'='*70}{C.END}")

    readable = []
    for name in sorted(set(tables + COMMON_TABLE_NAMES)):
        resp = client.get(f"{REST_URL}/{name}?select=*&limit=5", headers=HEADERS)
        status = resp.status_code

        if status == 404:
            # Table does not exist; nothing to report.
            continue
        if status in (401, 403):
            record("OK", "SELECT", f"Tabelle '{name}': Zugriff verweigert ({status})")
            continue
        if status != 200:
            record("INFO", "SELECT", f"Tabelle '{name}': Status {status}")
            continue

        payload = resp.json()
        if not isinstance(payload, list):
            # A 200 answer that is not a row list is ignored without a finding.
            continue

        marker = "bekannt" if name in tables else "geraten"
        if payload:
            cols = list(payload[0].keys())
            record(
                "KRITISCH", "SELECT",
                f"Tabelle '{name}' ({marker}): {len(payload)} Zeilen lesbar | "
                f"Spalten: {cols}",
            )
            readable.append((name, payload))
        else:
            record(
                "MITTEL", "SELECT",
                f"Tabelle '{name}' ({marker}): existiert, 0 Zeilen "
                f"(RLS blockiert oder leer)",
            )
            readable.append((name, []))
    return readable
# ─── 3. Testing write access (INSERT / UPDATE / DELETE) ─────────────────────
def test_table_write(client: httpx.Client, accessible_tables: list[tuple]):
    """Probe each accessible table with an INSERT, an UPDATE and a DELETE.

    The INSERT sends a single dummy column; UPDATE and DELETE filter on an
    all-zero UUID ``id`` so that they are not meant to touch any row. Each
    response status is classified and reported through ``record``.

    Args:
        client: HTTP client used for the requests.
        accessible_tables: ``(table, rows)`` tuples; only the table name is used.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("3. Schreibzugriff (INSERT / UPDATE / DELETE)")
    print(f"{'='*70}{C.END}")

    denied = (401, 403)
    payload = {"__rls_test_col": "__rls_test_value"}
    minimal_headers = {**HEADERS, "Prefer": "return=minimal"}
    no_match_filter = "id=eq.00000000-0000-0000-0000-000000000000"

    for table, _ in accessible_tables:
        table_url = f"{REST_URL}/{table}"

        # INSERT
        insert_resp = client.post(table_url, headers=minimal_headers, json=payload)
        status = insert_resp.status_code
        if status in (200, 201):
            record("KRITISCH", "INSERT", f"INSERT in '{table}' erfolgreich!")
        elif status == 409:
            record("HOCH", "INSERT", f"INSERT in '{table}': Constraint-Fehler (aber RLS erlaubt es)")
        elif status in denied:
            record("OK", "INSERT", f"INSERT in '{table}' blockiert ({status})")
        else:
            snippet = insert_resp.text[:200] if insert_resp.text else ""
            record("INFO", "INSERT", f"INSERT in '{table}': Status {status} — {snippet}")

        # UPDATE (with an impossible condition so that nothing is changed)
        update_resp = client.patch(
            f"{table_url}?{no_match_filter}",
            headers=minimal_headers,
            json=payload,
        )
        status = update_resp.status_code
        if status in (200, 204):
            record("HOCH", "UPDATE", f"UPDATE auf '{table}' nicht blockiert (RLS prüft evtl. nicht)")
        elif status in denied:
            record("OK", "UPDATE", f"UPDATE auf '{table}' blockiert ({status})")
        else:
            record("INFO", "UPDATE", f"UPDATE auf '{table}': Status {status}")

        # DELETE (with an impossible condition)
        delete_resp = client.delete(f"{table_url}?{no_match_filter}", headers=HEADERS)
        status = delete_resp.status_code
        if status in (200, 204):
            record("HOCH", "DELETE", f"DELETE auf '{table}' nicht blockiert")
        elif status in denied:
            record("OK", "DELETE", f"DELETE auf '{table}' blockiert ({status})")
        else:
            record("INFO", "DELETE", f"DELETE auf '{table}': Status {status}")
# ─── 4. Testing RPC functions ───────────────────────────────────────────────
def _rpc_preview(resp) -> str:
    """Return a short, printable preview (max. 300 chars) of an RPC response."""
    if not resp.text:
        return "(leer)"
    try:
        data = resp.json()
    except ValueError:
        # Body is not JSON: show the raw text instead of aborting the audit.
        return resp.text[:300]
    # Only null and empty containers/strings count as empty; results such as
    # 0 or False are real return values and must be shown.
    if data is None or (isinstance(data, (str, list, dict)) and not data):
        return "(leer)"
    return str(data)[:300]


def test_rpc_functions(client: httpx.Client, rpc_funcs: list[str]):
    """Call every RPC function with an empty JSON body and report the outcome.

    A 200 or 204 answer means the function could be called with the anon key
    (204 is the no-content answer, e.g. for functions without a result);
    401/403 means it is blocked; any other status is reported as INFO.

    Args:
        client: HTTP client used for the requests.
        rpc_funcs: Names of the RPC functions to call.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("4. RPC-Funktionen")
    print(f"{'='*70}{C.END}")

    if not rpc_funcs:
        print(" Keine RPC-Funktionen im Schema gefunden.")
        return

    for func in rpc_funcs:
        resp = client.post(
            f"{REST_URL}/rpc/{func}",
            headers=HEADERS,
            json={},
        )
        status = resp.status_code
        if status in (200, 204):
            record("HOCH", "RPC", f"RPC '{func}' aufrufbar: {_rpc_preview(resp)}")
        elif status in (401, 403):
            record("OK", "RPC", f"RPC '{func}' blockiert ({status})")
        else:
            record("INFO", "RPC", f"RPC '{func}': Status {status}")
# ─── 5. Storage buckets ─────────────────────────────────────────────────────
def test_storage(client: httpx.Client):
    """List the storage buckets and, per bucket, try to list up to 20 objects.

    Findings are reported through ``record``; nothing is returned.

    Args:
        client: HTTP client used for the requests.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("5. Storage Buckets")
    print(f"{'='*70}{C.END}")

    resp = client.get(f"{STORAGE_URL}/bucket", headers=HEADERS)
    status = resp.status_code
    if status in (401, 403):
        record("OK", "Storage", f"Bucket-Auflistung blockiert ({status})")
        return
    if status != 200:
        record("INFO", "Storage", f"Bucket-Endpunkt: Status {status}")
        return

    buckets = resp.json()
    if not (isinstance(buckets, list) and buckets):
        record("OK", "Storage", "Keine Buckets sichtbar oder leere Liste")
        return

    bucket_names = [b.get('name', '?') for b in buckets]
    record(
        "HOCH", "Storage",
        f"{len(buckets)} Buckets sichtbar: {bucket_names}",
    )

    for bucket in buckets:
        name = bucket.get("name", "")
        if bucket.get("public", False):
            record("MITTEL", "Storage", f"Bucket '{name}' ist öffentlich")

        # List the files in the bucket.
        # NOTE(review): the listing is attempted for every visible bucket, not
        # only for public ones — confirm this matches the intended nesting.
        listing = client.post(
            f"{STORAGE_URL}/object/list/{name}",
            headers=HEADERS,
            json={"prefix": "", "limit": 20, "offset": 0},
        )
        if listing.status_code != 200:
            continue
        files = listing.json()
        if not (isinstance(files, list) and files):
            continue
        file_names = [entry.get("name", "?") for entry in files[:10]]
        record(
            "HOCH", "Storage",
            f"Bucket '{name}': {len(files)} Dateien auflistbar — {file_names}",
        )
# ─── 6. Auth endpoints ──────────────────────────────────────────────────────
def test_auth_endpoints(client: httpx.Client):
    """Query the auth settings and the admin user list with the anon key.

    The settings answer is reported as INFO together with the enabled
    providers (or, if none is flagged as enabled, the first ten provider
    names). A 200 answer from the admin user list is reported as KRITISCH,
    every other status as OK.

    Args:
        client: HTTP client used for the requests.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("6. Auth-Endpunkte")
    print(f"{'='*70}{C.END}")

    def is_enabled(value) -> bool:
        # A provider counts as enabled when its entry is literally True or a
        # dict with a truthy "enabled" field.
        if value is True:
            return True
        return bool(isinstance(value, dict) and value.get("enabled"))

    # Settings
    settings_resp = client.get(f"{AUTH_URL}/settings", headers={"apikey": ANON_KEY})
    if settings_resp.status_code != 200:
        record("INFO", "Auth", f"Auth-Settings: Status {settings_resp.status_code}")
    else:
        providers = settings_resp.json().get("external", {})
        enabled = [key for key, value in providers.items() if is_enabled(value)]
        shown = enabled or list(providers)[:10]
        record("INFO", "Auth", f"Auth-Settings erreichbar. Provider: {shown}")

    # User list (admin-only, expected to fail)
    admin_resp = client.get(
        f"{AUTH_URL}/admin/users",
        headers={"apikey": ANON_KEY, "Authorization": f"Bearer {ANON_KEY}"},
    )
    if admin_resp.status_code != 200:
        record("OK", "Auth", f"Admin-User-Endpunkt blockiert ({admin_resp.status_code})")
    else:
        record("KRITISCH", "Auth", "Admin-User-Liste mit anon-Key abrufbar!")
# ─── 7. Sensitive data in readable tables ───────────────────────────────────
def check_sensitive_data(accessible_tables: list[tuple]):
    """Flag readable tables whose column names look sensitive.

    Column names are taken from the first row of each table and compared,
    case-insensitively, against a keyword list (substring match). For every
    flagged table a KRITISCH finding is recorded and the flagged columns of
    the first two rows are printed as an example.

    Args:
        accessible_tables: ``(table, rows)`` tuples; tables without rows are
            skipped.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("7. Analyse sensibler Daten in lesbaren Tabellen")
    print(f"{'='*70}{C.END}")

    keywords = (
        "email", "password", "hash", "secret", "token", "key", "api_key",
        "stripe", "payment", "card", "ssn", "phone", "address", "credit",
        "private", "salt", "otp", "mfa", "2fa", "refresh_token",
    )

    def looks_sensitive(column: str) -> bool:
        lowered = column.lower()
        return any(keyword in lowered for keyword in keywords)

    for table, rows in accessible_tables:
        if not rows:
            continue
        flagged = [column for column in rows[0] if looks_sensitive(column)]
        if not flagged:
            continue
        record(
            "KRITISCH", "Sensible Daten",
            f"Tabelle '{table}' enthält potenziell sensible Spalten: {flagged}",
        )
        for row in rows[:2]:
            preview = {column: row.get(column) for column in flagged}
            print(f" Beispiel: {json.dumps(preview, default=str)[:300]}")
# ─── 8. Row-count estimation ────────────────────────────────────────────────
def _total_from_content_range(content_range: str) -> str:
    """Return the total after the last "/" of a Content-Range value.

    For example "0-24/3573" yields "3573" and "*/0" yields "0". When the part
    after the slash is not a number the value is returned unchanged.
    """
    _, _, total = content_range.rpartition("/")
    return total if total.isdigit() else content_range


def test_row_counts(client: httpx.Client, accessible_tables: list[tuple]):
    """Ask for the exact row count of every accessible table via HEAD.

    The count is read from the ``content-range`` response header. Both 200
    and 206 (partial content) answers are accepted, because a count can also
    be delivered with a partial response.

    Args:
        client: HTTP client used for the requests.
        accessible_tables: ``(table, rows)`` tuples; only the table name is used.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("8. Zeilenanzahl-Schätzung (Prefer: count=exact)")
    print(f"{'='*70}{C.END}")

    for table, _ in accessible_tables:
        resp = client.head(
            f"{REST_URL}/{table}?select=*",
            headers={**HEADERS, "Prefer": "count=exact"},
        )
        content_range = resp.headers.get("content-range")
        if resp.status_code in (200, 206) and content_range:
            # Report the total only, not the raw "from-to/total" range.
            total = _total_from_content_range(content_range)
            record("INFO", "Count", f"Tabelle '{table}': {total} Zeilen")
# ─── Summary ─────────────────────────────────────────────────────────────────
def print_summary():
    """Print the number of findings per severity and export them as JSON.

    A warning is printed when at least one KRITISCH or HOCH finding exists.
    The complete ``findings`` list is written to a timestamped file named
    ``rls_audit_<YYYYmmdd_HHMMSS>.json`` in the current working directory.
    """
    print(f"\n{C.BOLD}{'='*70}")
    print("ZUSAMMENFASSUNG")
    print(f"{'='*70}{C.END}")

    # Fixed output order; severities without findings are left out.
    for sev in ("KRITISCH", "HOCH", "MITTEL", "NIEDRIG", "INFO", "OK"):
        hits = sum(1 for item in findings if item["severity"] == sev)
        if hits:
            color = severity_color(sev)
            print(f" {color}{sev}: {hits}{C.END}")

    serious = any(item["severity"] in ("KRITISCH", "HOCH") for item in findings)
    if not serious:
        print(f"\n{C.GREEN}Keine kritischen Probleme gefunden. "
              f"RLS scheint grundsätzlich konfiguriert zu sein.{C.END}")
    else:
        print(f"\n{C.RED}{C.BOLD}⚠ WARNUNG: Es wurden kritische oder hohe "
              f"Sicherheitsprobleme gefunden!{C.END}")
        print(f"{C.RED} Der Betreiber sollte dringend die RLS-Policies überprüfen.{C.END}")

    # Export as JSON
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    report_file = f"rls_audit_{stamp}.json"
    with open(report_file, "w", encoding="utf-8") as out:
        json.dump(findings, out, indent=2, ensure_ascii=False, default=str)
    print(f"\n Vollständiger Report gespeichert: {report_file}")
# ─── Main ────────────────────────────────────────────────────────────────────
def main():
    """Print the banner, run all audit stages in order and print the summary."""
    started = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    banner = (
        f"{C.BOLD}{C.CYAN}",
        "╔══════════════════════════════════════════════════════════════════╗",
        "║ Supabase RLS Audit — PropStage.ai ║",
        "║ ║",
        f"║ Zeitpunkt: {started:<43}║",
        "╚══════════════════════════════════════════════════════════════════╝",
        f"{C.END}",
    )
    for line in banner:
        print(line)

    # One shared client (15 s timeout) for all stages.
    with httpx.Client(timeout=15.0) as client:
        # 1) Schema
        schema_tables, rpc_funcs = test_openapi_schema(client)
        # 2) Read tables
        accessible = test_table_read(client, schema_tables)
        # 3) Write access
        test_table_write(client, accessible)
        # 4) RPC
        test_rpc_functions(client, rpc_funcs)
        # 5) Storage
        test_storage(client)
        # 6) Auth
        test_auth_endpoints(client)
        # 7) Sensitive data
        check_sensitive_data(accessible)
        # 8) Row counts
        test_row_counts(client, accessible)

    print_summary()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment