Last active
February 18, 2026 12:47
-
-
Save jaysonsantos/c4c09eec2c817cff213bf25b0d4b65db to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# GitHub SSH Key Manager Installer
# Usage: ./install.sh <github_username>
#
# Writes the key-manager script to ~/.local/bin, schedules it via crontab and
# runs an initial sync for the given GitHub user.
set -euo pipefail

GITHUB_USER="${1:-}"
INSTALL_DIR="$HOME/.local/bin"
SCRIPT_NAME="update-ssh-keys.py"
SCRIPT_PATH="$INSTALL_DIR/$SCRIPT_NAME"

if [[ -z "$GITHUB_USER" ]]; then
    echo "Error: GitHub username required" >&2
    echo "Usage: $0 <github_username>" >&2
    exit 1
fi

# The username ends up inside a crontab command line and a request URL, so
# accept only letters, digits, hyphens and underscores.
if [[ ! "$GITHUB_USER" =~ ^[A-Za-z0-9][A-Za-z0-9_-]*$ ]]; then
    echo "Error: invalid GitHub username: $GITHUB_USER" >&2
    echo "Usage: $0 <github_username>" >&2
    exit 1
fi

# Create install directory if needed
mkdir -p "$INSTALL_DIR"

# Write the Python script
| cat > "$SCRIPT_PATH" << 'PYTHON_SCRIPT' | |
| #!/usr/bin/env python3 | |
| """ | |
| GitHub SSH Key Manager | |
| Fetches SSH keys from GitHub, maintains local state with backup capability, | |
| and only modifies keys within its managed section. | |
| """ | |
| import os | |
| import sys | |
| import re | |
| import json | |
| import hashlib | |
| import argparse | |
| import shutil | |
| from pathlib import Path | |
| from datetime import datetime | |
| from urllib.request import urlopen, Request | |
| from urllib.error import HTTPError, URLError | |
class GitHubSSHManager:
    """Keep a marked section of ``~/.ssh/authorized_keys`` in sync with GitHub.

    Keys published by a GitHub user are written between ``SECTION_START`` and
    ``SECTION_END``; text outside those markers is carried over. Each real
    sync first snapshots the current ``authorized_keys`` and state file into a
    timestamped directory under the backup directory.
    """

    # Markers for the managed section
    SECTION_START = "# >>> GITHUB_SSH_MANAGER_START (DO NOT EDIT MANUALLY)"
    SECTION_END = "# <<< GITHUB_SSH_MANAGER_END (DO NOT EDIT MANUALLY)"

    def __init__(self, username, backup_dir=None):
        """Compute the paths used by the manager; nothing is touched on disk.

        Args:
            username: GitHub login whose public keys are fetched.
            backup_dir: Directory for snapshots and ``state.json``. Defaults
                to ``~/.ssh/.github_ssh_manager_backups``.
        """
        self.username = username
        self.ssh_dir = Path.home() / ".ssh"
        self.authorized_keys = self.ssh_dir / "authorized_keys"
        # Default backup directory
        if backup_dir is None:
            self.backup_dir = self.ssh_dir / ".github_ssh_manager_backups"
        else:
            self.backup_dir = Path(backup_dir)
        # The state file sits beside the snapshot directories, which is why
        # backup listings must filter on directories only.
        self.state_file = self.backup_dir / "state.json"
        self.current_backup_dir = None

    def _ensure_ssh_dir(self):
        """Ensure .ssh directory exists with proper permissions."""
        self.ssh_dir.mkdir(mode=0o700, exist_ok=True)

    @staticmethod
    def _atomic_write(target, text):
        """Write *text* to *target* through a sibling ``.tmp`` file.

        The temp file is requested with mode 0o600 at creation time and then
        renamed over *target*. On failure the temp file is removed
        (best-effort) and the ``OSError`` is re-raised.
        """
        temp_file = target.with_suffix('.tmp')
        try:
            fd = os.open(temp_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w') as f:
                f.write(text)
            temp_file.replace(target)
            # Covers the case where a stale temp file with looser mode existed.
            target.chmod(0o600)
        except OSError:
            try:
                temp_file.unlink()
            except OSError:
                pass  # nothing to clean up, or cleanup itself failed
            raise

    def _backup_dirs(self):
        """Return snapshot directories under the backup dir, sorted by name.

        Plain files such as ``state.json`` are skipped so they are never
        mistaken for a snapshot.
        """
        if not self.backup_dir.is_dir():
            return []
        return sorted(p for p in self.backup_dir.iterdir() if p.is_dir())

    def _fetch_github_keys(self):
        """Fetch the user's public SSH keys from the GitHub API.

        Returns:
            A list of dicts with ``id``, ``key``, ``title`` and ``fetched_at``.

        Raises:
            RuntimeError: On HTTP/network failure or an unexpected payload.
        """
        url = f"https://api.github.com/users/{self.username}/keys"
        req = Request(url, headers={
            'User-Agent': 'GitHub-SSH-Manager/1.0',
            'Accept': 'application/vnd.github.v3+json',
        })
        try:
            with urlopen(req, timeout=30) as response:
                data = json.loads(response.read().decode('utf-8'))
        except HTTPError as e:
            if e.code == 404:
                raise RuntimeError(
                    f"GitHub user '{self.username}' not found or no public keys"
                ) from e
            if e.code == 403:
                raise RuntimeError(
                    "GitHub API rate limit exceeded. Try again later."
                ) from e
            raise RuntimeError(f"HTTP Error {e.code}: {e.reason}") from e
        except URLError as e:
            raise RuntimeError(f"Network error: {e.reason}") from e
        except json.JSONDecodeError as e:
            raise RuntimeError("Invalid response from GitHub API") from e

        # Anything other than a JSON array of key objects is not usable.
        if not isinstance(data, list):
            raise RuntimeError("Invalid response from GitHub API")

        fetched_at = datetime.now().isoformat()
        try:
            return [
                {
                    'id': key_data['id'],
                    'key': key_data['key'].strip(),
                    'title': key_data.get('title', f"github_key_{key_data['id']}"),
                    'fetched_at': fetched_at,
                }
                for key_data in data
            ]
        except (KeyError, TypeError, AttributeError) as e:
            raise RuntimeError("Invalid response from GitHub API") from e

    def _create_backup(self):
        """Snapshot ``authorized_keys`` and the state file.

        Returns:
            Path of the new timestamped snapshot directory.
        """
        self.backup_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.current_backup_dir = self.backup_dir / timestamp
        self.current_backup_dir.mkdir(mode=0o700, exist_ok=True)
        # Backup current authorized_keys if it exists
        if self.authorized_keys.exists():
            shutil.copy2(self.authorized_keys,
                         self.current_backup_dir / "authorized_keys")
        # Backup current state if it exists
        if self.state_file.exists():
            shutil.copy2(self.state_file, self.current_backup_dir / "state.json")
        return self.current_backup_dir

    def _load_state(self):
        """Load the previously saved state of managed keys.

        Returns:
            The state dict, or an empty default when no state file exists.

        Raises:
            RuntimeError: If the file cannot be read or is not a JSON object.
        """
        if not self.state_file.exists():
            return {'keys': [], 'last_update': None}
        try:
            with open(self.state_file, 'r') as f:
                state = json.load(f)
        except (json.JSONDecodeError, OSError) as e:
            raise RuntimeError(f"Failed to load state file: {e}") from e
        if not isinstance(state, dict):
            raise RuntimeError("Failed to load state file: not a JSON object")
        return state

    def _save_state(self, keys):
        """Persist *keys* together with a timestamp and the username.

        Raises:
            RuntimeError: If the state file cannot be written.
        """
        state = {
            'keys': keys,
            'last_update': datetime.now().isoformat(),
            'username': self.username,
        }
        try:
            self._atomic_write(self.state_file, json.dumps(state, indent=2))
        except OSError as e:
            raise RuntimeError(f"Failed to save state file: {e}") from e

    def _parse_authorized_keys(self):
        """Split ``authorized_keys`` around the managed section.

        Returns:
            A dict with ``before``, ``managed`` and ``after`` (lists of lines)
            and ``has_section`` (whether both markers were found in order).
            Without a valid section every non-blank line goes to ``before``.

        Raises:
            RuntimeError: If the file exists but cannot be read.
        """
        if not self.authorized_keys.exists():
            return {'before': [], 'managed': [], 'after': [],
                    'has_section': False}
        try:
            with open(self.authorized_keys, 'r') as f:
                content = f.read()
        except OSError as e:
            raise RuntimeError(f"Failed to read authorized_keys: {e}") from e

        # Find managed section
        start = content.find(self.SECTION_START)
        end = content.find(self.SECTION_END)
        if start != -1 and end != -1 and start < end:
            before = content[:start].rstrip()
            managed = content[start + len(self.SECTION_START):end].strip()
            # Strip both ends: a leftover trailing newline would otherwise
            # become an extra blank line on every rebuild.
            after = content[end + len(self.SECTION_END):].strip()
            return {
                'before': before.split('\n') if before else [],
                'managed': managed.split('\n') if managed else [],
                'after': after.split('\n') if after else [],
                'has_section': True,
            }

        # No managed section found, treat all as 'before'
        return {
            'before': [line for line in content.split('\n') if line.strip()],
            'managed': [],
            'after': [],
            'has_section': False,
        }

    def _build_authorized_keys(self, sections, new_managed_keys):
        """Render the full ``authorized_keys`` text.

        Args:
            sections: Dict providing ``before`` and ``after`` line lists.
            new_managed_keys: Key dicts (``id``, ``title``, ``key``) to place
                inside the managed section.

        Returns:
            The file content, ending in exactly one newline.
        """
        lines = []
        # Add content before section
        if sections['before']:
            lines.extend(sections['before'])
            lines.append('')  # Empty line before section
        # Add managed section
        lines.append(self.SECTION_START)
        lines.append(f"# Managed by GitHub SSH Manager for user: {self.username}")
        lines.append(f"# Last updated: {datetime.now().isoformat()}")
        lines.append(f"# GitHub keys count: {len(new_managed_keys)}")
        lines.append("")
        for key_info in new_managed_keys:
            # Collapse whitespace so neither field can span several lines of
            # the file.
            title = ' '.join(str(key_info['title']).split())
            key = ' '.join(str(key_info['key']).split())
            lines.append(f"# GitHub Key ID: {key_info['id']} - {title}")
            lines.append(key)
            lines.append("")
        lines.append(self.SECTION_END)
        # Add content after section
        if sections['after']:
            lines.append('')  # Empty line after section
            lines.extend(sections['after'])
        return '\n'.join(lines) + '\n'  # Ensure trailing newline

    def _key_fingerprint(self, key):
        """Return a short SHA-256 based identifier for a key string."""
        # Normalize key by removing extra whitespace
        normalized = ' '.join(key.split())
        return hashlib.sha256(normalized.encode()).hexdigest()[:16]

    def sync(self, dry_run=False):
        """Fetch keys from GitHub and rewrite the managed section.

        Args:
            dry_run: When true, report the differences and stop before the
                backup and any write.

        Returns:
            0 on success, 1 on any failure before ``authorized_keys`` has
            been replaced.
        """
        print(f"Fetching SSH keys for GitHub user: {self.username}")
        # Fetch current keys from GitHub
        try:
            github_keys = self._fetch_github_keys()
        except Exception as e:
            print(f"Error fetching from GitHub: {e}", file=sys.stderr)
            return 1
        # CRITICAL: Never proceed if no keys returned or empty result
        if not github_keys:
            print("Error: GitHub returned empty key list. Aborting to prevent lockout.",
                  file=sys.stderr)
            return 1
        print(f"Found {len(github_keys)} key(s) on GitHub")

        # Load previous state
        try:
            old_state = self._load_state()
        except Exception as e:
            print(f"Error loading state: {e}", file=sys.stderr)
            return 1
        old_keys = old_state.get('keys', [])

        # Compare keys for display purposes only (we always rebuild)
        old_fingerprints = {self._key_fingerprint(k['key']) for k in old_keys}
        fingerprinted = [(self._key_fingerprint(k['key']), k) for k in github_keys]
        new_fingerprints = {fp for fp, _ in fingerprinted}
        added = [k for fp, k in fingerprinted if fp not in old_fingerprints]
        unchanged = [k for fp, k in fingerprinted if fp in old_fingerprints]
        removed_count = len(old_fingerprints - new_fingerprints)

        print("\nChanges detected:")
        print(f" + Added: {len(added)} key(s)")
        for k in added:
            print(f" [{k['id']}] {k['title']}")
        print(f" - Removed: {removed_count} key(s) (will be purged from section)")
        print(f" = Unchanged: {len(unchanged)} key(s)")

        if dry_run:
            print("\n[DRY RUN] No changes made")
            return 0

        # Create backup
        try:
            backup_path = self._create_backup()
            print(f"\nBackup created: {backup_path}")
        except Exception as e:
            print(f"Error creating backup: {e}", file=sys.stderr)
            return 1

        # Parse current authorized_keys
        try:
            sections = self._parse_authorized_keys()
        except Exception as e:
            print(f"Error parsing authorized_keys: {e}", file=sys.stderr)
            return 1

        # Build and write new authorized_keys
        new_content = self._build_authorized_keys(sections, github_keys)
        try:
            self._atomic_write(self.authorized_keys, new_content)
        except OSError as e:
            print(f"Error writing authorized_keys: {e}", file=sys.stderr)
            return 1

        # Save new state
        try:
            self._save_state(github_keys)
        except Exception as e:
            print(f"Error saving state: {e}", file=sys.stderr)
            # Note: authorized_keys is already updated, which is fine
            # since we have a backup

        print(f"\nUpdated {self.authorized_keys}")
        print("Done!")
        return 0

    def status(self):
        """Print paths, last update and section status.

        Returns:
            0 normally, 1 if ``authorized_keys`` cannot be parsed.
        """
        print(f"GitHub User: {self.username}")
        print(f"SSH Directory: {self.ssh_dir}")
        print(f"Authorized Keys: {self.authorized_keys}")
        print(f"Backup Directory: {self.backup_dir}")
        # Load state
        try:
            state = self._load_state()
        except Exception as e:
            print(f"\nError loading state: {e}", file=sys.stderr)
            state = {'keys': [], 'last_update': None}
        # `or` also covers a stored null, which .get()'s default would not.
        print(f"\nLast Update: {state.get('last_update') or 'Never'}")
        print(f"Previously Managed Keys: {len(state.get('keys', []))}")
        # Parse current file
        try:
            sections = self._parse_authorized_keys()
        except Exception as e:
            print(f"\nError parsing authorized_keys: {e}", file=sys.stderr)
            return 1
        # Check for managed section
        if sections['has_section']:
            print("Status: Managed section exists")
        else:
            print("Status: No managed section found yet (run sync to create)")
        # Check for unmanaged keys: count entries, not blank or comment lines
        unmanaged = sum(
            1
            for line in sections['before'] + sections['after']
            if line.strip() and not line.lstrip().startswith('#')
        )
        if unmanaged:
            print(f"Unmanaged keys outside section: {unmanaged}")
        return 0

    def restore(self, backup_timestamp=None):
        """Restore ``authorized_keys`` and the state file from a snapshot.

        Args:
            backup_timestamp: Name of the snapshot directory; when omitted the
                last snapshot directory in sorted order is used.

        Returns:
            0 on success, 1 if no usable snapshot exists or a copy fails.
        """
        if backup_timestamp:
            backup_path = self.backup_dir / backup_timestamp
        else:
            # Find most recent backup
            if not self.backup_dir.exists():
                print("No backups directory", file=sys.stderr)
                return 1
            backups = self._backup_dirs()
            if not backups:
                print("No backups found", file=sys.stderr)
                return 1
            backup_path = backups[-1]

        # A snapshot is always a directory; a plain file is not a backup.
        if not backup_path.is_dir():
            print(f"Backup not found: {backup_path}", file=sys.stderr)
            return 1
        print(f"Restoring from: {backup_path}")

        # Restore authorized_keys
        backup_keys = backup_path / "authorized_keys"
        if backup_keys.exists():
            try:
                shutil.copy2(backup_keys, self.authorized_keys)
                self.authorized_keys.chmod(0o600)
                print(f"Restored {self.authorized_keys}")
            except OSError as e:
                print(f"Error restoring authorized_keys: {e}", file=sys.stderr)
                return 1
        else:
            print("Warning: No authorized_keys in backup")

        # Restore state
        backup_state = backup_path / "state.json"
        if backup_state.exists():
            try:
                shutil.copy2(backup_state, self.state_file)
                self.state_file.chmod(0o600)
                print(f"Restored {self.state_file}")
            except OSError as e:
                print(f"Error restoring state: {e}", file=sys.stderr)
                return 1
        else:
            print("Warning: No state.json in backup")
        return 0

    def list_backups(self):
        """Print the available snapshot directories. Always returns 0."""
        if not self.backup_dir.exists():
            print("No backups directory")
            return 0
        backups = self._backup_dirs()
        if not backups:
            print("No backups found")
            return 0
        print("Available backups:")
        for backup in backups:
            info = ""
            try:
                with open(backup / "state.json") as f:
                    state = json.load(f)
                info = f"({len(state.get('keys', []))} keys)"
            except (OSError, ValueError, AttributeError, TypeError):
                # The key count is a best-effort annotation; a missing or
                # malformed state file just leaves it blank.
                pass
            print(f" {backup.name} {info}")
        return 0
def main():
    """Parse the command line and dispatch to one manager operation.

    Exactly one action runs, checked in this order: ``--status``,
    ``--restore``, ``--list-backups``, otherwise a sync (optionally a dry
    run). The action's return value is passed back as the exit status.
    """
    cli = argparse.ArgumentParser(
        description="Manage SSH keys from GitHub with isolated section and backup"
    )
    cli.add_argument('username', help='GitHub username to fetch keys from')
    cli.add_argument(
        '--dry-run', '-n',
        action='store_true',
        help='Show what would be done without making changes',
    )
    cli.add_argument(
        '--status', '-s',
        action='store_true',
        help='Show current status',
    )
    cli.add_argument(
        '--restore', '-r',
        metavar='TIMESTAMP',
        help='Restore from backup (use "latest" for most recent)',
    )
    cli.add_argument(
        '--list-backups', '-l',
        action='store_true',
        help='List available backups',
    )
    cli.add_argument(
        '--backup-dir', '-b',
        help='Custom backup directory (default: ~/.ssh/.github_ssh_manager_backups)',
    )
    opts = cli.parse_args()

    mgr = GitHubSSHManager(opts.username, opts.backup_dir)
    mgr._ensure_ssh_dir()

    if opts.status:
        return mgr.status()
    if opts.restore:
        # "latest" is passed on as None, which selects the newest snapshot.
        chosen = opts.restore if opts.restore != 'latest' else None
        return mgr.restore(chosen)
    if opts.list_backups:
        return mgr.list_backups()
    return mgr.sync(dry_run=opts.dry_run)


if __name__ == '__main__':
    sys.exit(main())
| PYTHON_SCRIPT | |
# Make executable
chmod +x "$SCRIPT_PATH"
echo "Installed: $SCRIPT_PATH"

# # Ensure ~/.local/bin is in PATH for crontab
# if [[ ":$PATH:" != *":$HOME/.local/bin:"* ]]; then
# echo "Warning: ~/.local/bin is not in your PATH"
# echo "Add this to your ~/.bashrc or ~/.zshrc:"
# echo 'export PATH="$HOME/.local/bin:$PATH"'
# fi

# Setup crontab - every 3 hours
# Path and username are quoted so whitespace cannot split the cron command.
CRON_JOB="0 */3 * * * \"$SCRIPT_PATH\" \"$GITHUB_USER\" > /tmp/update-ssh-keys.log 2>&1"

# Read the current crontab once. `crontab -l` exits non-zero when the user has
# no crontab yet; under `set -e` that must not abort the installer.
EXISTING_CRONTAB="$(crontab -l 2>/dev/null || true)"

# Check if crontab entry already exists
if [[ -n "$EXISTING_CRONTAB" ]] && grep -qF -- "$SCRIPT_PATH" <<< "$EXISTING_CRONTAB"; then
    echo "Crontab entry already exists, updating..."
fi

# Re-install the crontab: every existing line except a previous entry for this
# script, followed by the new entry.
{
    if [[ -n "$EXISTING_CRONTAB" ]]; then
        # grep -v exits 1 when nothing is left over; that is not an error here.
        grep -vF -- "$SCRIPT_PATH" <<< "$EXISTING_CRONTAB" || true
    fi
    echo "$CRON_JOB"
} | crontab -

echo "Crontab installed: runs every 3 hours"
echo "Logs to: /tmp/update-ssh-keys.log"

# Run initial sync
echo ""
echo "Running initial sync..."
"$SCRIPT_PATH" "$GITHUB_USER"
echo ""
echo "Installation complete!"
echo "Manual usage: $SCRIPT_PATH $GITHUB_USER"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment