Skip to content

Instantly share code, notes, and snippets.

@bitnom
Created March 13, 2026 02:16
Show Gist options
  • Select an option

  • Save bitnom/b6753dd071bae8dc2bbfe9d076bb5faf to your computer and use it in GitHub Desktop.

Select an option

Save bitnom/b6753dd071bae8dc2bbfe9d076bb5faf to your computer and use it in GitHub Desktop.
from __future__ import annotations
import base64
import json
import os
import shutil
import subprocess
import tempfile
import time
import uuid
from pathlib import Path
# Sentinel markers that bracket the base64-encoded JSON report on stdout,
# so a harness reading this probe's output can locate and decode the payload.
RESULTS_BEGIN = "__APOTHIC_CAPABILITY_RESULTS_BEGIN__"
RESULTS_END = "__APOTHIC_CAPABILITY_RESULTS_END__"
def command_exists(name: str) -> str | None:
return shutil.which(name)
def run_check(name: str, command: list[str], *, timeout: int = 20) -> dict[str, object]:
started = time.time()
try:
proc = subprocess.run(
command,
check=False,
capture_output=True,
text=True,
timeout=timeout,
)
return {
"name": name,
"command": command,
"returncode": proc.returncode,
"stdout": proc.stdout[-4000:],
"stderr": proc.stderr[-4000:],
"duration_s": round(time.time() - started, 3),
}
except subprocess.TimeoutExpired as exc:
return {
"name": name,
"command": command,
"returncode": None,
"stdout": (exc.stdout or "")[-4000:],
"stderr": ((exc.stderr or "") + "\nTIMEOUT")[-4000:],
"duration_s": round(time.time() - started, 3),
}
except Exception as exc: # pragma: no cover - probe-side defensive path
return {
"name": name,
"command": command,
"returncode": None,
"stdout": "",
"stderr": repr(exc),
"duration_s": round(time.time() - started, 3),
}
def path_info(path: str) -> dict[str, object]:
    """Describe *path*: existence, dir/file flags, and (if present) mode bits.

    The ``mode`` key is only included for existing paths and holds the octal
    permission bits (including setuid/setgid/sticky) as a string.
    """
    p = Path(path)
    present = p.exists()
    info: dict[str, object] = {
        "path": path,
        "exists": present,
        "is_dir": p.is_dir(),
        "is_file": p.is_file(),
    }
    if present:
        info["mode"] = oct(p.stat().st_mode & 0o7777)
    return info
def cleanup_command(command: list[str]) -> None:
    """Run *command* as best-effort teardown, discarding all output.

    BUG FIX: the original let ``subprocess.run`` exceptions propagate, so a
    hung cleanup (``TimeoutExpired``) or a missing cleanup binary
    (``FileNotFoundError``) would abort the whole probe run after the check
    itself had already succeeded.  Teardown failures are now swallowed.
    """
    try:
        subprocess.run(command, check=False, capture_output=True, text=True, timeout=10)
    except (subprocess.TimeoutExpired, OSError):
        # Cleanup must never take down the probe; the created resource is
        # at worst leaked inside a short-lived environment.
        pass
def _copy_binary_with_libs(binary_path: str, rootfs: Path) -> None:
    """Copy *binary_path* into *rootfs* along with every shared library
    ``ldd`` reports for it, mirrored at the same absolute locations.

    If ``ldd`` fails (e.g. a static binary), only the binary is copied.
    """
    dest = rootfs / binary_path.lstrip("/")
    dest.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(binary_path, dest)

    ldd_result = run_check("ldd", ["ldd", binary_path])
    if ldd_result.get("returncode") != 0:
        return

    for raw_line in str(ldd_result.get("stdout") or "").splitlines():
        # Two ldd line shapes: "libX.so => /path/libX.so (0x...)" and a
        # bare absolute path (the dynamic loader line).
        if "=>" in raw_line:
            fields = raw_line.split("=>", 1)[1].strip().split()
            lib = fields[0] if fields else None
        else:
            fields = raw_line.strip().split()
            lib = fields[0] if fields and fields[0].startswith("/") else None
        if not lib or not os.path.isabs(lib) or not os.path.exists(lib):
            continue
        lib_dest = rootfs / lib.lstrip("/")
        lib_dest.parent.mkdir(parents=True, exist_ok=True)
        if not lib_dest.exists():
            shutil.copy2(lib, lib_dest)
def _write_minimal_oci_config(bundle_dir: Path, runtime_name: str) -> None:
    """Write the default OCI config (cgroup resources included) into *bundle_dir*."""
    _write_oci_config(bundle_dir, runtime_name, include_resources=True)
def _write_oci_config(bundle_dir: Path, runtime_name: str, *, include_resources: bool) -> None:
linux_config = {
"namespaces": [
{"type": "pid"},
{"type": "network"},
{"type": "ipc"},
{"type": "uts"},
{"type": "mount"},
],
}
if include_resources:
linux_config["resources"] = {
"devices": [{"allow": False, "access": "rwm"}],
}
config = {
"ociVersion": "1.0.2",
"process": {
"terminal": False,
"args": ["/bin/sh", "-c", f"echo {runtime_name}-runtime-ok"],
"env": ["PATH=/bin:/usr/bin:/usr/local/bin"],
"cwd": "/",
},
"root": {
"path": "rootfs",
"readonly": False,
},
"hostname": "aprobe",
"mounts": [
{"destination": "/proc", "type": "proc", "source": "proc"},
{"destination": "/dev", "type": "tmpfs", "source": "tmpfs", "options": ["nosuid", "strictatime", "mode=755", "size=65536k"]},
{"destination": "/dev/pts", "type": "devpts", "source": "devpts", "options": ["nosuid", "noexec", "newinstance", "ptmxmode=0666", "mode=0620"]},
{"destination": "/dev/shm", "type": "tmpfs", "source": "shm", "options": ["nosuid", "noexec", "nodev", "mode=1777", "size=65536k"]},
{"destination": "/dev/mqueue", "type": "mqueue", "source": "mqueue", "options": ["nosuid", "noexec", "nodev"]},
{"destination": "/sys", "type": "sysfs", "source": "sysfs", "options": ["nosuid", "noexec", "nodev", "ro"]},
],
"linux": linux_config,
}
(bundle_dir / "config.json").write_text(json.dumps(config), encoding="utf-8")
def probe_oci_runtime(binary: str, runtime_name: str, *, include_resources: bool = True) -> dict[str, object]:
    """Launch a throwaway OCI container with *binary* and report the outcome.

    Builds a one-shot bundle (shell plus its shared libraries, minimal
    config), runs it under a unique container id, then force-deletes the
    container.  runsc is given a private ``--root`` state directory so it
    never touches the default location.
    """
    with tempfile.TemporaryDirectory(prefix=f"aprobe-{runtime_name}-") as workdir:
        bundle = Path(workdir) / "bundle"
        rootfs = bundle / "rootfs"
        rootfs.mkdir(parents=True)  # also creates the bundle directory

        shell = shutil.which("sh") or "/bin/sh"
        _copy_binary_with_libs(shell, rootfs)
        _write_oci_config(bundle, runtime_name, include_resources=include_resources)

        cid = f"aprobe-{runtime_name}-{uuid.uuid4().hex[:8]}"
        global_flags: list[str] = []
        if runtime_name == "runsc":
            global_flags = ["--root", str(Path(workdir) / "runsc-root")]

        label = "minimal" if include_resources else "no_cgroups"
        outcome = run_check(
            f"{runtime_name}_run_{label}",
            [binary, *global_flags, "run", "--bundle", str(bundle), cid],
            timeout=30,
        )
        cleanup_command([binary, *global_flags, "delete", "-f", cid])
        return outcome
def probe_buildah_vfs(*, isolation: str | None = None, extra_args: list[str] | None = None) -> dict[str, object]:
    """Probe whether ``buildah from scratch`` works with the vfs storage driver.

    Throwaway ``--root``/``--runroot`` directories keep global storage
    untouched.  *isolation* selects buildah's ``--isolation`` mode and
    *extra_args* are inserted before the subcommand.  On success the created
    working container is removed again.

    Returns the run_check() record for the ``buildah from scratch`` call.
    """
    with tempfile.TemporaryDirectory(prefix="aprobe-buildah-") as temp_dir:
        root = str(Path(temp_dir) / "root")
        runroot = str(Path(temp_dir) / "runroot")
        base = ["buildah", "--root", root, "--runroot", runroot, "--storage-driver", "vfs"]

        command = list(base)
        if extra_args:
            command.extend(extra_args)
        if isolation:
            command.extend(["--isolation", isolation])
        command.extend(["from", "scratch"])

        suffix = f"_{isolation}" if isolation else ""
        result = run_check(f"buildah_from_scratch_vfs{suffix}", command, timeout=30)
        if result.get("returncode") == 0:
            # BUG FIX: the original indexed splitlines()[-1] unconditionally,
            # which raises IndexError when buildah exits 0 with empty or
            # whitespace-only stdout; guard the empty case first.
            lines = str(result.get("stdout") or "").strip().splitlines()
            container_id = lines[-1] if lines else ""
            if container_id:
                cleanup_command(
                    [
                        *base,
                        *(["--isolation", isolation] if isolation else []),
                        "rm",
                        container_id,
                    ]
                )
        return result
def probe_buildah_bud(
    *,
    isolation: str | None = None,
    userns: str | None = None,
    from_image: str = "docker.io/library/busybox:latest",
) -> dict[str, object]:
    """Probe a full ``buildah bud`` build using the vfs storage driver.

    A tiny Dockerfile (``FROM`` *from_image* plus one ``RUN``) is built in a
    throwaway context with isolated ``--root``/``--runroot`` storage.
    Optional *isolation* and *userns* flags are forwarded to buildah.

    Returns the run_check() record for the build.
    """
    with tempfile.TemporaryDirectory(prefix="aprobe-buildah-bud-") as temp_dir:
        workdir = Path(temp_dir)
        context_dir = workdir / "context"
        context_dir.mkdir()
        dockerfile = (
            f"FROM {from_image}\n"
            "RUN echo buildah-bud-ok >/probe.txt\n"
            "CMD [\"/bin/sh\", \"-c\", \"cat /probe.txt\"]\n"
        )
        (context_dir / "Dockerfile").write_text(dockerfile, encoding="utf-8")

        command = [
            "buildah",
            "--root",
            str(workdir / "root"),
            "--runroot",
            str(workdir / "runroot"),
            "--storage-driver",
            "vfs",
        ]
        if isolation:
            command += ["--isolation", isolation]
        if userns:
            command += ["--userns", userns]
        command += ["bud", "--no-cache", "-t", "aprobe-build:latest", str(context_dir)]

        name_parts = ["buildah_bud_vfs"]
        if isolation:
            name_parts.append(isolation)
        if userns:
            name_parts.append(f"userns_{userns}")
        return run_check("_".join(name_parts), command, timeout=120)
def probe_chroot_minimal() -> dict[str, object]:
    """Probe whether chroot into a freshly assembled minimal rootfs works."""
    with tempfile.TemporaryDirectory(prefix="aprobe-chroot-") as temp_dir:
        rootfs = Path(temp_dir) / "rootfs"
        rootfs.mkdir()
        shell = shutil.which("sh") or "/bin/sh"
        _copy_binary_with_libs(shell, rootfs)
        # Path of the shell as seen from inside the chroot.
        inner_shell = "/" + Path(shell).relative_to("/").as_posix()
        command = ["chroot", str(rootfs), inner_shell, "-c", "echo chroot-ok"]
        return run_check("chroot_minimal", command, timeout=30)
def probe_namespaces() -> dict[str, dict[str, object]]:
    """Probe netns / veth / bridge creation via ``ip``.

    Each successful creation is immediately torn down with the matching
    delete command so no interfaces or namespaces are left behind.
    """
    specs = [
        (
            "ip_netns_add_del",
            ["ip", "netns", "add", "aprobe-ns"],
            ["ip", "netns", "del", "aprobe-ns"],
        ),
        (
            "ip_link_veth_add_del",
            ["ip", "link", "add", "aprobe-veth0", "type", "veth", "peer", "name", "aprobe-veth1"],
            ["ip", "link", "del", "aprobe-veth0"],
        ),
        (
            "ip_link_bridge_add_del",
            ["ip", "link", "add", "name", "aprobe-br0", "type", "bridge"],
            ["ip", "link", "del", "aprobe-br0"],
        ),
    ]
    results: dict[str, dict[str, object]] = {}
    for name, create, destroy in specs:
        outcome = run_check(name, create)
        results[name] = outcome
        if outcome.get("returncode") == 0:
            cleanup_command(destroy)
    return results
def probe_overlay_mount() -> dict[str, object]:
    """Probe mounting an overlayfs built from throwaway lower/upper/work dirs.

    The merged mount is unmounted again on success.
    """
    with tempfile.TemporaryDirectory(prefix="aprobe-overlay-") as temp_dir:
        base = Path(temp_dir)
        lower = base / "lower"
        upper = base / "upper"
        work = base / "work"
        merged = base / "merged"
        for directory in (lower, upper, work, merged):
            directory.mkdir()
        # Seed the lower layer so a successful mount is observable.
        (lower / "hello.txt").write_text("hello\n", encoding="utf-8")

        options = f"lowerdir={lower},upperdir={upper},workdir={work}"
        result = run_check(
            "overlay_mount",
            ["mount", "-t", "overlay", "overlay", "-o", options, str(merged)],
        )
        if result.get("returncode") == 0:
            cleanup_command(["umount", str(merged)])
        return result
def probe_cgroup_write() -> dict[str, object]:
    """Probe creating a child cgroup under /sys/fs/cgroup (removed on success)."""
    target = Path("/sys/fs/cgroup") / "aprobe-test"
    outcome = run_check("cgroup_mkdir", ["mkdir", str(target)])
    if outcome.get("returncode") == 0:
        cleanup_command(["rmdir", str(target)])
    return outcome
def probe_fuse_open() -> dict[str, object]:
    """Probe whether /dev/fuse can be opened read-write, via a python3 child."""
    snippet = "; ".join(
        [
            "import os,sys",
            "fd=os.open('/dev/fuse', os.O_RDWR)",
            "os.close(fd)",
            "print('opened /dev/fuse')",
        ]
    )
    return run_check("fuse_device_open", ["python3", "-c", snippet])
def probe_tun_create() -> dict[str, object]:
    """Probe TUN device creation via ``ip tuntap``; deletes the device on success."""
    create = ["ip", "tuntap", "add", "dev", "aprobe-tun0", "mode", "tun"]
    outcome = run_check("ip_tuntap_add_del", create)
    if outcome.get("returncode") == 0:
        cleanup_command(["ip", "link", "del", "aprobe-tun0"])
    return outcome
def probe_wireguard_link() -> dict[str, object]:
    """Probe WireGuard link creation via ``ip``; deletes the link on success."""
    create = ["ip", "link", "add", "aprobe-wg0", "type", "wireguard"]
    outcome = run_check("wireguard_link_add_del", create)
    if outcome.get("returncode") == 0:
        cleanup_command(["ip", "link", "del", "aprobe-wg0"])
    return outcome
def collect_results() -> dict[str, object]:
    """Run the full capability probe suite and return a JSON-serializable report.

    The report has three sections:
      * ``meta``   -- environment identity (uid/gid, kernel, tool paths, ...)
      * ``paths``  -- stat info for key device nodes and filesystem locations
      * ``checks`` -- one run_check()/probe_*() record per capability tested

    Probes are best-effort: failures are captured in the individual records,
    never raised.  Only the two skopeo/buildah-bud image checks touch the
    network; everything else is local.
    """
    # Which sandbox/container/network tools are present on PATH at all.
    tool_checks = {}
    for name in [
        "ip",
        "iptables",
        "ip6tables",
        "mount",
        "umount",
        "bwrap",
        "fusermount3",
        "runc",
        "runsc",
        "buildah",
        "dockerd",
        "nvidia-smi",
        "nvidia-ctk",
        "rclone",
        "skopeo",
        "wg",
        "wg-quick",
    ]:
        tool_checks[name] = command_exists(name)
    results: dict[str, object] = {
        "meta": {
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "uid": os.getuid(),
            "gid": os.getgid(),
            "cwd": os.getcwd(),
            "hostname": os.uname().nodename,
            "python": run_check("python_version", ["python3", "--version"]),
            "kernel": run_check("uname", ["uname", "-a"]),
            "id": run_check("id", ["id"]),
            "mounts": run_check("mount", ["mount"]),
            "tool_paths": tool_checks,
        },
        # Device nodes and directories whose presence/permissions gate
        # FUSE, TUN, cgroup, NVIDIA, and sandboxing capabilities.
        "paths": {
            "/dev/fuse": path_info("/dev/fuse"),
            "/dev/net/tun": path_info("/dev/net/tun"),
            "/dev/shm": path_info("/dev/shm"),
            "/sys/fs/cgroup": path_info("/sys/fs/cgroup"),
            "/etc/cdi/nvidia.yaml": path_info("/etc/cdi/nvidia.yaml"),
            "/dev/nvidia0": path_info("/dev/nvidia0"),
            "/dev/nvidiactl": path_info("/dev/nvidiactl"),
            "/usr/bin/bwrap": path_info("/usr/bin/bwrap"),
            "/usr/bin/unshare": path_info("/usr/bin/unshare"),
        },
        "checks": {},
    }
    checks: dict[str, object] = results["checks"]  # type: ignore[assignment]
    # --- netfilter: can we even read the nat/filter rule sets? ---
    checks["iptables_nat_prerouting"] = run_check(
        "iptables_nat_prerouting",
        ["iptables", "-t", "nat", "-S", "PREROUTING", "--wait"],
    )
    checks["iptables_filter_forward"] = run_check(
        "iptables_filter_forward",
        ["iptables", "-S", "FORWARD", "--wait"],
    )
    checks["ip6tables_nat_postrouting"] = run_check(
        "ip6tables_nat_postrouting",
        ["ip6tables", "-t", "nat", "-S", "POSTROUTING", "--wait"],
    )
    # --- network namespaces, virtual links, overlayfs ---
    checks.update(probe_namespaces())
    checks["overlay_mount"] = probe_overlay_mount()
    # --- unshare: user/mount/pid/net namespace capabilities ---
    checks["unshare_user_map_root"] = run_check(
        "unshare_user_map_root",
        ["unshare", "--user", "--map-root-user", "sh", "-c", "id -u && cat /proc/self/uid_map"],
    )
    checks["unshare_user_mount_tmpfs"] = run_check(
        "unshare_user_mount_tmpfs",
        [
            "unshare",
            "--user",
            "--map-root-user",
            "--mount",
            "sh",
            "-c",
            "set -eu; d=$(mktemp -d); mount -t tmpfs tmpfs \"$d\"; echo mounted; umount \"$d\"; rmdir \"$d\"",
        ],
    )
    checks["unshare_mount"] = run_check(
        "unshare_mount",
        ["unshare", "--mount", "sh", "-c", "mount | head -n 5"],
    )
    checks["unshare_pid_fork"] = run_check(
        "unshare_pid_fork",
        ["unshare", "--pid", "--fork", "sh", "-c", "echo $$; ps -o pid,ppid,comm"],
    )
    checks["unshare_net"] = run_check(
        "unshare_net",
        ["unshare", "--net", "sh", "-c", "ip link show lo"],
    )
    # --- bubblewrap sandboxing (basic, pid-unshare, net-unshare) ---
    checks["bwrap_basic"] = run_check(
        "bwrap_basic",
        [
            "bwrap",
            "--ro-bind",
            "/",
            "/",
            "--proc",
            "/proc",
            "--dev",
            "/dev",
            "--tmpfs",
            "/tmp",
            "sh",
            "-c",
            "echo bwrap-basic-ok && test -d /tmp",
        ],
    )
    checks["bwrap_unshare_pid"] = run_check(
        "bwrap_unshare_pid",
        [
            "bwrap",
            "--ro-bind",
            "/",
            "/",
            "--proc",
            "/proc",
            "--dev",
            "/dev",
            "--tmpfs",
            "/tmp",
            "--unshare-pid",
            "--new-session",
            "sh",
            "-c",
            "echo $$ && ps -o pid,ppid,comm",
        ],
    )
    checks["bwrap_unshare_net"] = run_check(
        "bwrap_unshare_net",
        [
            "bwrap",
            "--ro-bind",
            "/",
            "/",
            "--proc",
            "/proc",
            "--dev",
            "/dev",
            "--tmpfs",
            "/tmp",
            "--unshare-net",
            "sh",
            "-c",
            "ip link show lo",
        ],
    )
    # --- cgroups, FUSE, TUN, WireGuard, NVIDIA ---
    checks["cgroup_mkdir"] = probe_cgroup_write()
    checks["cgroup_controllers"] = run_check("cgroup_controllers", ["cat", "/sys/fs/cgroup/cgroup.controllers"])
    checks["fuse_version"] = run_check("fuse_version", ["fusermount3", "--version"])
    checks["fuse_device_open"] = probe_fuse_open()
    checks["tun_create"] = probe_tun_create()
    checks["wireguard_link_add_del"] = probe_wireguard_link()
    checks["wg_show"] = run_check("wg_show", ["wg", "show"])
    checks["nvidia_smi"] = run_check("nvidia_smi", ["nvidia-smi", "-L"])
    checks["nvidia_ctk"] = run_check("nvidia_ctk", ["nvidia-ctk", "--version"])
    # --- OCI runtimes: runc and gVisor (runsc), with and without cgroups ---
    checks["runc_version"] = run_check("runc_version", ["runc", "--version"])
    checks["runsc_version"] = run_check("runsc_version", ["runsc", "--version"])
    checks["runc_run_minimal"] = probe_oci_runtime("runc", "runc")
    checks["runc_run_no_cgroups"] = probe_oci_runtime("runc", "runc", include_resources=False)
    checks["runsc_run_minimal"] = probe_oci_runtime("runsc", "runsc")
    checks["runsc_run_no_cgroups"] = probe_oci_runtime("runsc", "runsc", include_resources=False)
    # --- buildah with vfs storage, under several isolation/userns modes ---
    checks["buildah_version"] = run_check("buildah_version", ["buildah", "version"])
    checks["buildah_from_scratch_vfs"] = probe_buildah_vfs()
    checks["buildah_from_scratch_vfs_chroot"] = probe_buildah_vfs(isolation="chroot")
    checks["buildah_from_scratch_vfs_chroot_userns_host"] = probe_buildah_vfs(
        isolation="chroot",
        extra_args=["--userns", "host"],
    )
    checks["buildah_bud_vfs_chroot"] = probe_buildah_bud(isolation="chroot")
    checks["buildah_bud_vfs_chroot_userns_host"] = probe_buildah_bud(
        isolation="chroot",
        userns="host",
    )
    # --- misc: chroot, docker daemon, shm size, skopeo registry access ---
    checks["chroot_minimal"] = probe_chroot_minimal()
    checks["dockerd_version"] = run_check("dockerd_version", ["dockerd", "--version"])
    checks["df_shm"] = run_check("df_shm", ["df", "-h", "/dev/shm"])
    checks["skopeo_version"] = run_check("skopeo_version", ["skopeo", "--version"])
    checks["skopeo_inspect_busybox"] = run_check("skopeo_inspect_busybox", ["skopeo", "inspect", "docker://docker.io/library/busybox:latest"], timeout=60)
    return results
results = collect_results()
# Emit the report between the sentinel markers, base64-encoded and wrapped
# to 120-character lines so any log transport can carry it without mangling.
print(RESULTS_BEGIN, flush=True)
payload = json.dumps(results, sort_keys=True).encode("utf-8")
encoded = base64.b64encode(payload).decode("ascii")
for start in range(0, len(encoded), 120):
    print(encoded[start : start + 120], flush=True)
print(RESULTS_END, flush=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment