Created
March 13, 2026 02:16
-
-
Save bitnom/b6753dd071bae8dc2bbfe9d076bb5faf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import annotations | |
| import base64 | |
| import json | |
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import time | |
| import uuid | |
| from pathlib import Path | |
# Sentinel lines printed immediately before and after the base64-encoded JSON
# report, so the payload can be located within this script's stdout.
RESULTS_BEGIN = "__APOTHIC_CAPABILITY_RESULTS_BEGIN__"
RESULTS_END = "__APOTHIC_CAPABILITY_RESULTS_END__"
def command_exists(name: str) -> str | None:
    """Look up *name* on the current ``PATH``.

    Returns the resolved executable path reported by ``shutil.which``, or
    ``None`` when no such command is found.
    """
    resolved = shutil.which(name)
    return resolved
def _as_text(captured: str | bytes | None) -> str:
    """Normalize captured process output to ``str``.

    ``subprocess.TimeoutExpired`` may carry the partial output as raw ``bytes``
    even when the child was started with ``text=True``; decoding here keeps the
    result string-typed (and therefore JSON-serializable).
    """
    if not captured:
        return ""
    if isinstance(captured, bytes):
        return captured.decode("utf-8", errors="replace")
    return captured


def run_check(name: str, command: list[str], *, timeout: float = 20) -> dict[str, object]:
    """Run *command* and summarize the outcome without ever raising.

    Args:
        name: Label stored in the result under ``"name"``.
        command: Argument vector passed to ``subprocess.run`` (no shell).
        timeout: Seconds to wait before the command is abandoned.

    Returns:
        A dict with ``name``, ``command``, ``returncode``, ``stdout``,
        ``stderr`` and ``duration_s``. ``stdout``/``stderr`` of a completed or
        timed-out command are truncated to their last 4000 characters.
        ``returncode`` is ``None`` when the command timed out (``stderr`` then
        ends with ``"\\nTIMEOUT"``) or could not be run at all (``stderr`` then
        holds the ``repr`` of the exception).
    """
    # Monotonic clock: the measured duration must not be affected by
    # wall-clock adjustments while the command runs.
    started = time.monotonic()

    def report(returncode: int | None, stdout: str, stderr: str) -> dict[str, object]:
        return {
            "name": name,
            "command": command,
            "returncode": returncode,
            "stdout": stdout,
            "stderr": stderr,
            "duration_s": round(time.monotonic() - started, 3),
        }

    try:
        proc = subprocess.run(
            command,
            check=False,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        # The partial output may be bytes; concatenating it with a str marker
        # or returning it undecoded would break the report.
        return report(
            None,
            _as_text(exc.stdout)[-4000:],
            (_as_text(exc.stderr) + "\nTIMEOUT")[-4000:],
        )
    except Exception as exc:  # pragma: no cover - probe-side defensive path
        return report(None, "", repr(exc))
    return report(proc.returncode, proc.stdout[-4000:], proc.stderr[-4000:])
def path_info(path: str) -> dict[str, object]:
    """Describe a filesystem path.

    Returns a dict holding the original ``path`` string plus ``exists``,
    ``is_dir`` and ``is_file`` flags. When the path exists, ``mode`` is added
    as the octal string of its permission bits (``st_mode & 0o7777``).
    """
    target = Path(path)
    info: dict[str, object] = {"path": path}
    info["exists"] = target.exists()
    info["is_dir"] = target.is_dir()
    info["is_file"] = target.is_file()
    if not target.exists():
        return info
    permission_bits = target.stat().st_mode & 0o7777
    info["mode"] = oct(permission_bits)
    return info
def cleanup_command(command: list[str]) -> None:
    """Run a best-effort cleanup *command* and ignore its outcome.

    The command's exit status and output are discarded. Failures to launch the
    command (for example a missing binary) and the 10-second timeout are
    swallowed as well: cleanup must never abort the probe run that invoked it.
    """
    try:
        subprocess.run(command, check=False, capture_output=True, text=True, timeout=10)
    except (OSError, subprocess.SubprocessError):
        # OSError covers a missing/non-executable binary; SubprocessError
        # covers TimeoutExpired. Neither is actionable for a cleanup step.
        pass
def _copy_binary_with_libs(binary_path: str, rootfs: Path) -> None:
    """Copy a binary and the shared libraries ``ldd`` lists for it into *rootfs*.

    The binary and every library keep their absolute path, re-rooted under
    *rootfs*. If ``ldd`` does not exit with status 0, only the binary itself
    is copied. Libraries already present in *rootfs* are not copied again.
    """
    destination = rootfs / binary_path.lstrip("/")
    destination.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(binary_path, destination)

    ldd = run_check("ldd", ["ldd", binary_path])
    if ldd.get("returncode") != 0:
        return

    for raw_line in str(ldd.get("stdout") or "").splitlines():
        if "=>" in raw_line:
            # "name => /resolved/path (addr)": take the token after the arrow.
            fields = raw_line.split("=>", 1)[1].strip().split()
            candidate = fields[0] if fields else None
        else:
            # "/abs/path (addr)": take the leading token if it is a path.
            fields = raw_line.strip().split()
            candidate = fields[0] if fields and fields[0].startswith("/") else None

        # Skip unresolved entries (e.g. "not found") and anything not on disk.
        if not (candidate and os.path.isabs(candidate) and os.path.exists(candidate)):
            continue

        lib_target = rootfs / candidate.lstrip("/")
        lib_target.parent.mkdir(parents=True, exist_ok=True)
        if lib_target.exists():
            continue
        shutil.copy2(candidate, lib_target)
def _write_minimal_oci_config(bundle_dir: Path, runtime_name: str) -> None:
    """Write the OCI ``config.json`` variant that includes the resources section."""
    _write_oci_config(
        bundle_dir=bundle_dir,
        runtime_name=runtime_name,
        include_resources=True,
    )
| def _write_oci_config(bundle_dir: Path, runtime_name: str, *, include_resources: bool) -> None: | |
| linux_config = { | |
| "namespaces": [ | |
| {"type": "pid"}, | |
| {"type": "network"}, | |
| {"type": "ipc"}, | |
| {"type": "uts"}, | |
| {"type": "mount"}, | |
| ], | |
| } | |
| if include_resources: | |
| linux_config["resources"] = { | |
| "devices": [{"allow": False, "access": "rwm"}], | |
| } | |
| config = { | |
| "ociVersion": "1.0.2", | |
| "process": { | |
| "terminal": False, | |
| "args": ["/bin/sh", "-c", f"echo {runtime_name}-runtime-ok"], | |
| "env": ["PATH=/bin:/usr/bin:/usr/local/bin"], | |
| "cwd": "/", | |
| }, | |
| "root": { | |
| "path": "rootfs", | |
| "readonly": False, | |
| }, | |
| "hostname": "aprobe", | |
| "mounts": [ | |
| {"destination": "/proc", "type": "proc", "source": "proc"}, | |
| {"destination": "/dev", "type": "tmpfs", "source": "tmpfs", "options": ["nosuid", "strictatime", "mode=755", "size=65536k"]}, | |
| {"destination": "/dev/pts", "type": "devpts", "source": "devpts", "options": ["nosuid", "noexec", "newinstance", "ptmxmode=0666", "mode=0620"]}, | |
| {"destination": "/dev/shm", "type": "tmpfs", "source": "shm", "options": ["nosuid", "noexec", "nodev", "mode=1777", "size=65536k"]}, | |
| {"destination": "/dev/mqueue", "type": "mqueue", "source": "mqueue", "options": ["nosuid", "noexec", "nodev"]}, | |
| {"destination": "/sys", "type": "sysfs", "source": "sysfs", "options": ["nosuid", "noexec", "nodev", "ro"]}, | |
| ], | |
| "linux": linux_config, | |
| } | |
| (bundle_dir / "config.json").write_text(json.dumps(config), encoding="utf-8") | |
def probe_oci_runtime(binary: str, runtime_name: str, *, include_resources: bool = True) -> dict[str, object]:
    """Try to run a trivial container with an OCI runtime.

    A temporary bundle is assembled (a rootfs holding only a shell and the
    libraries ``ldd`` reports for it, plus a generated ``config.json``),
    ``<binary> run`` is executed against it, and the container is force-deleted
    afterwards on a best-effort basis.

    Args:
        binary: Runtime executable to invoke.
        runtime_name: Name used in the check label, container id and echoed
            marker; ``"runsc"`` additionally gets a private ``--root`` state
            directory inside the temporary directory.
        include_resources: Whether the config carries a ``linux.resources``
            section; selects the ``minimal`` / ``no_cgroups`` label suffix.

    Returns:
        The ``run_check`` result of the ``run`` invocation.
    """
    with tempfile.TemporaryDirectory(prefix=f"aprobe-{runtime_name}-") as temp_dir:
        bundle_dir = Path(temp_dir) / "bundle"
        rootfs_dir = bundle_dir / "rootfs"
        bundle_dir.mkdir(parents=True)
        rootfs_dir.mkdir()

        shell_path = shutil.which("sh") or "/bin/sh"
        _copy_binary_with_libs(shell_path, rootfs_dir)
        # The generated config always executes /bin/sh, but the shell is copied
        # to wherever it was found on PATH (commonly /usr/bin/sh). Make sure
        # /bin/sh exists inside the rootfs so the container process can start.
        config_shell = rootfs_dir / "bin" / "sh"
        if not config_shell.exists():
            config_shell.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(shell_path, config_shell)

        _write_oci_config(bundle_dir, runtime_name, include_resources=include_resources)
        container_id = f"aprobe-{runtime_name}-{uuid.uuid4().hex[:8]}"

        command = [binary]
        delete_command = [binary]
        if runtime_name == "runsc":
            # Keep runsc state inside the temp dir; run and delete must agree.
            runsc_root = str(Path(temp_dir) / "runsc-root")
            command.extend(["--root", runsc_root])
            delete_command.extend(["--root", runsc_root])
        command.extend(["run", "--bundle", str(bundle_dir), container_id])

        suffix = "minimal" if include_resources else "no_cgroups"
        result = run_check(f"{runtime_name}_run_{suffix}", command, timeout=30)

        delete_command.extend(["delete", "-f", container_id])
        cleanup_command(delete_command)
        return result
def probe_buildah_vfs(*, isolation: str | None = None, extra_args: list[str] | None = None) -> dict[str, object]:
    """Check whether ``buildah from scratch`` works with the vfs storage driver.

    Storage lives in a temporary directory (``--root`` / ``--runroot``). When
    the command succeeds, the working container named on the last line of its
    output is removed again on a best-effort basis.

    Args:
        isolation: Optional value passed as ``--isolation``; also appended to
            the check label.
        extra_args: Optional extra arguments inserted before ``--isolation``.

    Returns:
        The ``run_check`` result of the ``buildah ... from scratch`` call.
    """
    with tempfile.TemporaryDirectory(prefix="aprobe-buildah-") as temp_dir:
        root = str(Path(temp_dir) / "root")
        runroot = str(Path(temp_dir) / "runroot")
        storage_args = [
            "buildah",
            "--root",
            root,
            "--runroot",
            runroot,
            "--storage-driver",
            "vfs",
        ]

        command = list(storage_args)
        if extra_args:
            command.extend(extra_args)
        if isolation:
            command.extend(["--isolation", isolation])
        command.extend(["from", "scratch"])

        suffix = f"_{isolation}" if isolation else ""
        result = run_check(f"buildah_from_scratch_vfs{suffix}", command, timeout=30)

        if result.get("returncode") == 0:
            # A successful run with empty output must not raise IndexError.
            output_lines = str(result.get("stdout") or "").strip().splitlines()
            container_id = output_lines[-1] if output_lines else ""
            if container_id:
                # Only the storage options are needed to locate the container;
                # `buildah rm` does not document an --isolation option, so it
                # is not forwarded here.
                cleanup_command([*storage_args, "rm", container_id])
        return result
def probe_buildah_bud(
    *,
    isolation: str | None = None,
    userns: str | None = None,
    from_image: str = "docker.io/library/busybox:latest",
) -> dict[str, object]:
    """Check whether ``buildah bud`` can build a tiny image with vfs storage.

    A three-line Dockerfile based on *from_image* is written to a temporary
    build context and built with ``--no-cache``; storage also lives in the
    temporary directory.

    Args:
        isolation: Optional ``--isolation`` value; appended to the check label.
        userns: Optional ``--userns`` value; appended to the label as
            ``userns_<value>``.
        from_image: Base image named in the Dockerfile's ``FROM`` line.

    Returns:
        The ``run_check`` result of the build (120-second timeout).
    """
    with tempfile.TemporaryDirectory(prefix="aprobe-buildah-bud-") as temp_dir:
        workspace = Path(temp_dir)
        storage_root = str(workspace / "root")
        storage_runroot = str(workspace / "runroot")

        context_dir = workspace / "context"
        context_dir.mkdir()
        dockerfile_text = (
            f"FROM {from_image}\n"
            "RUN echo buildah-bud-ok >/probe.txt\n"
            'CMD ["/bin/sh", "-c", "cat /probe.txt"]\n'
        )
        (context_dir / "Dockerfile").write_text(dockerfile_text, encoding="utf-8")

        check_name = "buildah_bud_vfs"
        command = [
            "buildah",
            "--root",
            storage_root,
            "--runroot",
            storage_runroot,
            "--storage-driver",
            "vfs",
        ]
        if isolation:
            command += ["--isolation", isolation]
            check_name += f"_{isolation}"
        if userns:
            command += ["--userns", userns]
            check_name += f"_userns_{userns}"
        command += ["bud", "--no-cache", "-t", "aprobe-build:latest", str(context_dir)]

        return run_check(check_name, command, timeout=120)
def probe_chroot_minimal() -> dict[str, object]:
    """Check whether ``chroot`` into a minimal rootfs can run a shell.

    The rootfs contains only the host shell and the libraries ``ldd`` reports
    for it; the shell is invoked at the same absolute path it has on the host.
    """
    with tempfile.TemporaryDirectory(prefix="aprobe-chroot-") as temp_dir:
        rootfs_dir = Path(temp_dir) / "rootfs"
        rootfs_dir.mkdir()

        host_shell = shutil.which("sh") or "/bin/sh"
        _copy_binary_with_libs(host_shell, rootfs_dir)

        # Path of the copied shell as seen from inside the chroot.
        relative_shell = Path(host_shell).relative_to("/")
        shell_inside = "/" + relative_shell.as_posix()

        chroot_command = ["chroot", str(rootfs_dir), shell_inside, "-c", "echo chroot-ok"]
        return run_check("chroot_minimal", chroot_command, timeout=30)
def probe_namespaces() -> dict[str, dict[str, object]]:
    """Try creating a network namespace, a veth pair and a bridge via ``ip``.

    Each object is created in turn and, if creation succeeded, deleted again
    on a best-effort basis. Returns the creation results keyed by check name.
    """
    # (check name, create command, delete command)
    probes = (
        (
            "ip_netns_add_del",
            ["ip", "netns", "add", "aprobe-ns"],
            ["ip", "netns", "del", "aprobe-ns"],
        ),
        (
            "ip_link_veth_add_del",
            ["ip", "link", "add", "aprobe-veth0", "type", "veth", "peer", "name", "aprobe-veth1"],
            ["ip", "link", "del", "aprobe-veth0"],
        ),
        (
            "ip_link_bridge_add_del",
            ["ip", "link", "add", "name", "aprobe-br0", "type", "bridge"],
            ["ip", "link", "del", "aprobe-br0"],
        ),
    )

    results: dict[str, dict[str, object]] = {}
    for check_name, create_cmd, delete_cmd in probes:
        outcome = run_check(check_name, create_cmd)
        results[check_name] = outcome
        if outcome.get("returncode") == 0:
            cleanup_command(delete_cmd)
    return results
def probe_overlay_mount() -> dict[str, object]:
    """Check whether an overlay filesystem can be mounted.

    Lower, upper, work and merged directories are created in a temporary
    directory (the lower layer holds one small file). The mount is undone on a
    best-effort basis when it succeeded. Returns the ``mount`` result.
    """
    with tempfile.TemporaryDirectory(prefix="aprobe-overlay-") as temp_dir:
        base = Path(temp_dir)
        layers = {label: base / label for label in ("lower", "upper", "work", "merged")}
        for directory in layers.values():
            directory.mkdir()
        (layers["lower"] / "hello.txt").write_text("hello\n", encoding="utf-8")

        mount_options = (
            f"lowerdir={layers['lower']},upperdir={layers['upper']},workdir={layers['work']}"
        )
        mount_point = str(layers["merged"])
        result = run_check(
            "overlay_mount",
            ["mount", "-t", "overlay", "overlay", "-o", mount_options, mount_point],
        )
        if result.get("returncode") == 0:
            cleanup_command(["umount", mount_point])
        return result
def probe_cgroup_write() -> dict[str, object]:
    """Check whether a directory can be created under ``/sys/fs/cgroup``.

    The ``aprobe-test`` directory is removed again (best effort) when the
    ``mkdir`` succeeded. Returns the ``mkdir`` result.
    """
    probe_dir = str(Path("/sys/fs/cgroup") / "aprobe-test")
    outcome = run_check("cgroup_mkdir", ["mkdir", probe_dir])
    created = outcome.get("returncode") == 0
    if created:
        cleanup_command(["rmdir", probe_dir])
    return outcome
def probe_fuse_open() -> dict[str, object]:
    """Check whether ``/dev/fuse`` can be opened read-write.

    The open is attempted in a separate ``python3`` process so that a failure
    is reported through the usual ``run_check`` result.
    """
    statements = [
        "import os,sys",
        "fd=os.open('/dev/fuse', os.O_RDWR)",
        "os.close(fd)",
        "print('opened /dev/fuse')",
    ]
    one_liner = "; ".join(statements)
    return run_check("fuse_device_open", ["python3", "-c", one_liner])
def probe_tun_create() -> dict[str, object]:
    """Check whether a TUN device can be created with ``ip tuntap``.

    The ``aprobe-tun0`` device is deleted again (best effort) when creation
    succeeded. Returns the creation result.
    """
    device = "aprobe-tun0"
    outcome = run_check("ip_tuntap_add_del", ["ip", "tuntap", "add", "dev", device, "mode", "tun"])
    if outcome.get("returncode") != 0:
        return outcome
    cleanup_command(["ip", "link", "del", device])
    return outcome
def probe_wireguard_link() -> dict[str, object]:
    """Check whether a WireGuard link can be created with ``ip link``.

    The ``aprobe-wg0`` link is deleted again (best effort) when creation
    succeeded. Returns the creation result.
    """
    link = "aprobe-wg0"
    outcome = run_check("wireguard_link_add_del", ["ip", "link", "add", link, "type", "wireguard"])
    if outcome.get("returncode") != 0:
        return outcome
    cleanup_command(["ip", "link", "del", link])
    return outcome
def collect_results() -> dict[str, object]:
    """Run every probe and return the combined report.

    The report has three sections:

    * ``meta`` - timestamp, process identity, host details, a few command
      outputs and the resolved path (or ``None``) of each tool of interest.
    * ``paths`` - ``path_info`` for selected device and system paths.
    * ``checks`` - one result dict per probe, keyed by check name.
    """
    tool_names = (
        "ip",
        "iptables",
        "ip6tables",
        "mount",
        "umount",
        "bwrap",
        "fusermount3",
        "runc",
        "runsc",
        "buildah",
        "dockerd",
        "nvidia-smi",
        "nvidia-ctk",
        "rclone",
        "skopeo",
        "wg",
        "wg-quick",
    )
    tool_paths = {tool: command_exists(tool) for tool in tool_names}

    meta: dict[str, object] = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "uid": os.getuid(),
        "gid": os.getgid(),
        "cwd": os.getcwd(),
        "hostname": os.uname().nodename,
        "python": run_check("python_version", ["python3", "--version"]),
        "kernel": run_check("uname", ["uname", "-a"]),
        "id": run_check("id", ["id"]),
        "mounts": run_check("mount", ["mount"]),
        "tool_paths": tool_paths,
    }

    inspected_paths = (
        "/dev/fuse",
        "/dev/net/tun",
        "/dev/shm",
        "/sys/fs/cgroup",
        "/etc/cdi/nvidia.yaml",
        "/dev/nvidia0",
        "/dev/nvidiactl",
        "/usr/bin/bwrap",
        "/usr/bin/unshare",
    )
    path_report = {entry: path_info(entry) for entry in inspected_paths}

    checks: dict[str, object] = {}
    results: dict[str, object] = {"meta": meta, "paths": path_report, "checks": checks}

    def record(check_name: str, command: list[str], **options: int) -> None:
        """Run *command* and store its result under *check_name*."""
        checks[check_name] = run_check(check_name, command, **options)

    # --- packet filtering -------------------------------------------------
    record("iptables_nat_prerouting", ["iptables", "-t", "nat", "-S", "PREROUTING", "--wait"])
    record("iptables_filter_forward", ["iptables", "-S", "FORWARD", "--wait"])
    record("ip6tables_nat_postrouting", ["ip6tables", "-t", "nat", "-S", "POSTROUTING", "--wait"])

    # --- network objects and overlay mounts -------------------------------
    checks.update(probe_namespaces())
    checks["overlay_mount"] = probe_overlay_mount()

    # --- unshare ----------------------------------------------------------
    record(
        "unshare_user_map_root",
        ["unshare", "--user", "--map-root-user", "sh", "-c", "id -u && cat /proc/self/uid_map"],
    )
    record(
        "unshare_user_mount_tmpfs",
        [
            "unshare",
            "--user",
            "--map-root-user",
            "--mount",
            "sh",
            "-c",
            'set -eu; d=$(mktemp -d); mount -t tmpfs tmpfs "$d"; echo mounted; umount "$d"; rmdir "$d"',
        ],
    )
    record("unshare_mount", ["unshare", "--mount", "sh", "-c", "mount | head -n 5"])
    record("unshare_pid_fork", ["unshare", "--pid", "--fork", "sh", "-c", "echo $$; ps -o pid,ppid,comm"])
    record("unshare_net", ["unshare", "--net", "sh", "-c", "ip link show lo"])

    # --- bubblewrap (all variants share the same sandbox layout) ----------
    bwrap_base = [
        "bwrap",
        "--ro-bind",
        "/",
        "/",
        "--proc",
        "/proc",
        "--dev",
        "/dev",
        "--tmpfs",
        "/tmp",
    ]
    record("bwrap_basic", bwrap_base + ["sh", "-c", "echo bwrap-basic-ok && test -d /tmp"])
    record(
        "bwrap_unshare_pid",
        bwrap_base + ["--unshare-pid", "--new-session", "sh", "-c", "echo $$ && ps -o pid,ppid,comm"],
    )
    record("bwrap_unshare_net", bwrap_base + ["--unshare-net", "sh", "-c", "ip link show lo"])

    # --- cgroups, FUSE, tunnels, GPUs -------------------------------------
    checks["cgroup_mkdir"] = probe_cgroup_write()
    record("cgroup_controllers", ["cat", "/sys/fs/cgroup/cgroup.controllers"])
    record("fuse_version", ["fusermount3", "--version"])
    checks["fuse_device_open"] = probe_fuse_open()
    checks["tun_create"] = probe_tun_create()
    checks["wireguard_link_add_del"] = probe_wireguard_link()
    record("wg_show", ["wg", "show"])
    record("nvidia_smi", ["nvidia-smi", "-L"])
    record("nvidia_ctk", ["nvidia-ctk", "--version"])

    # --- OCI runtimes -----------------------------------------------------
    record("runc_version", ["runc", "--version"])
    record("runsc_version", ["runsc", "--version"])
    checks["runc_run_minimal"] = probe_oci_runtime("runc", "runc")
    checks["runc_run_no_cgroups"] = probe_oci_runtime("runc", "runc", include_resources=False)
    checks["runsc_run_minimal"] = probe_oci_runtime("runsc", "runsc")
    checks["runsc_run_no_cgroups"] = probe_oci_runtime("runsc", "runsc", include_resources=False)

    # --- image building ---------------------------------------------------
    record("buildah_version", ["buildah", "version"])
    checks["buildah_from_scratch_vfs"] = probe_buildah_vfs()
    checks["buildah_from_scratch_vfs_chroot"] = probe_buildah_vfs(isolation="chroot")
    checks["buildah_from_scratch_vfs_chroot_userns_host"] = probe_buildah_vfs(
        isolation="chroot",
        extra_args=["--userns", "host"],
    )
    checks["buildah_bud_vfs_chroot"] = probe_buildah_bud(isolation="chroot")
    checks["buildah_bud_vfs_chroot_userns_host"] = probe_buildah_bud(
        isolation="chroot",
        userns="host",
    )

    # --- miscellaneous ----------------------------------------------------
    checks["chroot_minimal"] = probe_chroot_minimal()
    record("dockerd_version", ["dockerd", "--version"])
    record("df_shm", ["df", "-h", "/dev/shm"])
    record("skopeo_version", ["skopeo", "--version"])
    record(
        "skopeo_inspect_busybox",
        ["skopeo", "inspect", "docker://docker.io/library/busybox:latest"],
        timeout=60,
    )
    return results
# Emit the report on stdout as base64-encoded JSON, split into 120-character
# lines and bracketed by the RESULTS_BEGIN / RESULTS_END sentinel lines.
results = collect_results()
print(RESULTS_BEGIN, flush=True)
encoded = base64.b64encode(
    json.dumps(results, sort_keys=True).encode("utf-8")
).decode("ascii")
index = 0
while index < len(encoded):
    print(encoded[index : index + 120], flush=True)
    index += 120
print(RESULTS_END, flush=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment