Created
March 7, 2026 10:49
-
-
Save saillinux/ffe9e7d72a2de4d2b8fa1c9ca65128a5 to your computer and use it in GitHub Desktop.
Scan git repos for shell scripts making AWS IMDSv1 calls
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Scan git repositories for shell scripts that make AWS IMDSv1 calls. | |
| Reads a text file of git repository URLs, clones each one, and searches | |
| for shell scripts containing IMDSv1 metadata endpoint calls | |
| (http://169.254.169.254) that do NOT use the IMDSv2 token mechanism. | |
| Outputs results to a CSV file. | |
| """ | |
| import argparse | |
| import csv | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
# The EC2 metadata endpoint used by both IMDSv1 and IMDSv2
METADATA_ENDPOINT = "169.254.169.254"

# File extensions that mark a file as a shell script without reading it
SHELL_EXTENSIONS = {".sh", ".bash", ".ksh", ".zsh", ".csh"}

# Shebang pattern that indicates a shell script. It accepts:
#   * an interpreter in any directory (/bin, /usr/bin, /usr/local/bin, ...)
#   * an optional "env" launcher, including env options such as "-S"
#   * sh, bash, dash, ksh, zsh and csh
# The trailing lookahead requires the interpreter name to end right after
# "sh", so tools whose names merely start with it (e.g. "shfmt") are rejected.
SHEBANG_RE = re.compile(
    r"^#!\s*/(?:[^/\s]+/)*(?:env\s+(?:-\S+\s+)*)?(?:ba|da|k|z|c)?sh(?![\w.-])"
)

# IMDSv2 indicators — if present, the script likely uses v2 (or both)
IMDSV2_INDICATORS = [
    "X-aws-ec2-metadata-token-ttl-seconds",
    "X-aws-ec2-metadata-token",
    "/latest/api/token",
]
def clone_repo(url: str, dest: str) -> bool:
    """Shallow-clone a git repository into *dest*.

    Runs ``git clone --depth 1 --quiet`` with a 120 second timeout.

    Args:
        url: Repository URL (or local path) to clone.
        dest: Directory the clone is written to.

    Returns:
        True when git exits successfully. False when git exits non-zero,
        the timeout expires, or the git executable cannot be started; in
        each failure case the reason is printed to stderr.
    """
    # "--" ends option parsing, so a URL that starts with "-" cannot be
    # interpreted by git as a command-line option.
    command = ["git", "clone", "--depth", "1", "--quiet", "--", url, dest]
    try:
        subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
            timeout=120,
        )
    except subprocess.CalledProcessError as exc:
        # git's own explanation was captured in stderr; surface it instead
        # of reporting only the exit status.
        detail = (exc.stderr or "").strip()
        reason = f"{exc}: {detail}" if detail else f"{exc}"
        print(f" [ERROR] Failed to clone {url}: {reason}", file=sys.stderr)
        return False
    except (subprocess.TimeoutExpired, OSError) as exc:
        # OSError covers a git executable that is missing or not runnable.
        print(f" [ERROR] Failed to clone {url}: {exc}", file=sys.stderr)
        return False
    return True
def is_shell_script(filepath: str) -> bool:
    """Report whether *filepath* looks like a shell script.

    A file qualifies when its extension (compared case-insensitively) is in
    SHELL_EXTENSIONS, or when the first line (at most 256 characters of it)
    matches SHEBANG_RE. A file that cannot be opened or read is not a shell
    script.
    """
    extension = os.path.splitext(filepath)[1].lower()
    if extension in SHELL_EXTENSIONS:
        # Extension is decisive; the file is never opened in this case.
        return True

    try:
        with open(filepath, "r", errors="ignore") as handle:
            shebang_line = handle.readline(256)
    except (OSError, UnicodeDecodeError):
        return False

    return SHEBANG_RE.match(shebang_line) is not None
def _iter_logical_lines(content: str):
    """Yield the lines of *content* with backslash-newline continuations joined.

    A line that starts out as a comment is yielded unchanged, because the
    shell does not honour a trailing backslash inside a comment.
    """
    parts: list[str] = []
    for raw in content.splitlines():
        if not parts and raw.lstrip().startswith("#"):
            yield raw
            continue
        # Only an odd number of trailing backslashes escapes the newline;
        # an even number is a run of literal, escaped backslashes.
        trailing_backslashes = len(raw) - len(raw.rstrip("\\"))
        if trailing_backslashes % 2:
            parts.append(raw[:-1])
            continue
        parts.append(raw)
        yield "".join(parts)
        parts = []
    if parts:
        # File ended in the middle of a continued command.
        yield "".join(parts)


def uses_imdsv1(filepath: str) -> bool:
    """Return True if the file contains at least one IMDSv1-style call.

    Every non-comment command that mentions METADATA_ENDPOINT is examined
    (a command split over backslash-continued lines counts as one). The
    command is treated as IMDSv2 when it carries any of IMDSV2_INDICATORS,
    i.e. it is the token fetch itself or it sends the token header. Any
    other command that references the endpoint is treated as IMDSv1, so a
    script mixing v1 and v2 calls is still flagged.

    This is a line-based heuristic: a line that only assigns the endpoint
    URL to a variable is flagged as well.

    Args:
        filepath: Path of the script to inspect.

    Returns:
        True when an endpoint reference without an IMDSv2 indicator is
        found; False otherwise, including when the file cannot be read or
        mentions the endpoint only in comments.
    """
    try:
        with open(filepath, "r", errors="ignore") as f:
            content = f.read()
    except OSError:
        return False

    if METADATA_ENDPOINT not in content:
        return False

    # HTTP header names are case-insensitive, so compare in lower case.
    indicators = [indicator.lower() for indicator in IMDSV2_INDICATORS]

    for line in _iter_logical_lines(content):
        stripped = line.strip()
        if stripped.startswith("#") or METADATA_ENDPOINT not in stripped:
            continue
        folded = stripped.lower()
        # No token header and not the token-fetch request: this is v1.
        if not any(indicator in folded for indicator in indicators):
            return True
    return False
def scan_repo(repo_dir: str) -> list[str]:
    """Walk the repo and return relative paths of shell scripts using IMDSv1.

    Args:
        repo_dir: Root directory of the checked-out repository.

    Returns:
        Sorted paths, relative to *repo_dir*, of every regular file for
        which both is_shell_script() and uses_imdsv1() are true. ``.git``
        directories and symbolic links are not inspected.
    """
    hits = []
    for root, dirs, files in os.walk(repo_dir):
        # Prune in place so os.walk never descends into git's metadata.
        # Matching on the directory name (rather than on every component of
        # the absolute path) keeps a clone that lives under a ".git"
        # ancestor scannable.
        dirs[:] = [d for d in dirs if d != ".git"]
        for fname in files:
            fpath = os.path.join(root, fname)
            # Repositories are untrusted input: a symlink may point outside
            # the clone or at a device file, so only regular files are read.
            if os.path.islink(fpath) or not os.path.isfile(fpath):
                continue
            if is_shell_script(fpath) and uses_imdsv1(fpath):
                hits.append(os.path.relpath(fpath, repo_dir))
    return sorted(hits)
def main():
    """Command-line entry point.

    Reads repository URLs from the file given as ``repo_list`` (blank lines
    and lines whose first non-blank character is "#" are ignored), clones
    and scans each repository, and writes one CSV row per flagged file to
    the ``--output`` path. Exits with status 1 when the list contains no
    URLs. Repositories that fail to clone are skipped.

    Without ``--workdir`` the clones go to a temporary directory: each clone
    is removed as soon as it has been scanned and the directory itself is
    removed at the end. With ``--workdir`` the clones are kept.
    """
    parser = argparse.ArgumentParser(
        description="Scan git repos for shell scripts making AWS IMDSv1 calls."
    )
    parser.add_argument(
        "repo_list",
        help="Text file with one git repository URL per line.",
    )
    parser.add_argument(
        "-o",
        "--output",
        default="imdsv1_results.csv",
        help="Output CSV file (default: imdsv1_results.csv).",
    )
    parser.add_argument(
        "-w",
        "--workdir",
        default=None,
        help="Working directory for cloned repos (default: temp directory, auto-cleaned).",
    )
    args = parser.parse_args()

    # Read repo URLs. Strip before testing for "#" so that indented comment
    # lines are ignored instead of being treated as URLs.
    with open(args.repo_list) as f:
        stripped_lines = (line.strip() for line in f)
        urls = [line for line in stripped_lines if line and not line.startswith("#")]

    if not urls:
        print("No repository URLs found in the input file.", file=sys.stderr)
        sys.exit(1)
    print(f"Found {len(urls)} repository URL(s) to scan.")

    # Decide on working directory
    use_temp = args.workdir is None
    workdir = tempfile.mkdtemp(prefix="imdsv1_scan_") if use_temp else os.path.abspath(args.workdir)
    os.makedirs(workdir, exist_ok=True)

    results: list[dict] = []
    try:
        for i, url in enumerate(urls, 1):
            repo_name = url.rstrip("/").rsplit("/", 1)[-1].removesuffix(".git")
            # The index prefix keeps same-named repositories apart.
            dest = os.path.join(workdir, f"{i}_{repo_name}")
            print(f"\n[{i}/{len(urls)}] Cloning {url} ...")
            if clone_repo(url, dest):
                print(" Scanning for IMDSv1 usage ...")
                hits = scan_repo(dest)
                if hits:
                    print(f" Found {len(hits)} file(s) with IMDSv1 calls:")
                    for h in hits:
                        print(f" - {h}")
                        results.append({"repository_url": url, "file_path": h})
                else:
                    print(" No IMDSv1 usage found.")
            if use_temp:
                # Drop each temporary clone (or the remains of a failed one)
                # right away so disk usage does not grow with the list.
                shutil.rmtree(dest, ignore_errors=True)
    finally:
        if use_temp:
            print(f"\nCleaning up temp directory: {workdir}")
            shutil.rmtree(workdir, ignore_errors=True)

    # Write CSV
    with open(args.output, "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=["repository_url", "file_path"])
        writer.writeheader()
        writer.writerows(results)
    print(f"\nDone. {len(results)} result(s) written to {args.output}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment