Last active
March 4, 2026 14:17
-
-
Save omry/ccd81b9eed5f495903e059fc0b531185 to your computer and use it in GitHub Desktop.
Python script to sync UFW rules to allow SSH access to GitHub IPs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Description: Syncs UFW rules to allow outbound SSH push access to GitHub public IPs. | |
| Prevents rule leaking by pruning IPs that are removed from the GitHub API. | |
| Designed for cron: silent by default, errors to stderr, chatty with --verbose. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import argparse | |
| import subprocess | |
| import urllib.request | |
| from urllib.error import URLError | |
# GitHub Meta API endpoint queried for the current IP list.
API_URL = "https://api.github.com/meta"

# File in which the IPs applied by the previous run are recorded.
STATE_FILE = "/var/lib/github_ufw_state.json"

# Set by main() from -v/--verbose; vprint() only prints when this is True.
VERBOSE = False
def vprint(*args, **kwargs):
    """Forward the arguments to print() when VERBOSE is set; otherwise do nothing."""
    if not VERBOSE:
        return
    print(*args, **kwargs)
def check_root():
    """Exit with status 1 unless the effective user ID is 0 (root)."""
    if os.geteuid() == 0:
        return
    print("Error: This script must be run as root.", file=sys.stderr)
    sys.exit(1)
def check_ufw_installed():
    """Exit with status 1 if `ufw --version` cannot be run successfully."""
    probe = ["ufw", "--version"]
    try:
        subprocess.run(
            probe,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        # Either the binary is missing or it returned a non-zero status.
        print("Error: UFW is not installed or not in PATH.", file=sys.stderr)
        sys.exit(1)
def fetch_github_ips(timeout=30):
    """Fetch the 'git' IP ranges from the GitHub Meta API.

    Args:
        timeout: Seconds to wait on the network before giving up, so that an
            unattended run cannot hang forever on a stalled connection.

    Returns:
        A non-empty set of the strings listed under the response's 'git' key.

    Raises:
        SystemExit: With status 1 (after a message on stderr) when the
            request fails or times out, the status is not 200, the body is
            not valid UTF-8 JSON, or 'git' is missing, empty, or not a list
            of strings.
    """
    request = urllib.request.Request(API_URL, headers={'User-Agent': 'UFW-Sync-Script'})
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            if response.status != 200:
                print(f"Error: API returned status {response.status}", file=sys.stderr)
                sys.exit(1)
            payload = response.read()
    except OSError as e:
        # URLError/HTTPError and socket timeouts are all OSError subclasses,
        # so this also covers a timeout raised while reading the body.
        print(f"Error connecting to GitHub API: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        data = json.loads(payload.decode('utf-8'))
    except ValueError:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print("Error parsing JSON response from GitHub API.", file=sys.stderr)
        sys.exit(1)

    if not isinstance(data, dict) or 'git' not in data:
        print("Error: 'git' key not found in API response. Halting.", file=sys.stderr)
        sys.exit(1)

    git_ranges = data['git']
    well_formed = (
        isinstance(git_ranges, list)
        and all(isinstance(entry, str) for entry in git_ranges)
    )
    # An empty list is refused as well: the caller prunes every rule that is
    # absent from the returned set, so an empty answer would remove them all.
    if not well_formed or not git_ranges:
        print("Error: 'git' entry in API response is empty or malformed. Halting.", file=sys.stderr)
        sys.exit(1)

    return set(git_ranges)
def load_state():
    """Load the IPs applied by the previous run from STATE_FILE.

    Returns:
        The set of strings stored in the state file. An empty set is
        returned when the file does not exist, and also (after a warning on
        stderr) when it cannot be read or does not hold a JSON list of
        strings.
    """
    try:
        with open(STATE_FILE, 'r', encoding='utf-8') as f:
            stored = json.load(f)
    except FileNotFoundError:
        # No state yet: nothing has been applied by an earlier run.
        return set()
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and undecodable bytes.
        print("Warning: State file corrupted or unreadable. Starting fresh.", file=sys.stderr)
        return set()

    # Valid JSON of the wrong shape (number, dict, nested lists, ...) is
    # treated like a corrupted file instead of crashing in set().
    if not isinstance(stored, list) or not all(isinstance(item, str) for item in stored):
        print("Warning: State file corrupted or unreadable. Starting fresh.", file=sys.stderr)
        return set()

    return set(stored)
def save_state(ips):
    """Persist the currently applied IPs to STATE_FILE.

    The entries are written, sorted, to a temporary file next to STATE_FILE
    which is then renamed over it, so an interrupted run cannot leave a
    truncated state file behind.

    Args:
        ips: Iterable of IP/CIDR strings to record.

    Returns:
        None. Failures are reported on stderr rather than raised.
    """
    tmp_path = STATE_FILE + ".tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(sorted(ips), f, indent=4)
        os.replace(tmp_path, STATE_FILE)
    except OSError as e:
        print(f"Error saving state file: {e}", file=sys.stderr)
        try:
            os.remove(tmp_path)
        except OSError:
            # Best-effort cleanup; the temporary file may never have existed.
            pass
def run_ufw(args):
    """Run `ufw` with the given argument list, discarding its stdout.

    Returns a (succeeded, stderr_text) tuple, where succeeded is True when
    the process exited with status 0.
    """
    completed = subprocess.run(
        ["ufw"] + args,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        text=True,
    )
    succeeded = completed.returncode == 0
    return succeeded, completed.stderr
def _ssh_out_rule(ip):
    """Return the UFW rule arguments allowing outbound TCP port 22 to *ip*."""
    return ["allow", "out", "to", ip, "port", "22", "proto", "tcp"]


def main():
    """Synchronize UFW outbound-SSH rules with GitHub's current 'git' IPs.

    Steps: parse the command line, verify root and UFW, fetch the current IP
    list, delete rules for IPs that are in the saved state but no longer
    listed, add a rule for every listed IP, then save the new state.

    An IP whose rule could not be deleted stays in the saved state so the
    next run retries the deletion instead of forgetting the rule.

    Raises:
        SystemExit: With status 1 when any rule could not be added or
            removed, in addition to the exits performed by the helper
            checks and the fetch step.
    """
    global VERBOSE

    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Sync UFW rules for GitHub SSH IPs.")
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output")
    VERBOSE = parser.parse_args().verbose

    check_root()
    check_ufw_installed()

    vprint("Fetching current GitHub IPs...")
    new_ips = fetch_github_ips()
    old_ips = load_state()

    failed_removals = set()
    had_failure = False

    # 1. Prune deprecated IPs (sorted so the output order is deterministic).
    for ip in sorted(old_ips - new_ips):
        vprint(f"Removing deprecated GitHub IP: {ip}")
        success, err = run_ufw(["delete"] + _ssh_out_rule(ip))
        if not success and "Could not find a rule that matches" not in err:
            print(f"Warning: Failed to remove rule for {ip}. {err.strip()}", file=sys.stderr)
            failed_removals.add(ip)
            had_failure = True

    # 2. Apply current IPs.
    for ip in sorted(new_ips):
        success, err = run_ufw(_ssh_out_rule(ip) + ["comment", "GitHub SSH Push"])
        if not success and "Skipping adding existing rule" not in err:
            print(f"Warning: Failed to add rule for {ip}. {err.strip()}", file=sys.stderr)
            had_failure = True

    # 3. Update state. Current IPs are recorded even when adding failed:
    # deleting a rule that does not exist is tolerated above, whereas
    # dropping an IP from the state could leave its rule behind forever.
    save_state(new_ips | failed_removals)

    if had_failure:
        # Warnings were already written to stderr; report failure to the caller.
        sys.exit(1)
    vprint("GitHub UFW rules successfully synchronized.")
# Run the sync only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment