-
-
Save mikedh/f3528dfbed78c7537541b9c0aca1620f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| sux - a less annoying tmux wrapper | |
| Modes: | |
| tmux `sux <name>` creates or attaches to a named tmux session. | |
| `sux` with no args attaches to the most recent session. | |
| docker `sux -d <name>` mounts the current directory into an isolated | |
| Docker container with rust, uv, node, and claude-code. | |
| Non-root user with sudo and NVIDIA GPU passthrough. | |
| `sux <name>` will automatically reattach if a container exists. | |
| worktree `sux -w <name>` creates a git worktree in ./worktrees/<name> | |
| on a new branch and opens a tmux session in it. | |
| combined `sux -w -d <name>` creates a worktree and runs it in Docker. | |
| yolo `sux -w -y "prompt" <name>` creates a worktree, starts a | |
| Docker container, and runs claude --dangerously-skip-permissions. | |
| Requires -w for safety. | |
| kill `sux -k <name>` kills the tmux session and removes the container. | |
| list `sux -l` lists all tmux sessions and running Docker containers. | |
| config `sux --config` writes a sane ~/.tmux.conf and rebuilds the | |
| Docker base image. | |
| """ | |
| import argparse | |
| import os | |
| import pwd | |
| import shutil | |
| import signal | |
| import subprocess | |
| import sys | |
| import time | |
| from pathlib import Path | |
| SUX_DOCKERFILE = """\ | |
| FROM debian:trixie | |
| RUN apt-get update && apt-get install -y \ | |
| curl git build-essential pkg-config libssl-dev gnupg \ | |
| ca-certificates sudo wget weston vulkan-tools emacs nano libtbb-dev \ | |
| && rm -rf /var/lib/apt/lists/* | |
| # Create non-root user matching host UID/GID/username | |
| ARG UID=1000 | |
| ARG GID=1000 | |
| ARG USERNAME=user | |
| RUN groupadd -g $GID $USERNAME && useradd -m -u $UID -g $GID -s /bin/bash $USERNAME | |
| RUN echo "$USERNAME ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers | |
| # Docker CLI only (daemon runs on host, accessed via proxy socket) | |
| RUN install -m 0755 -d /etc/apt/keyrings && \ | |
| curl -fsSL https://download.docker.com/linux/debian/gpg \ | |
| -o /etc/apt/keyrings/docker.asc && \ | |
| echo "deb [arch=$(dpkg --print-architecture) \ | |
| signed-by=/etc/apt/keyrings/docker.asc] \ | |
| https://download.docker.com/linux/debian trixie stable" \ | |
| > /etc/apt/sources.list.d/docker.list && \ | |
| apt-get update && \ | |
| apt-get install -y docker-ce-cli && \ | |
| rm -rf /var/lib/apt/lists/* | |
| USER $USERNAME | |
| ENV HOME=/home/$USERNAME | |
| # Rust (latest stable via rustup) | |
| RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y | |
| ENV PATH="$HOME/.cargo/bin:${PATH}" | |
| # uv (latest) | |
| RUN curl -LsSf https://astral.sh/uv/install.sh | sh | |
| ENV PATH="$HOME/.local/bin:${PATH}" | |
| # nvm + latest Node.js | |
| ENV NVM_DIR="$HOME/.nvm" | |
| RUN curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.1/install.sh | bash \ | |
| && . "$NVM_DIR/nvm.sh" && nvm install node | |
| # Claude Code (native installer) | |
| RUN curl -fsSL https://claude.ai/install.sh | bash | |
| # Terminal colors and aliases | |
| RUN cat >> $HOME/.bashrc <<'BASHRC' | |
| export TERM=xterm-256color | |
| PS1='\\[\\033[01;32m\\]\\u@\\h\\[\\033[00m\\]:\\[\\033[01;34m\\]\\w\\[\\033[00m\\]\\$ ' | |
| alias ls='ls --color=auto' | |
| eval "$(dircolors -b)" | |
| uv tool install ruff | |
| alias pip='uv pip' | |
| alias python='uv run python' | |
| alias python3='uv run python' | |
| alias yolo='claude --dangerously-skip-permissions' | |
| BASHRC | |
| ENV NVIDIA_DRIVER_CAPABILITIES=all | |
| WORKDIR /workspace | |
| """ | |
| TMUX_CONFIG = """\ | |
| # Enable mouse mode (scroll, click, resize panes) | |
| set -g mouse on | |
| # Increase scrollback buffer | |
| set -g history-limit 50000 | |
| # Start window/pane numbering at 1 | |
| set -g base-index 1 | |
| setw -g pane-base-index 1 | |
| # Faster escape time (helps with vim) | |
| set -sg escape-time 10 | |
| # Better colors | |
| set -g default-terminal "screen-256color" | |
| # Ctrl-a prefix (screen-like) | |
| set -g prefix C-a | |
| unbind C-b | |
| bind C-a send-prefix | |
| bind C-d detach | |
| """ | |
| def _proxy_serve(sock_path, workspace_host): | |
| """Docker socket filtering proxy. Runs in forked child, never returns.""" | |
| import asyncio | |
| import json | |
| import re | |
| DOCKER_SOCK = "/var/run/docker.sock" | |
| _V = r"(/v[\d.]+)?" | |
| _Q = r"(\?.*)?$" | |
| _ID = r"/[^/]+" | |
| API_WHITELIST = { | |
| "GET": re.compile( | |
| _V + r"/(containers|images|networks|volumes)(" + _ID + r".*)?$" | |
| r"|" + _V + r"/(info|version|_ping)$" | |
| r"|/_ping$" | |
| ), | |
| "HEAD": re.compile(_V + r"/_ping$|/_ping$"), | |
| "POST": re.compile( | |
| _V + r"/containers/create" + _Q | |
| + r"|" + _V + r"/containers" + _ID + r"/(start|stop|restart|kill|wait|resize|exec|attach)" + _Q | |
| + r"|" + _V + r"/exec" + _ID + r"/(start|resize|json)" + _Q | |
| + r"|" + _V + r"/(images/create|build|networks/create|volumes/create)" + _Q | |
| ), | |
| "DELETE": re.compile(_V + r"/(containers|networks|volumes)" + _ID + _Q), | |
| } | |
| ALLOWED_HOSTCONFIG = { | |
| "Binds", "NetworkMode", "PortBindings", "Tmpfs", "RestartPolicy", | |
| "Runtime", "DeviceRequests", "AutoRemove", "ShmSize", | |
| "Memory", "MemorySwap", "NanoCpus", "CpuShares", "CpuQuota", | |
| } | |
| ALLOWED_TOP_LEVEL = { | |
| "Image", "Cmd", "Entrypoint", "Env", "WorkingDir", "User", | |
| "ExposedPorts", "Labels", "Volumes", "Tty", "OpenStdin", "StdinOnce", | |
| "AttachStdin", "AttachStdout", "AttachStderr", "HostConfig", | |
| "NetworkingConfig", "Hostname", "StopSignal", "Healthcheck", | |
| } | |
| def sanitize_binds(binds): | |
| if not binds: | |
| return [] | |
| result = [] | |
| for bind in binds: | |
| parts = bind.split(":") | |
| src = parts[0] | |
| if not src.startswith("/"): | |
| result.append(bind) | |
| elif src == "/workspace" or src.startswith("/workspace/"): | |
| parts[0] = workspace_host + src[len("/workspace"):] | |
| result.append(":".join(parts)) | |
| else: | |
| print(f"PROXY: blocked bind mount: {bind}", file=sys.stderr) | |
| return result | |
| def sanitize_create(body): | |
| try: | |
| data = json.loads(body) | |
| except (json.JSONDecodeError, TypeError): | |
| return body | |
| clean = {k: v for k, v in data.items() if k in ALLOWED_TOP_LEVEL} | |
| hc = data.get("HostConfig") | |
| if hc and isinstance(hc, dict): | |
| clean_hc = {k: v for k, v in hc.items() if k in ALLOWED_HOSTCONFIG} | |
| if "Binds" in clean_hc: | |
| clean_hc["Binds"] = sanitize_binds(clean_hc["Binds"]) | |
| if clean_hc.get("NetworkMode") == "host": | |
| clean_hc["NetworkMode"] = "bridge" | |
| print("PROXY: blocked NetworkMode=host", file=sys.stderr) | |
| clean["HostConfig"] = clean_hc | |
| return json.dumps(clean).encode() | |
| async def read_http_head(reader): | |
| lines = [] | |
| while True: | |
| line = await reader.readline() | |
| if not line: | |
| return None, {}, 0 | |
| lines.append(line) | |
| if line == b"\r\n": | |
| break | |
| head = b"".join(lines) | |
| headers = {} | |
| for raw in lines[1:]: | |
| decoded = raw.decode("latin-1").strip() | |
| if ":" in decoded: | |
| k, v = decoded.split(":", 1) | |
| headers[k.strip().lower()] = v.strip() | |
| return head, headers, int(headers.get("content-length", 0)) | |
| async def forward_bytes(src, dst): | |
| try: | |
| while data := await src.read(65536): | |
| dst.write(data) | |
| await dst.drain() | |
| except (ConnectionResetError, BrokenPipeError, asyncio.CancelledError): | |
| pass | |
| finally: | |
| try: | |
| dst.close() | |
| except Exception: | |
| pass | |
| async def handle_client(client_r, client_w): | |
| try: | |
| while True: | |
| req_head, req_headers, req_cl = await read_http_head(client_r) | |
| if req_head is None: | |
| return | |
| first_line = req_head.split(b"\r\n")[0].decode("latin-1") | |
| parts = first_line.split() | |
| if len(parts) < 2: | |
| return | |
| method, path = parts[0], parts[1] | |
| body = b"" | |
| if req_cl > 0: | |
| body = await client_r.readexactly(req_cl) | |
| if req_headers.get("transfer-encoding", "").lower() == "chunked": | |
| chunks = [] | |
| while True: | |
| size_line = await client_r.readline() | |
| size = int(size_line.strip(), 16) | |
| if size == 0: | |
| await client_r.readline() | |
| break | |
| chunks.append(await client_r.readexactly(size)) | |
| await client_r.readline() | |
| body = b"".join(chunks) | |
| # Check API whitelist | |
| pat = API_WHITELIST.get(method) | |
| if not pat or not pat.search(path): | |
| client_w.write( | |
| b"HTTP/1.1 403 Forbidden\r\n" | |
| b"Content-Type: application/json\r\n" | |
| b"Content-Length: 52\r\n\r\n" | |
| b'{"message":"Blocked by sux Docker security proxy."}' | |
| ) | |
| await client_w.drain() | |
| print(f"PROXY: blocked {method} {path}", file=sys.stderr) | |
| return | |
| if method == "POST" and "/containers/create" in path and body: | |
| body = sanitize_create(body) | |
| # Rebuild request with updated content-length | |
| new_headers = [] | |
| for raw in req_head.split(b"\r\n")[1:]: | |
| if not raw: | |
| continue | |
| low = raw.decode("latin-1").lower() | |
| if low.startswith(("content-length:", "transfer-encoding:")): | |
| continue | |
| new_headers.append(raw) | |
| if body: | |
| new_headers.append(f"Content-Length: {len(body)}".encode()) | |
| new_req = ( | |
| f"{method} {path} HTTP/1.1\r\n".encode() | |
| + b"\r\n".join(new_headers) + b"\r\n\r\n" + body | |
| ) | |
| up_r, up_w = await asyncio.open_unix_connection(DOCKER_SOCK) | |
| up_w.write(new_req) | |
| await up_w.drain() | |
| resp_head, resp_headers, resp_cl = await read_http_head(up_r) | |
| if resp_head is None: | |
| up_w.close() | |
| return | |
| resp_first = resp_head.split(b"\r\n")[0].decode("latin-1") | |
| # 101 Upgrade: bidirectional forwarding (docker exec -it) | |
| if "101" in resp_first: | |
| client_w.write(resp_head) | |
| await client_w.drain() | |
| await asyncio.gather( | |
| forward_bytes(client_r, up_w), | |
| forward_bytes(up_r, client_w), | |
| return_exceptions=True, | |
| ) | |
| return | |
| if resp_headers.get("transfer-encoding", "").lower() == "chunked": | |
| client_w.write(resp_head) | |
| await client_w.drain() | |
| while True: | |
| size_line = await up_r.readline() | |
| if not size_line: | |
| break | |
| client_w.write(size_line) | |
| await client_w.drain() | |
| size = int(size_line.strip(), 16) | |
| if size == 0: | |
| client_w.write(await up_r.readline()) | |
| await client_w.drain() | |
| break | |
| client_w.write(await up_r.readexactly(size)) | |
| client_w.write(await up_r.readline()) | |
| await client_w.drain() | |
| elif resp_cl > 0: | |
| client_w.write(resp_head + await up_r.readexactly(resp_cl)) | |
| await client_w.drain() | |
| else: | |
| client_w.write(resp_head) | |
| await client_w.drain() | |
| up_w.close() | |
| except (ConnectionResetError, BrokenPipeError, asyncio.IncompleteReadError): | |
| pass | |
| except Exception as e: | |
| print(f"PROXY error: {e}", file=sys.stderr) | |
| finally: | |
| try: | |
| client_w.close() | |
| except Exception: | |
| pass | |
| async def serve(): | |
| try: | |
| os.unlink(sock_path) | |
| except FileNotFoundError: | |
| pass | |
| server = await asyncio.start_unix_server(handle_client, path=sock_path) | |
| os.chmod(sock_path, 0o666) | |
| print(f"PROXY: listening on {sock_path}", file=sys.stderr) | |
| async with server: | |
| await server.serve_forever() | |
| asyncio.run(serve()) | |
| def run_tmux(*args): | |
| """Run tmux command, replacing current process.""" | |
| os.execvp("tmux", ["tmux"] + list(args)) | |
| def tmux_running(): | |
| """Check if tmux server is running.""" | |
| return ( | |
| subprocess.run(["tmux", "list-sessions"], capture_output=True).returncode == 0 | |
| ) | |
| def host_username(): | |
| """Get the current host username.""" | |
| return pwd.getpwuid(os.getuid()).pw_name | |
| def build_docker_image(): | |
| """Build the sux-base Docker image.""" | |
| subprocess.run( | |
| [ | |
| "docker", | |
| "build", | |
| "-t", | |
| "sux-base", | |
| "--build-arg", | |
| f"UID={os.getuid()}", | |
| "--build-arg", | |
| f"GID={os.getgid()}", | |
| "--build-arg", | |
| f"USERNAME={host_username()}", | |
| "-", | |
| ], | |
| input=SUX_DOCKERFILE.encode(), | |
| check=True, | |
| ) | |
| def apply_config(): | |
| """Write tmux config and rebuild Docker image.""" | |
| conf_path = Path.home() / ".tmux.conf" | |
| backup_path = Path.home() / ".tmux.conf.bak" | |
| if conf_path.exists(): | |
| shutil.copy(conf_path, backup_path) | |
| print(f"Backed up existing config to {backup_path}") | |
| conf_path.write_text(TMUX_CONFIG) | |
| print(f"Wrote config to {conf_path}") | |
| if tmux_running(): | |
| subprocess.run(["tmux", "source-file", str(conf_path)]) | |
| print("Reloaded tmux config") | |
| # Rebuild sux-base Docker image | |
| subprocess.run(["docker", "rmi", "-f", "sux-base"], capture_output=True) | |
| print("Building sux-base Docker image...") | |
| build_docker_image() | |
| print("Rebuilt sux-base image") | |
| def list_sessions(): | |
| """List tmux and docker sessions.""" | |
| subprocess.run(["tmux", "list-sessions"]) | |
| result = subprocess.run( | |
| [ | |
| "docker", | |
| "ps", | |
| "--filter", | |
| "name=sux-", | |
| "--format", | |
| "{{.Names}}\t{{.Status}}", | |
| ], | |
| capture_output=True, | |
| text=True, | |
| ) | |
| if result.stdout.strip(): | |
| print("\nDocker containers:") | |
| for line in result.stdout.strip().splitlines(): | |
| name, status = line.split("\t", 1) | |
| session = name.removeprefix("sux-") | |
| print(f" {session}: {status} (sux -d {session})") | |
| def ensure_docker_image(): | |
| """Build the sux-base Docker image if it doesn't exist.""" | |
| result = subprocess.run( | |
| ["docker", "image", "inspect", "sux-base"], capture_output=True | |
| ) | |
| if result.returncode == 0: | |
| return | |
| print("Building sux-base Docker image...") | |
| build_docker_image() | |
| print("Built sux-base image") | |
| def proxy_paths(name): | |
| """Return (sock, pid, log) paths for a proxy.""" | |
| base = f"/tmp/sux-proxy-{name}" | |
| return f"{base}.sock", f"{base}.pid", f"{base}.log" | |
| def start_proxy(name): | |
| """Fork a child process running the filtering proxy.""" | |
| sock, pidfile, logfile = proxy_paths(name) | |
| workspace_host = str(Path.cwd().resolve()) | |
| pid = os.fork() | |
| if pid == 0: | |
| # Child: daemonize and run proxy | |
| os.setsid() | |
| sys.stdin.close() | |
| log_fd = os.open(logfile, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644) | |
| os.dup2(log_fd, 1) | |
| os.dup2(log_fd, 2) | |
| os.close(log_fd) | |
| try: | |
| _proxy_serve(sock, workspace_host) | |
| except Exception: | |
| import traceback | |
| traceback.print_exc() | |
| os._exit(1) | |
| # Parent: save PID and wait for socket | |
| Path(pidfile).write_text(str(pid)) | |
| for _ in range(50): | |
| if Path(sock).exists(): | |
| return sock | |
| time.sleep(0.1) | |
| diag = Path(logfile).read_text()[:4096] if Path(logfile).exists() else "" | |
| raise RuntimeError(f"Proxy failed to start within 5s. log: {diag}") | |
| def stop_proxy(name): | |
| """Stop the proxy and clean up.""" | |
| sock, pidfile, logfile = proxy_paths(name) | |
| if Path(pidfile).exists(): | |
| try: | |
| os.kill(int(Path(pidfile).read_text().strip()), signal.SIGTERM) | |
| except (ValueError, ProcessLookupError, PermissionError): | |
| pass | |
| for f in (pidfile, sock, logfile): | |
| Path(f).unlink(missing_ok=True) | |
| def ensure_proxy(name): | |
| """Ensure proxy is running, (re)starting if needed.""" | |
| _, pidfile, _ = proxy_paths(name) | |
| if Path(pidfile).exists(): | |
| try: | |
| os.kill(int(Path(pidfile).read_text().strip()), 0) | |
| return | |
| except (ValueError, ProcessLookupError): | |
| pass | |
| start_proxy(name) | |
| def docker_session(name, yolo=None): | |
| """Mount current directory into Docker container and open session.""" | |
| container_name = f"sux-{name}" | |
| host_dir = str(Path.cwd().resolve()) | |
| claude_dir = str(Path.home() / ".claude") | |
| claude_json = str(Path.home() / ".claude.json") | |
| user = host_username() | |
| ensure_docker_image() | |
| # Check if container exists | |
| result = subprocess.run( | |
| ["docker", "container", "inspect", container_name], capture_output=True | |
| ) | |
| if result.returncode != 0: | |
| # Start filtering proxy | |
| proxy_sock = start_proxy(name) | |
| # Mask top-level secrets and hidden directories with tmpfs | |
| secrets_mounts = [] | |
| if (Path(host_dir) / "secrets").is_dir(): | |
| secrets_mounts += ["--tmpfs", "/workspace/secrets"] | |
| for child in Path(host_dir).iterdir(): | |
| if ( | |
| child.is_dir() | |
| and child.name.startswith(".") | |
| and child.name not in (".git", ".github", ".claude") | |
| ): | |
| secrets_mounts += ["--tmpfs", f"/workspace/{child.name}"] | |
| # Pass through auth environment variables | |
| env_args = [] | |
| for key in ("ANTHROPIC_API_KEY",): | |
| val = os.environ.get(key) | |
| if val: | |
| env_args += ["-e", f"{key}={val}"] | |
| subprocess.run( | |
| [ | |
| "docker", | |
| "run", | |
| "-d", | |
| "--name", | |
| container_name, | |
| "--gpus", | |
| "all", | |
| "--runtime=nvidia", | |
| "-v", | |
| f"{host_dir}:/workspace", | |
| "-v", | |
| f"{claude_dir}:/home/{user}/.claude", | |
| "-v", | |
| f"{claude_json}:/home/{user}/.claude.json", | |
| "-v", | |
| f"{proxy_sock}:/var/run/docker.sock", | |
| "-w", | |
| "/workspace", | |
| *env_args, | |
| *secrets_mounts, | |
| "sux-base", | |
| "sleep", | |
| "infinity", | |
| ], | |
| check=True, | |
| ) | |
| print(f"Started container: {container_name}") | |
| else: | |
| ensure_proxy(name) | |
| subprocess.run(["docker", "start", container_name], capture_output=True) | |
| # Attach to or create tmux session running docker exec | |
| if tmux_running(): | |
| result = subprocess.run( | |
| ["tmux", "has-session", "-t", name], capture_output=True | |
| ) | |
| if result.returncode == 0: | |
| run_tmux("attach-session", "-t", name) | |
| return | |
| if yolo: | |
| run_tmux( | |
| "new-session", | |
| "-s", | |
| name, | |
| "docker", | |
| "exec", | |
| "-it", | |
| "-u", | |
| user, | |
| container_name, | |
| "bash", | |
| "-lc", | |
| f'yolo "{yolo}"', | |
| ) | |
| else: | |
| run_tmux( | |
| "new-session", | |
| "-s", | |
| name, | |
| "docker", | |
| "exec", | |
| "-it", | |
| "-u", | |
| user, | |
| container_name, | |
| "bash", | |
| "-l", | |
| ) | |
| def ensure_worktree(name): | |
| """Create git worktree if needed and chdir into it.""" | |
| worktree_path = Path.cwd() / "worktrees" / name | |
| if worktree_path.exists(): | |
| print(f"Worktree already exists: {worktree_path}") | |
| else: | |
| worktree_path.parent.mkdir(parents=True, exist_ok=True) | |
| result = subprocess.run( | |
| ["git", "rev-parse", "--verify", name], capture_output=True | |
| ) | |
| if result.returncode == 0: | |
| subprocess.run( | |
| ["git", "worktree", "add", str(worktree_path), name], check=True | |
| ) | |
| else: | |
| subprocess.run( | |
| ["git", "worktree", "add", "-b", name, str(worktree_path)], check=True | |
| ) | |
| print(f"Created worktree: {worktree_path}") | |
| os.chdir(worktree_path) | |
| # Initialize submodules in the worktree | |
| if (Path(".gitmodules")).exists(): | |
| subprocess.run( | |
| ["git", "submodule", "update", "--init", "--recursive"], check=True | |
| ) | |
| def attach_or_create(name=None): | |
| """Attach to session, creating if needed.""" | |
| if name: | |
| # Check for existing tmux session first | |
| if tmux_running(): | |
| result = subprocess.run( | |
| ["tmux", "has-session", "-t", name], capture_output=True | |
| ) | |
| if result.returncode == 0: | |
| run_tmux("attach-session", "-t", name) | |
| return | |
| # Check for existing docker container | |
| container_name = f"sux-{name}" | |
| result = subprocess.run( | |
| ["docker", "container", "inspect", container_name], capture_output=True | |
| ) | |
| if result.returncode == 0: | |
| try: | |
| ensure_proxy(name) | |
| except RuntimeError: | |
| pass # proxy may not be needed if socket is bind-mounted | |
| subprocess.run(["docker", "start", container_name], capture_output=True) | |
| run_tmux( | |
| "new-session", | |
| "-s", | |
| name, | |
| "docker", | |
| "exec", | |
| "-it", | |
| "-u", | |
| host_username(), | |
| container_name, | |
| "bash", | |
| "-l", | |
| ) | |
| return | |
| # Fall through to new plain tmux session | |
| run_tmux("new-session", "-s", name) | |
| else: | |
| # Attach to most recent, or create new | |
| if tmux_running(): | |
| run_tmux("attach-session") | |
| else: | |
| run_tmux("new-session") | |
| def kill_session(name): | |
| """Kill a tmux session and/or Docker container.""" | |
| killed = False | |
| if tmux_running(): | |
| result = subprocess.run( | |
| ["tmux", "kill-session", "-t", name], capture_output=True | |
| ) | |
| if result.returncode == 0: | |
| print(f"Killed tmux session: {name}") | |
| killed = True | |
| container_name = f"sux-{name}" | |
| result = subprocess.run(["docker", "rm", "-f", container_name], capture_output=True) | |
| if result.returncode == 0: | |
| print(f"Removed container: {container_name}") | |
| killed = True | |
| # Clean up proxy | |
| stop_proxy(name) | |
| if not killed: | |
| print(f"No session or container found: {name}") | |
| def config_test(): | |
| """Run automated tests: rebuild image, start container, verify proxy.""" | |
| test_name = "sux-config-test" | |
| container_name = f"sux-{test_name}" | |
| user = host_username() | |
| passed = 0 | |
| failed = 0 | |
| def check(desc, cmd, expect_success=True): | |
| nonlocal passed, failed | |
| result = subprocess.run( | |
| ["docker", "exec", "-u", user, container_name] + cmd, | |
| capture_output=True, | |
| text=True, | |
| ) | |
| ok = (result.returncode == 0) == expect_success | |
| status = "PASS" if ok else "FAIL" | |
| if ok: | |
| passed += 1 | |
| else: | |
| failed += 1 | |
| print(f" [{status}] {desc}") | |
| if not ok: | |
| if result.stdout.strip(): | |
| print(f" stdout: {result.stdout.strip()[:200]}") | |
| if result.stderr.strip(): | |
| print(f" stderr: {result.stderr.strip()[:200]}") | |
| return ok | |
| try: | |
| # Rebuild image | |
| print("Rebuilding sux-base image...") | |
| subprocess.run(["docker", "rmi", "-f", "sux-base"], capture_output=True) | |
| build_docker_image() | |
| # Start proxy and container | |
| print("Starting test container...") | |
| proxy_sock = start_proxy(test_name) | |
| host_dir = str(Path.cwd().resolve()) | |
| subprocess.run( | |
| [ | |
| "docker", "run", "-d", "--name", container_name, | |
| "--gpus", "all", "--runtime=nvidia", | |
| "-v", f"{host_dir}:/workspace", | |
| "-v", f"{proxy_sock}:/var/run/docker.sock", | |
| "-w", "/workspace", | |
| "sux-base", "sleep", "infinity", | |
| ], | |
| check=True, | |
| capture_output=True, | |
| ) | |
| # Wait for container to be ready | |
| time.sleep(1) | |
| print("\nFunctionality tests:") | |
| check("docker ps works", ["docker", "ps"]) | |
| check("vulkaninfo has nvidia", | |
| ["bash", "-c", "vulkaninfo 2>/dev/null | grep -qi nvidia"]) | |
| check("docker run hello-world", | |
| ["docker", "run", "--rm", "hello-world"]) | |
| print("\nSecurity tests (dangerous options stripped by proxy):") | |
| # --privileged stripped: container runs but SYS_ADMIN is not granted | |
| check("--privileged stripped (not actually privileged)", | |
| ["docker", "run", "--rm", "--privileged", "alpine", | |
| "sh", "-c", "! cat /proc/sysrq-trigger >/dev/null 2>&1"]) | |
| # bind mount /etc stripped: /mnt is empty | |
| check("bind /etc stripped (mount absent)", | |
| ["docker", "run", "--rm", "-v", "/etc:/mnt:ro", "alpine", | |
| "sh", "-c", "! test -f /mnt/hostname"]) | |
| # --pid=host stripped: only see own processes | |
| check("--pid=host stripped (PID isolated)", | |
| ["docker", "run", "--rm", "--pid=host", "alpine", | |
| "sh", "-c", "test $(ls -d /proc/[0-9]* | wc -l) -lt 10"]) | |
| # --network=host stripped: bridge network (no docker0 visible) | |
| check("--network=host stripped (bridge used)", | |
| ["docker", "run", "--rm", "--network=host", "alpine", | |
| "sh", "-c", "! ip link show docker0 >/dev/null 2>&1"]) | |
| print(f"\nResults: {passed} passed, {failed} failed") | |
| finally: | |
| print("\nCleaning up...") | |
| subprocess.run(["docker", "rm", "-f", container_name], capture_output=True) | |
| stop_proxy(test_name) | |
| sys.exit(1 if failed > 0 else 0) | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description="A less annoying tmux wrapper", | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=__doc__, | |
| ) | |
| parser.add_argument("-l", "--list", action="store_true", help="List sessions") | |
| parser.add_argument( | |
| "--config", action="store_true", help="Write sane tmux config to ~/.tmux.conf" | |
| ) | |
| parser.add_argument( | |
| "-k", | |
| "--kill", | |
| action="store_true", | |
| help="Kill session (tmux and/or Docker container)", | |
| ) | |
| parser.add_argument( | |
| "-d", "--docker", action="store_true", help="Run session in a Docker container" | |
| ) | |
| parser.add_argument( | |
| "-w", "--worktree", action="store_true", help="Create git worktree for session" | |
| ) | |
| parser.add_argument( | |
| "-y", | |
| "--yolo", | |
| metavar="PROMPT", | |
| help="Run claude --dangerously-skip-permissions with PROMPT (requires -w)", | |
| ) | |
| parser.add_argument( | |
| "--config-test", action="store_true", help=argparse.SUPPRESS | |
| ) | |
| parser.add_argument("name", nargs="?", help="Session name") | |
| args = parser.parse_args() | |
| if args.config_test: | |
| config_test() | |
| return | |
| if args.config: | |
| apply_config() | |
| elif args.list: | |
| list_sessions() | |
| elif args.kill: | |
| if not args.name: | |
| parser.error("-k/--kill requires a session name") | |
| kill_session(args.name) | |
| else: | |
| name = args.name | |
| if args.yolo and not args.worktree: | |
| parser.error("-y/--yolo requires -w/--worktree") | |
| if args.yolo: | |
| args.docker = True | |
| if not name: | |
| if args.docker or args.worktree: | |
| parser.error("-d/-w/-y require a session name") | |
| attach_or_create() | |
| elif args.worktree: | |
| ensure_worktree(name) | |
| if args.docker: | |
| docker_session(name, yolo=args.yolo) | |
| else: | |
| attach_or_create(name) | |
| elif args.docker: | |
| docker_session(name) | |
| else: | |
| attach_or_create(name) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment