Skip to content

Instantly share code, notes, and snippets.

@saillinux
Created March 7, 2026 10:49
Show Gist options
  • Select an option

  • Save saillinux/ffe9e7d72a2de4d2b8fa1c9ca65128a5 to your computer and use it in GitHub Desktop.

Select an option

Save saillinux/ffe9e7d72a2de4d2b8fa1c9ca65128a5 to your computer and use it in GitHub Desktop.
Scan git repos for shell scripts making AWS IMDSv1 calls
#!/usr/bin/env python3
"""
Scan git repositories for shell scripts that make AWS IMDSv1 calls.
Reads a text file of git repository URLs, clones each one, and searches
for shell scripts containing IMDSv1 metadata endpoint calls
(http://169.254.169.254) that do NOT use the IMDSv2 token mechanism.
Outputs results to a CSV file.
"""
import argparse
import csv
import os
import re
import shutil
import subprocess
import sys
import tempfile
# The EC2 metadata endpoint used by both IMDSv1 and IMDSv2
METADATA_ENDPOINT = "169.254.169.254"

# Shell script file extensions (extension comparison is case-insensitive
# at the call site)
SHELL_EXTENSIONS = {".sh", ".bash", ".ksh", ".zsh", ".csh"}

# Shebang patterns that indicate a shell script.
# Matches e.g. "#!/bin/sh", "#!/bin/bash", "#!/usr/bin/env zsh".
# NOTE(review): interpreters at non-standard prefixes (e.g.
# /usr/local/bin/bash) are NOT matched; such files are only detected
# via the extension check above.
SHEBANG_RE = re.compile(r"^#!\s*/(?:usr/)?(?:bin/)?(?:env\s+)?(?:ba|da|k|z|c)?sh")

# IMDSv2 indicators — if present, the script likely uses v2 (or both).
# The token header and token-fetch path are the defining markers of
# IMDSv2 session-oriented requests.
IMDSV2_INDICATORS = [
    "X-aws-ec2-metadata-token-ttl-seconds",
    "X-aws-ec2-metadata-token",
    "/latest/api/token",
]
def clone_repo(url: str, dest: str) -> bool:
    """Shallow-clone a git repository into *dest*.

    Uses a depth-1 clone to minimize transfer, with a 120-second timeout
    so one hung remote cannot stall the whole scan.

    Args:
        url: Repository URL (anything ``git clone`` accepts).
        dest: Destination directory for the clone.

    Returns:
        True on success, False on any failure (logged to stderr).
    """
    try:
        subprocess.run(
            ["git", "clone", "--depth", "1", "--quiet", url, dest],
            check=True,
            capture_output=True,
            text=True,
            timeout=120,
        )
        return True
    except (
        subprocess.CalledProcessError,
        subprocess.TimeoutExpired,
        FileNotFoundError,  # `git` executable not installed / not on PATH
    ) as exc:
        print(f" [ERROR] Failed to clone {url}: {exc}", file=sys.stderr)
        return False
def is_shell_script(filepath: str) -> bool:
    """Decide whether *filepath* looks like a shell script.

    A file qualifies if its extension is one of the known shell
    extensions, or failing that, if its first line is a recognizable
    shell shebang.  Files that cannot be read are treated as
    non-scripts.
    """
    extension = os.path.splitext(filepath)[1].lower()
    if extension in SHELL_EXTENSIONS:
        return True
    # No telltale extension — peek at the first line for a shebang.
    try:
        with open(filepath, "r", errors="ignore") as handle:
            opening = handle.readline(256)
    except (OSError, UnicodeDecodeError):
        return False
    return SHEBANG_RE.match(opening) is not None
def uses_imdsv1(filepath: str) -> bool:
    """
    Return True if the file references the metadata endpoint AND
    does NOT show signs of using IMDSv2 token-based access.

    A script that contains both v1 and v2 patterns is still flagged,
    because it contains at least some v1 usage.  Unreadable files are
    never flagged.
    """
    try:
        with open(filepath, "r", errors="ignore") as f:
            content = f.read()
    except OSError:
        return False

    if METADATA_ENDPOINT not in content:
        return False

    # No v2 indicator anywhere means every metadata call is plain IMDSv1.
    if not any(indicator in content for indicator in IMDSV2_INDICATORS):
        return True

    # The file shows some v2 usage.  Examine each *logical* line that
    # hits the metadata endpoint and check whether it carries a token
    # header.  Backslash-continued lines are joined first so that a curl
    # command split across physical lines (token header on one line, URL
    # on the next) is not misreported as IMDSv1.
    logical_lines = content.replace("\\\n", " ").splitlines()
    for line in logical_lines:
        stripped = line.strip()
        if stripped.startswith("#"):
            continue
        if METADATA_ENDPOINT not in stripped:
            continue
        # Skip the token-fetch request itself (PUT to /latest/api/token).
        if "/latest/api/token" in stripped:
            continue
        # A metadata call without a token header is IMDSv1.
        if "X-aws-ec2-metadata-token" not in stripped:
            return True
    return False
def scan_repo(repo_dir: str) -> list[str]:
    """Walk *repo_dir* and return relative paths of shell scripts using IMDSv1.

    Anything under a ``.git`` directory is ignored.  Paths are returned
    relative to *repo_dir*, sorted lexicographically.
    """
    flagged: list[str] = []
    for dirpath, _subdirs, filenames in os.walk(repo_dir):
        # Ignore git internals entirely.
        if ".git" in dirpath.split(os.sep):
            continue
        for name in filenames:
            full_path = os.path.join(dirpath, name)
            if not is_shell_script(full_path):
                continue
            if uses_imdsv1(full_path):
                flagged.append(os.path.relpath(full_path, repo_dir))
    return sorted(flagged)
def main():
    """CLI entry point: read repo URLs, clone and scan each, write a CSV.

    Exits with status 1 if the input file cannot be read or contains no
    usable URLs.  When no --workdir is given, clones go into a temporary
    directory that is removed afterwards.
    """
    parser = argparse.ArgumentParser(
        description="Scan git repos for shell scripts making AWS IMDSv1 calls."
    )
    parser.add_argument(
        "repo_list",
        help="Text file with one git repository URL per line.",
    )
    parser.add_argument(
        "-o",
        "--output",
        default="imdsv1_results.csv",
        help="Output CSV file (default: imdsv1_results.csv).",
    )
    parser.add_argument(
        "-w",
        "--workdir",
        default=None,
        help="Working directory for cloned repos (default: temp directory, auto-cleaned).",
    )
    args = parser.parse_args()

    # Read repo URLs, skipping blank lines and comment lines.
    try:
        with open(args.repo_list, encoding="utf-8") as f:
            urls = [
                line.strip()
                for line in f
                if line.strip() and not line.startswith("#")
            ]
    except OSError as exc:
        print(f"Cannot read repo list {args.repo_list}: {exc}", file=sys.stderr)
        sys.exit(1)
    if not urls:
        print("No repository URLs found in the input file.", file=sys.stderr)
        sys.exit(1)
    print(f"Found {len(urls)} repository URL(s) to scan.")

    # Decide on working directory: a throwaway temp dir unless the user
    # supplied one (user-supplied dirs are never cleaned up).
    use_temp = args.workdir is None
    workdir = (
        tempfile.mkdtemp(prefix="imdsv1_scan_")
        if use_temp
        else os.path.abspath(args.workdir)
    )
    os.makedirs(workdir, exist_ok=True)

    results: list[dict] = []
    try:
        for i, url in enumerate(urls, 1):
            # Derive a clone directory name from the URL's last path
            # component; the index prefix keeps duplicates apart.
            repo_name = url.rstrip("/").rsplit("/", 1)[-1].removesuffix(".git")
            dest = os.path.join(workdir, f"{i}_{repo_name}")
            print(f"\n[{i}/{len(urls)}] Cloning {url} ...")
            if not clone_repo(url, dest):
                continue
            print(" Scanning for IMDSv1 usage ...")
            hits = scan_repo(dest)
            if hits:
                print(f" Found {len(hits)} file(s) with IMDSv1 calls:")
                for h in hits:
                    print(f" - {h}")
                    results.append({"repository_url": url, "file_path": h})
            else:
                print(" No IMDSv1 usage found.")
    finally:
        # Only remove directories this run created itself.
        if use_temp:
            print(f"\nCleaning up temp directory: {workdir}")
            shutil.rmtree(workdir, ignore_errors=True)

    # Write the CSV report, one row per (repository, flagged file) pair.
    with open(args.output, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=["repository_url", "file_path"])
        writer.writeheader()
        writer.writerows(results)
    print(f"\nDone. {len(results)} result(s) written to {args.output}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment