Skip to content

Instantly share code, notes, and snippets.

@jaysonsantos
Last active February 18, 2026 12:47
Show Gist options
  • Select an option

  • Save jaysonsantos/c4c09eec2c817cff213bf25b0d4b65db to your computer and use it in GitHub Desktop.

Select an option

Save jaysonsantos/c4c09eec2c817cff213bf25b0d4b65db to your computer and use it in GitHub Desktop.
#!/bin/bash
# GitHub SSH Key Manager Installer
# Usage: ./install.sh <github_username>
set -euo pipefail
GITHUB_USER="${1:-}"
INSTALL_DIR="$HOME/.local/bin"
SCRIPT_NAME="update-ssh-keys.py"
SCRIPT_PATH="$INSTALL_DIR/$SCRIPT_NAME"
if [[ -z "$GITHUB_USER" ]]; then
echo "Error: GitHub username required"
echo "Usage: $0 <github_username>"
exit 1
fi
# Create install directory if needed
mkdir -p "$INSTALL_DIR"
# Write the Python script
cat > "$SCRIPT_PATH" << 'PYTHON_SCRIPT'
#!/usr/bin/env python3
"""
GitHub SSH Key Manager
Fetches SSH keys from GitHub, maintains local state with backup capability,
and only modifies keys within its managed section.
"""
import os
import sys
import re
import json
import hashlib
import argparse
import shutil
from pathlib import Path
from datetime import datetime
from urllib.request import urlopen, Request
from urllib.error import HTTPError, URLError
class GitHubSSHManager:
# Markers for the managed section
SECTION_START = "# >>> GITHUB_SSH_MANAGER_START (DO NOT EDIT MANUALLY)"
SECTION_END = "# <<< GITHUB_SSH_MANAGER_END (DO NOT EDIT MANUALLY)"
def __init__(self, username, backup_dir=None):
self.username = username
self.ssh_dir = Path.home() / ".ssh"
self.authorized_keys = self.ssh_dir / "authorized_keys"
# Default backup directory
if backup_dir is None:
self.backup_dir = self.ssh_dir / ".github_ssh_manager_backups"
else:
self.backup_dir = Path(backup_dir)
self.state_file = self.backup_dir / "state.json"
self.current_backup_dir = None
def _ensure_ssh_dir(self):
"""Ensure .ssh directory exists with proper permissions."""
self.ssh_dir.mkdir(mode=0o700, exist_ok=True)
def _fetch_github_keys(self):
"""Fetch SSH keys from GitHub API."""
url = f"https://api.github.com/users/{self.username}/keys"
try:
req = Request(url, headers={
'User-Agent': 'GitHub-SSH-Manager/1.0',
'Accept': 'application/vnd.github.v3+json'
})
with urlopen(req, timeout=30) as response:
data = json.loads(response.read().decode('utf-8'))
keys = []
for key_data in data:
key_info = {
'id': key_data['id'],
'key': key_data['key'].strip(),
'title': key_data.get('title', f"github_key_{key_data['id']}"),
'fetched_at': datetime.now().isoformat()
}
keys.append(key_info)
return keys
except HTTPError as e:
if e.code == 404:
raise Exception(f"GitHub user '{self.username}' not found or no public keys") from e
elif e.code == 403:
raise Exception("GitHub API rate limit exceeded. Try again later.") from e
else:
raise Exception(f"HTTP Error {e.code}: {e.reason}") from e
except URLError as e:
raise Exception(f"Network error: {e.reason}") from e
except json.JSONDecodeError as e:
raise Exception("Invalid response from GitHub API") from e
def _create_backup(self):
"""Create timestamped backup of current authorized_keys."""
self.backup_dir.mkdir(mode=0o700, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.current_backup_dir = self.backup_dir / timestamp
self.current_backup_dir.mkdir(mode=0o700, exist_ok=True)
# Backup current authorized_keys if it exists
if self.authorized_keys.exists():
backup_file = self.current_backup_dir / "authorized_keys"
shutil.copy2(self.authorized_keys, backup_file)
# Backup current state if it exists
if self.state_file.exists():
backup_state = self.current_backup_dir / "state.json"
shutil.copy2(self.state_file, backup_state)
return self.current_backup_dir
def _load_state(self):
"""Load previous state of managed keys."""
if not self.state_file.exists():
return {'keys': [], 'last_update': None}
try:
with open(self.state_file, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
raise Exception(f"Failed to load state file: {e}") from e
def _save_state(self, keys):
"""Save current state of managed keys."""
state = {
'keys': keys,
'last_update': datetime.now().isoformat(),
'username': self.username
}
# Write atomically
temp_file = self.state_file.with_suffix('.tmp')
try:
with open(temp_file, 'w') as f:
json.dump(state, f, indent=2)
temp_file.replace(self.state_file)
self.state_file.chmod(0o600)
except IOError as e:
raise Exception(f"Failed to save state file: {e}") from e
def _parse_authorized_keys(self):
"""Parse authorized_keys file into sections."""
if not self.authorized_keys.exists():
return {'before': [], 'managed': [], 'after': []}
try:
with open(self.authorized_keys, 'r') as f:
content = f.read()
except IOError as e:
raise Exception(f"Failed to read authorized_keys: {e}") from e
# Find managed section
start_match = re.search(re.escape(self.SECTION_START), content)
end_match = re.search(re.escape(self.SECTION_END), content)
if start_match and end_match and start_match.start() < end_match.start():
# Valid section found
before = content[:start_match.start()].rstrip()
after = content[end_match.end():].lstrip()
return {
'before': before.split('\n') if before else [],
'after': after.split('\n') if after else []
}
else:
# No managed section found, treat all as 'before'
lines = [l for l in content.split('\n') if l.strip()]
return {
'before': lines,
'after': []
}
def _build_authorized_keys(self, sections, new_managed_keys):
"""Rebuild authorized_keys content."""
lines = []
# Add content before section
if sections['before']:
lines.extend(sections['before'])
lines.append('') # Empty line before section
# Add managed section
lines.append(self.SECTION_START)
lines.append(f"# Managed by GitHub SSH Manager for user: {self.username}")
lines.append(f"# Last updated: {datetime.now().isoformat()}")
lines.append(f"# GitHub keys count: {len(new_managed_keys)}")
lines.append("")
for key_info in new_managed_keys:
# Add comment with key info
lines.append(f"# GitHub Key ID: {key_info['id']} - {key_info['title']}")
lines.append(key_info['key'])
lines.append("")
lines.append(self.SECTION_END)
# Add content after section
if sections['after']:
lines.append('') # Empty line after section
lines.extend(sections['after'])
return '\n'.join(lines) + '\n' # Ensure trailing newline
def _key_fingerprint(self, key):
"""Generate fingerprint for a key string."""
# Normalize key by removing extra whitespace
normalized = ' '.join(key.split())
return hashlib.sha256(normalized.encode()).hexdigest()[:16]
def sync(self, dry_run=False):
"""Main sync operation: fetch, compare, update."""
print(f"Fetching SSH keys for GitHub user: {self.username}")
# Fetch current keys from GitHub
try:
github_keys = self._fetch_github_keys()
except Exception as e:
print(f"Error fetching from GitHub: {e}", file=sys.stderr)
return 1
# CRITICAL: Never proceed if no keys returned or empty result
if not github_keys:
print("Error: GitHub returned empty key list. Aborting to prevent lockout.", file=sys.stderr)
return 1
print(f"Found {len(github_keys)} key(s) on GitHub")
# Load previous state
try:
old_state = self._load_state()
except Exception as e:
print(f"Error loading state: {e}", file=sys.stderr)
return 1
old_keys = old_state.get('keys', [])
# Compare keys for display purposes only (we always rebuild)
old_fingerprints = {self._key_fingerprint(k['key']) for k in old_keys}
new_fingerprints = {self._key_fingerprint(k['key']) for k in github_keys}
added = [k for k in github_keys
if self._key_fingerprint(k['key']) not in old_fingerprints]
removed_count = len(old_fingerprints - new_fingerprints)
unchanged = [k for k in github_keys
if self._key_fingerprint(k['key']) in old_fingerprints]
print(f"\nChanges detected:")
print(f" + Added: {len(added)} key(s)")
for k in added:
print(f" [{k['id']}] {k['title']}")
print(f" - Removed: {removed_count} key(s) (will be purged from section)")
print(f" = Unchanged: {len(unchanged)} key(s)")
if dry_run:
print("\n[DRY RUN] No changes made")
return 0
# Create backup
try:
backup_path = self._create_backup()
print(f"\nBackup created: {backup_path}")
except Exception as e:
print(f"Error creating backup: {e}", file=sys.stderr)
return 1
# Parse current authorized_keys
try:
sections = self._parse_authorized_keys()
except Exception as e:
print(f"Error parsing authorized_keys: {e}", file=sys.stderr)
return 1
# Build new authorized_keys
new_content = self._build_authorized_keys(sections, github_keys)
# Write atomically
temp_file = self.authorized_keys.with_suffix('.tmp')
try:
with open(temp_file, 'w') as f:
f.write(new_content)
temp_file.replace(self.authorized_keys)
self.authorized_keys.chmod(0o600)
except IOError as e:
print(f"Error writing authorized_keys: {e}", file=sys.stderr)
# Clean up temp file if it exists
if temp_file.exists():
temp_file.unlink()
return 1
# Save new state
try:
self._save_state(github_keys)
except Exception as e:
print(f"Error saving state: {e}", file=sys.stderr)
# Note: authorized_keys is already updated, which is fine
# since we have a backup
print(f"\nUpdated {self.authorized_keys}")
print("Done!")
return 0
def status(self):
"""Show current status without making changes."""
print(f"GitHub User: {self.username}")
print(f"SSH Directory: {self.ssh_dir}")
print(f"Authorized Keys: {self.authorized_keys}")
print(f"Backup Directory: {self.backup_dir}")
# Load state
try:
state = self._load_state()
except Exception as e:
print(f"\nError loading state: {e}", file=sys.stderr)
state = {'keys': [], 'last_update': None}
print(f"\nLast Update: {state.get('last_update', 'Never')}")
print(f"Previously Managed Keys: {len(state.get('keys', []))}")
# Parse current file
try:
sections = self._parse_authorized_keys()
except Exception as e:
print(f"\nError parsing authorized_keys: {e}", file=sys.stderr)
return 1
# Check for managed section
if not sections['before'] and not sections['after']:
print("Status: No managed section found yet (run sync to create)")
else:
print("Status: Managed section exists")
# Check for unmanaged keys
unmanaged = len(sections['before']) + len(sections['after'])
if unmanaged:
print(f"Unmanaged keys outside section: {unmanaged}")
return 0
def restore(self, backup_timestamp=None):
"""Restore from backup."""
if backup_timestamp:
backup_path = self.backup_dir / backup_timestamp
else:
# Find most recent backup
if not self.backup_dir.exists():
print("No backups directory", file=sys.stderr)
return 1
backups = sorted(self.backup_dir.glob("*"))
if not backups:
print("No backups found", file=sys.stderr)
return 1
backup_path = backups[-1]
if not backup_path.exists():
print(f"Backup not found: {backup_path}", file=sys.stderr)
return 1
print(f"Restoring from: {backup_path}")
# Restore authorized_keys
backup_keys = backup_path / "authorized_keys"
if backup_keys.exists():
try:
shutil.copy2(backup_keys, self.authorized_keys)
self.authorized_keys.chmod(0o600)
print(f"Restored {self.authorized_keys}")
except IOError as e:
print(f"Error restoring authorized_keys: {e}", file=sys.stderr)
return 1
else:
print("Warning: No authorized_keys in backup")
# Restore state
backup_state = backup_path / "state.json"
if backup_state.exists():
try:
shutil.copy2(backup_state, self.state_file)
self.state_file.chmod(0o600)
print(f"Restored {self.state_file}")
except IOError as e:
print(f"Error restoring state: {e}", file=sys.stderr)
return 1
else:
print("Warning: No state.json in backup")
return 0
def list_backups(self):
"""List available backups."""
if not self.backup_dir.exists():
print("No backups directory")
return 0
backups = sorted(self.backup_dir.glob("*"))
if not backups:
print("No backups found")
return 0
print("Available backups:")
for backup in backups:
state_file = backup / "state.json"
info = ""
if state_file.exists():
try:
with open(state_file) as f:
state = json.load(f)
count = len(state.get('keys', []))
info = f"({count} keys)"
except:
pass
print(f" {backup.name} {info}")
return 0
def main():
parser = argparse.ArgumentParser(
description="Manage SSH keys from GitHub with isolated section and backup"
)
parser.add_argument('username', help='GitHub username to fetch keys from')
parser.add_argument('--dry-run', '-n', action='store_true',
help='Show what would be done without making changes')
parser.add_argument('--status', '-s', action='store_true',
help='Show current status')
parser.add_argument('--restore', '-r', metavar='TIMESTAMP',
help='Restore from backup (use "latest" for most recent)')
parser.add_argument('--list-backups', '-l', action='store_true',
help='List available backups')
parser.add_argument('--backup-dir', '-b',
help='Custom backup directory (default: ~/.ssh/.github_ssh_manager_backups)')
args = parser.parse_args()
manager = GitHubSSHManager(args.username, args.backup_dir)
manager._ensure_ssh_dir()
if args.status:
return manager.status()
elif args.restore:
timestamp = None if args.restore == 'latest' else args.restore
return manager.restore(timestamp)
elif args.list_backups:
return manager.list_backups()
else:
return manager.sync(dry_run=args.dry_run)
if __name__ == '__main__':
sys.exit(main())
PYTHON_SCRIPT
# Make executable
chmod +x "$SCRIPT_PATH"
echo "Installed: $SCRIPT_PATH"
# # Ensure ~/.local/bin is in PATH for crontab
# if [[ ":$PATH:" != *":$HOME/.local/bin:"* ]]; then
# echo "Warning: ~/.local/bin is not in your PATH"
# echo "Add this to your ~/.bashrc or ~/.zshrc:"
# echo 'export PATH="$HOME/.local/bin:$PATH"'
# fi
# Setup crontab - every 3 hours
CRON_JOB="0 */3 * * * $SCRIPT_PATH $GITHUB_USER > /tmp/update-ssh-keys.log 2>&1"
# Check if crontab entry already exists
if crontab -l 2>/dev/null | grep -qF "$SCRIPT_PATH"; then
echo "Crontab entry already exists, updating..."
# Remove old entry and add new one
(crontab -l 2>/dev/null | grep -vF "$SCRIPT_PATH"; echo "$CRON_JOB") | crontab -
else
# Add new entry
(crontab -l 2>/dev/null; echo "$CRON_JOB") | crontab -
fi
echo "Crontab installed: runs every 3 hours"
echo "Logs to: /tmp/update-ssh-keys.log"
# Run initial sync
echo ""
echo "Running initial sync..."
"$SCRIPT_PATH" "$GITHUB_USER"
echo ""
echo "Installation complete!"
echo "Manual usage: $SCRIPT_PATH $GITHUB_USER"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment