Last active
February 15, 2026 09:53
-
-
Save arogozhnikov/42e39c83a80a07b32dc240864da487d3 to your computer and use it in GitHub Desktop.
Stable SSH auth agent that keeps the auth socket at the same location and can handle many connects/disconnects. Works as long as at least one SSH connection with an agent is still alive.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| *personal* | |
| *private* | |
| .git | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Stable SSH auth agent that keeps the auth socket at the same location | |
| # and can handle many connects/disconnects. | |
| # Works as long as at least one SSH connection with an agent is still alive. | |
| import socket | |
| import threading | |
| import os | |
| import select | |
| import signal | |
| import random | |
| import logging | |
| from logging.handlers import RotatingFileHandler | |
| from pathlib import Path | |
| import sys | |
| import hashlib | |
# Module-wide logger. Records go to a rotating file so that a long-running
# agent cannot grow the log without bound: at most ~1 MB per file, with two
# rotated backups kept next to it.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = RotatingFileHandler(
    "/tmp/stable_auth_agent.log", maxBytes=1_000_000, backupCount=2
)
# Each line carries: timestamp, bracketed level name, message.
handler.setFormatter(
    logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
)
logger.addHandler(handler)
def bridge(src_conn: socket.socket, dst_path: str):
    """Bidirectionally connect src_conn and dst_path socket.

    Opens a Unix stream connection to ``dst_path`` and copies bytes between
    it and ``src_conn`` in both directions until either side reports
    end-of-stream or a socket error occurs. Both sockets are closed before
    the function returns, whatever the outcome.

    Args:
        src_conn: Already-accepted client connection. This function takes
            ownership of it and always closes it.
        dst_path: Filesystem path of the Unix socket to forward to.
    """
    dst_conn = None
    try:
        dst_conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        dst_conn.connect(dst_path)
    except Exception as e:
        logger.error(f"Failed to connect to {dst_path}: {e}")
        # The socket object exists even when connect() fails; close it so
        # repeated failures do not leak file descriptors.
        if dst_conn is not None:
            dst_conn.close()
        src_conn.close()
        return

    sockets = [src_conn, dst_conn]
    try:
        while True:
            # Block until at least one side has data (or EOF) to read.
            rlist, _, _ = select.select(sockets, [], [])
            for s in rlist:
                data = s.recv(4096)
                if not data:
                    # EOF on either side ends the whole bridge.
                    return
                if s is src_conn:
                    dst_conn.sendall(data)
                else:
                    src_conn.sendall(data)
    except OSError as e:
        # A peer resetting the connection or a broken pipe is an ordinary
        # way for a bridge to end; record it instead of letting the worker
        # thread die with an unhandled exception.
        logger.warning(f"Bridge to {dst_path} aborted: {e}")
    finally:
        for c in sockets:
            try:
                c.close()
            except OSError:  # can already be closed
                pass
def _wait_for_exit(pid: int, timeout: float = 5.0) -> bool:
    """Poll until process ``pid`` is gone; return False if it outlives ``timeout``."""
    import time

    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            os.kill(pid, 0)  # signal 0 only checks that the process exists
        except ProcessLookupError:
            return True
        except OSError:
            # Process exists but cannot be signalled; waiting will not help.
            return False
        time.sleep(0.05)
    return False


def _owns_pid_file(pid_file: Path) -> bool:
    """Tell whether ``pid_file`` currently records this very process."""
    try:
        recorded_pid = pid_file.read_text().split(":::", 1)[0].strip()
    except OSError:
        return False
    return recorded_pid == str(os.getpid())


def _collect_alive_links(symlinks_folder: Path, own_socket: Path) -> list:
    """Return symlinks whose target still exists; delete dead or self-referencing ones."""
    alive = []
    for f in symlinks_folder.iterdir():
        if not f.is_symlink():
            continue
        target = f.resolve()
        # A link that points back at our own listening socket would make the
        # agent bridge to itself without end, so it is treated as dead.
        if target != own_socket and target.exists():
            alive.append(f)
        else:
            f.unlink()
    return alive


def main(listen_socket: Path):
    """Run the stable agent, serving connections on ``listen_socket``.

    Steps:
      1. Register the socket named by ``SSH_AUTH_SOCK`` as a randomly named
         symlink in the ``<listen_socket>.links`` folder.
      2. Read ``<listen_socket>.pid``. If an agent with the same source hash
         is already running, exit with status 0. If one with a different
         hash is running, send it SIGINT and wait for it to exit.
      3. Bind ``listen_socket`` and, for every accepted connection, start a
         daemon thread running ``bridge`` towards the first registered link
         whose target still exists. Dead links are removed on each accept.

    On exit the pid file and the socket are removed, but only while the pid
    file still names this process.

    Args:
        listen_socket: Path at which the stable Unix socket is created.
    """
    listen_socket.parent.mkdir(exist_ok=True, parents=True)
    symlinks_folder = listen_socket.with_suffix(".links")
    symlinks_folder.mkdir(exist_ok=True, parents=True)
    pid_file = listen_socket.with_suffix(".pid")
    # Hash of this script's own bytes: lets a newer version of the script
    # recognise, and replace, an agent started from an older version.
    my_hash = hashlib.md5(Path(__file__).read_bytes()).hexdigest()
    own_socket = listen_socket.resolve()

    # first - always 'register' own auth socket
    auth_sock = os.environ.get("SSH_AUTH_SOCK")
    if not auth_sock:
        logger.warning("SSH_AUTH_SOCK is not set, nothing to register")
    else:
        sock_path = Path(auth_sock)
        if sock_path.resolve() == own_socket:
            # Happens when the environment already points at the stable
            # socket (e.g. a nested shell); registering it would loop.
            logger.info(f"{sock_path=} is the stable socket itself, not registering")
        elif sock_path.exists():
            symlinks_folder.joinpath(str(random.randint(10**9, 10**10))).symlink_to(
                sock_path
            )
            logger.info(f"Registered auth socket {sock_path=}")

    try:
        pid_record = pid_file.read_text()
    except FileNotFoundError:
        pid_record = None

    if pid_record is not None:
        try:
            old_pid_str, old_hash = pid_record.strip().split(":::", 1)
            old_pid = int(old_pid_str)
            if old_pid <= 0:
                # kill() with pid <= 0 targets process groups, never one agent.
                raise ValueError(f"invalid pid {old_pid}")
        except ValueError:
            logger.warning("PID file corrupt, removing it")
        else:
            # A recorded pid equal to our own is a stale file whose pid was
            # recycled; there is no other agent to talk to in that case.
            if old_pid != os.getpid():
                try:
                    # check if process exists
                    os.kill(old_pid, 0)
                    if old_hash == my_hash:
                        logger.info(
                            f"Agent with hash {my_hash} already running at PID {old_pid}. Exiting."
                        )
                        sys.exit(0)
                    logger.info(
                        f"Killing old agent (PID {old_pid}) {old_hash=} differs from {my_hash}."
                    )
                    os.kill(old_pid, signal.SIGINT)
                    # The old agent deletes the pid file and socket while
                    # shutting down; let it finish first so it cannot delete
                    # the ones created below.
                    if not _wait_for_exit(old_pid):
                        logger.warning(
                            f"Old agent (PID {old_pid}) did not exit in time."
                        )
                except OSError:
                    pass  # Process dead or permission denied

    pid_file.write_text(f"{os.getpid()}:::{my_hash}")
    listen_socket.unlink(missing_ok=True)  # Clean up old bridge socket if it exists

    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        # bind/listen sit inside the try so that a failure here still removes
        # the pid file written above.
        server.bind(listen_socket.as_posix())
        server.listen()
        logger.info("started listening to new socket")
        while True:
            conn, _ = server.accept()
            alive_links = _collect_alive_links(symlinks_folder, own_socket)
            logger.info(f"{len(alive_links)=} {alive_links=}")
            if not alive_links:
                logger.warning("no alive links, closing connection")
                conn.close()
                continue
            used_link = alive_links[0].as_posix()
            threading.Thread(target=bridge, args=(conn, used_link), daemon=True).start()
    except KeyboardInterrupt:
        pass
    finally:
        server.close()
        if _owns_pid_file(pid_file):
            logger.info(f"removing {listen_socket=} before exiting")
            pid_file.unlink(missing_ok=True)
            listen_socket.unlink(missing_ok=True)
        else:
            # Another agent has taken over; the files on disk are now its own.
            logger.info("pid file is owned by another agent, leaving files in place")
if __name__ == "__main__":
    # Exactly one argument is expected: the path of the stable socket.
    # Fail with a usage line instead of an opaque unpacking ValueError.
    if len(sys.argv) != 2:
        sys.exit(f"usage: {sys.argv[0]} <stable-socket-path>")
    main(Path(sys.argv[1]))
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
shall be used in
.profile as