Created
September 12, 2024 10:05
-
-
Save LordVeovis/5166bb00d61d1afed6cdf9b707bc1371 to your computer and use it in GitHub Desktop.
Script to upload SBOM of all running containers on the local machine to Dependency-Track
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3 | |
import argparse
import grp
import http
import json
import logging
import os
import re
import time
from os import geteuid, getgroups
from subprocess import PIPE, Popen, run
from sys import exit, platform

import requests
# Dependency-Track API key, read from the environment (the variable name is
# literally "api" — TODO confirm this is intentional rather than e.g. DT_API_KEY).
DT_APIKEY = os.environ["api"]
DT_URL = "https://api.dependency-track.kveer.fr/api"
# Raw string: the original non-raw literal relied on unrecognized escape
# sequences (\/ and \.), which emit SyntaxWarning on recent Pythons while
# producing the same characters. The pattern itself is unchanged.
# Groups: registry (optional, trailing '/'), image, version (optional, leading ':').
DOCKER_TAG_RE = r"^(?P<registry>(?=[^:\/]{4,253})(?!-)[a-zA-Z0-9-]{1,63}(?<!-)(?:\.(?!-)[a-zA-Z0-9-]{1,63}(?<!-))*(?::[0-9]{1,5})?/)?(?P<image>(?![._-])(?:[a-z0-9._-]*)(?<![._-])(?:/(?![._-])[a-z0-9._-]*(?<![._-]))*)(?P<version>:(?![.-])[a-zA-Z0-9_.-]{1,128})?$"
# When True, failed SBOM uploads print the status code and request data.
DEBUG = True
def execute_process(cmd, timeout=10):
    """Run *cmd* through the shell and return its output.

    Returns a dict with "stdout" (captured text) and "retcode" (exit status).
    Raises CalledProcessError on a non-zero exit (check=True) and
    TimeoutExpired after *timeout* seconds.
    """
    completed = run(
        cmd,
        stdout=PIPE,
        shell=True,
        check=True,
        text=True,
        timeout=timeout,
    )
    return {"stdout": completed.stdout, "retcode": completed.returncode}
def get_existing_projects():
    """Fetch all projects from Dependency-Track, indexed by name.

    Returns a dict mapping project name to
    {"uuid": <uuid of the first project seen with that name>,
     "versions": {version: uuid}}.

    NOTE(review): /v1/project responses may be paginated server-side; this
    assumes a single page holds everything — confirm against the server.
    """
    headers = {"X-Api-Key": DT_APIKEY}
    r = requests.get(url=DT_URL + "/v1/project", headers=headers)
    assert r.status_code == 200
    projects = {}
    for p in r.json():
        name = p["name"]
        uuid = p["uuid"]
        version = p.get("version")  # not every project carries a version
        # keep the first uuid seen for a name; collect all versioned uuids
        entry = projects.setdefault(name, {"uuid": uuid, "versions": {}})
        if version is not None:
            entry["versions"].setdefault(version, uuid)
    return projects
def create_project(name, version, purl):
    """Create a CONTAINER project in Dependency-Track and return its JSON.

    *purl* is the package-url of the container image (pkg:docker/...).
    """
    print(f"Creating project {name}...")
    headers = {"X-Api-Key": DT_APIKEY}
    # named 'payload' so the imported json module is not shadowed
    payload = {
        "name": name,
        "version": version,
        "purl": purl,
        "description": "AUTO CREATED",
        "classifier": "CONTAINER",
    }
    r = requests.put(url=DT_URL + "/v1/project", headers=headers, json=payload)
    assert r.status_code in (200, 201)
    return r.json()
def add_version(uuid, version):
    """Clone project *uuid* into a new *version* and return the clone's uuid.

    The clone endpoint is asynchronous: it answers with an event token whose
    completion can be polled on /v1/event/token/{token}.
    """
    print(f"Adding version {version} to project {uuid}...")
    headers = {"X-Api-Key": DT_APIKEY}
    # named 'payload' so the imported json module is not shadowed
    payload = {
        "includeACL": False,
        "includeAuditHistory": False,
        "includeComponents": False,
        "includePolicyViolations": False,
        "includeProperties": False,
        "includeServices": False,
        "includeTags": False,
        "project": uuid,
        "version": version,
    }
    r = requests.put(url=DT_URL + "/v1/project/clone", headers=headers, json=payload)
    assert r.status_code == 200
    token = r.json()["token"]
    # Poll the event token until the server reports the clone as finished,
    # instead of assuming the cloning is instant (it is not). Give up after
    # ~30 seconds and fall through; the version lookup below will then fail
    # loudly via StopIteration rather than silently returning a stale uuid.
    for _ in range(30):
        s = requests.get(url=f"{DT_URL}/v1/event/token/{token}", headers=headers)
        if s.status_code == 200 and not s.json().get("processing", False):
            break
        time.sleep(1)
    r = requests.get(url=f"{DT_URL}/v1/project/{uuid}", headers=headers)
    assert r.status_code == 200
    pv = r.json()["versions"]
    # pick the uuid of the freshly created version
    v = next(filter(lambda el: el["version"] == version, pv))["uuid"]
    return v
def upload_sbom(uuid, sbom):
    """Upload a CycloneDX SBOM (JSON string) for project *uuid*.

    Sends a multipart POST to /v1/bom; the file field must be named "bom".
    Raises AssertionError if the server does not answer 200.
    """
    headers = {"X-Api-Key": DT_APIKEY, "Accept": "application/json"}
    data = {
        "project": uuid,
    }
    files = {"bom": ("bom.json", sbom.encode("utf-8"), "application/json")}
    r = requests.post(url=DT_URL + "/v1/bom", headers=headers, data=data, files=files)
    if DEBUG and r.status_code != 200:
        print(f"DEBUG: status_code={r.status_code}")
        print(data)
    assert r.status_code == 200
# ---- command-line arguments ----
parser = argparse.ArgumentParser(description="sample argument parser")
parser.add_argument("--dockerimage")
args = parser.parse_args()

# control args: if an explicit image was given, it must be a valid docker tag
if args.dockerimage is not None:
    m = re.fullmatch(DOCKER_TAG_RE, args.dockerimage)
    if m is None:
        exit("Please specify a valid docker tag to --dockerimage")

# the script shells out to docker/jq/syft through a POSIX shell
if platform != "linux":
    exit("This script is only compatible with bash")

# membership in the "docker" group is sufficient; otherwise root is required
try:
    docker_group = grp.getgrnam("docker").gr_gid
except KeyError:
    docker_group = -1  # no docker group on this system
if geteuid() != 0 and docker_group not in getgroups():
    exit("You must be root for using docker")

# max cmdline: space left for arguments once the environment is accounted for
ret = execute_process("expr $(getconf ARG_MAX) - $(env | wc -c)")
assert ret["retcode"] == 0
ARG_MAX = int(ret["stdout"])
# ARG_MAX is large enough to manage 29k image on the same line
# avoiding us to manage the pagination
assert ARG_MAX >= 200000
# Collect image metadata (Id, RepoTags, Author, Labels) either for every
# image backing a running container, or for the single image given on the
# command line.
images_ids_tags = None
if args.dockerimage is None:
    # retrieving running containers ID
    ret = execute_process("docker ps --format '{{.ID}}' | sort | uniq")
    assert ret["retcode"] == 0
    running_containers = ret["stdout"].splitlines()
    # retrieving images ID of running containers
    _cid_line = " ".join(running_containers)
    ret = execute_process(
        f"docker inspect {_cid_line} | jq '[.[] | {{Id, Image, Name}}]' | jq -r '.[] | .Image'"
    )
    assert ret["retcode"] == 0
    used_images_ids = ret["stdout"].splitlines()
    # retrieving tag of used images ID
    _iid_line = " ".join(used_images_ids)
    ret = execute_process(
        f"docker image inspect {_iid_line} | jq '[.[] | {{Id, RepoTags, Author, Labels: .Config.Labels}}]'"
    )
    assert ret["retcode"] == 0
    images_ids_tags = json.loads(ret["stdout"])
else:
    # the tag was validated against DOCKER_TAG_RE above, so it is shell-safe
    ret = execute_process(
        f"docker image inspect {args.dockerimage} | jq '[.[] | {{Id, RepoTags, Author, Labels: .Config.Labels}}]'"
    )
    assert ret["retcode"] == 0
    images_ids_tags = json.loads(ret["stdout"])

# getting all existing projects from dependency-track
projects = get_existing_projects()
def split_docker_tag(tag):
    """Split a docker tag into its registry, image and version parts.

    Returns {"image": str, "version": str, "registry": str | None}.
    A missing version defaults to "latest"; a missing registry yields None.
    Assumes *tag* matches DOCKER_TAG_RE — callers must validate first,
    otherwise m.group() raises AttributeError on the None match.
    """
    m = re.fullmatch(DOCKER_TAG_RE, tag)
    image = m.group("image")
    # strip the leading ':' captured with the version group
    version = m.group("version")[1:] if m.group("version") is not None else "latest"
    # strip the trailing '/' captured with the registry group
    registry = m.group("registry")[:-1] if m.group("registry") is not None else None
    return {"image": image, "version": version, "registry": registry}
# For each used image: make sure a matching project + version exists in
# Dependency-Track, generate its SBOM with syft, then upload it.
# Example purl: pkg:docker/cassandra@latest
for img in images_ids_tags:
    if len(img["RepoTags"]) == 0:
        print(f"Image {img['Id']} has no tags, passing.")
        continue
    assert len(img["RepoTags"]) == 1
    repotag = img["RepoTags"][0]
    print(f"Handling image {repotag}")
    dd = split_docker_tag(repotag)
    # package-url: registry goes into the repository_url qualifier
    purl = f"pkg:docker/{dd['image']}@{dd['version']}" + (
        ("?repository_url=" + dd["registry"]) if dd["registry"] is not None else ""
    )
    project_name = (
        "docker-"
        + (dd["registry"] + "/" if dd["registry"] is not None else "")
        + dd["image"]
    )
    # create project if not exists
    if project_name not in projects:
        j = create_project(project_name, dd["version"], purl)
        # add new project to projects
        projects[project_name] = {
            "uuid": j["uuid"],
            "versions": {dd["version"]: j["uuid"]},
        }
    # create version if not exists
    elif dd["version"] not in projects[project_name]["versions"]:
        new_uuid = add_version(projects[project_name]["uuid"], dd["version"])
        projects[project_name]["versions"][dd["version"]] = new_uuid
    # generating SBOM
    res = execute_process(f"syft scan -o cyclonedx-json@1.5 {repotag}", timeout=300)
    # BUG FIX: the original asserted ret["retcode"] — a stale value left over
    # from the image-listing step — so a failed syft run went undetected.
    assert res["retcode"] == 0
    sbom = res["stdout"]
    # uploading SBOM
    upload_sbom(projects[project_name]["versions"][dd["version"]], sbom)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment