Skip to content

Instantly share code, notes, and snippets.

@arogozhnikov
Last active February 15, 2026 09:53
Show Gist options
  • Select an option

  • Save arogozhnikov/42e39c83a80a07b32dc240864da487d3 to your computer and use it in GitHub Desktop.

Select an option

Save arogozhnikov/42e39c83a80a07b32dc240864da487d3 to your computer and use it in GitHub Desktop.
Stable SSH auth agent, that keeps the same location for auth socket and can handle many connects/disconnects. Wor
*personal*
*private*
.git
# Stable SSH auth agent, that keeps the same location for auth socket
# and can handle many connects/disconnects.
# Works until any SSH connection with agent works.
import socket
import threading
import os
import select
import signal
import random
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path
import sys
import hashlib
handler = RotatingFileHandler(
"/tmp/stable_auth_agent.log",
maxBytes=1_000_000,
backupCount=2,
)
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(handler)
def bridge(src_conn: socket.socket, dst_path: str):
"""Bidirectionally connect src_conn and dst_path socket."""
try:
dst_conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
dst_conn.connect(dst_path)
except Exception as e:
logger.error(f"Failed to connect to {dst_path}: {e}")
src_conn.close()
return
sockets = [src_conn, dst_conn]
try:
while True:
rlist, _, _ = select.select(sockets, [], [])
for s in rlist:
data = s.recv(4096)
if not data:
return
if s is src_conn:
dst_conn.sendall(data)
else:
src_conn.sendall(data)
finally:
for c in sockets:
try:
c.close()
except OSError: # can already be closed
pass
def main(listen_socket: Path):
listen_socket.parent.mkdir(exist_ok=True, parents=True)
symlinks_folder = listen_socket.with_suffix(".links")
symlinks_folder.mkdir(exist_ok=True, parents=True)
pid_file = listen_socket.with_suffix(".pid")
my_hash = hashlib.md5(Path(__file__).read_bytes()).hexdigest()
# first - always 'register' own auth socket
sock_path = Path(os.environ["SSH_AUTH_SOCK"])
if sock_path.exists():
symlinks_folder.joinpath(str(random.randint(10**9, 10**10))).symlink_to(
sock_path
)
logger.info(f"Registered auth socket {sock_path=}")
if pid_file.exists():
try:
old_pid_str, old_hash = pid_file.read_text().strip().split(":::", 1)
old_pid = int(old_pid_str)
try:
# check if process exists
os.kill(old_pid, 0)
if old_hash == my_hash:
logger.info(
f"Agent with hash {my_hash} already running at PID {old_pid}. Exiting."
)
sys.exit(0)
else:
logger.info(
f"Killing old agent (PID {old_pid}) {old_hash=} differs from {my_hash}."
)
os.kill(old_pid, signal.SIGINT)
except OSError:
pass # Process dead or permission denied
except ValueError:
logger.warning("PID file corrupt, removing it")
pid_file.write_text(f"{os.getpid()}:::{my_hash}")
try:
listen_socket.unlink() # Clean up old bridge socket if it exists
except FileNotFoundError:
pass
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(listen_socket.as_posix())
server.listen()
logger.info("started listening to new socket")
try:
while True:
conn, _ = server.accept()
alive_links = []
for f in symlinks_folder.iterdir():
if f.is_symlink():
if f.resolve().exists():
alive_links.append(f)
else:
f.unlink()
logger.info(f"{len(alive_links)=} {alive_links=}")
if not alive_links:
logger.warning("no alive links, closing connection")
conn.close()
continue
used_link = alive_links[0].as_posix()
threading.Thread(target=bridge, args=(conn, used_link), daemon=True).start()
except KeyboardInterrupt:
pass
finally:
server.close()
logger.info(f"removing {listen_socket=} before exiting")
pid_file.unlink()
listen_socket.unlink()
if __name__ == "__main__":
_, new_agent_socket = sys.argv[:]
main(Path(new_agent_socket))
@arogozhnikov
Copy link
Author

arogozhnikov commented Oct 26, 2025

shall by used in .profile as

nohup python <file.py> /tmp/my_stable_socket &
export SSH_AUTH_SOCK=/tmp/my_stable_socket

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment