Skip to content

Instantly share code, notes, and snippets.

@LordVeovis
Created September 12, 2024 10:05
Show Gist options
  • Select an option

  • Save LordVeovis/5166bb00d61d1afed6cdf9b707bc1371 to your computer and use it in GitHub Desktop.

Select an option

Save LordVeovis/5166bb00d61d1afed6cdf9b707bc1371 to your computer and use it in GitHub Desktop.
Script to upload SBOM of all running containers on the local machine to Dependency-Track
#!/usr/bin/python3
from sys import exit, platform
from os import geteuid, getgroups
from subprocess import Popen, PIPE, run
import json
import requests
import re
import grp
import argparse
import os
import http
import logging
# API key sent in the "X-Api-Key" header of every Dependency-Track request.
# It is read from the "api" environment variable; a missing variable raises
# KeyError as soon as the script starts.
DT_APIKEY = os.environ["api"]
# Base URL of the Dependency-Track REST API; endpoint paths such as
# "/v1/project" are appended to it.
DT_URL = "https://api.dependency-track.kveer.fr/api"
# Pattern for a docker image reference of the form "[registry[:port]/]image[:tag]".
# Named groups:
#   registry - optional, captured WITH its trailing "/"
#   image    - repository path (lowercase letters, digits, ".", "_", "-", "/")
#   version  - optional, captured WITH its leading ":"
# Kept as a raw string so that "\/" and "\." reach the regex engine untouched
# instead of being flagged as invalid string escapes by the Python compiler.
DOCKER_TAG_RE = r"^(?P<registry>(?=[^:\/]{4,253})(?!-)[a-zA-Z0-9-]{1,63}(?<!-)(?:\.(?!-)[a-zA-Z0-9-]{1,63}(?<!-))*(?::[0-9]{1,5})?/)?(?P<image>(?![._-])(?:[a-z0-9._-]*)(?<![._-])(?:/(?![._-])[a-z0-9._-]*(?<![._-]))*)(?P<version>:(?![.-])[a-zA-Z0-9_.-]{1,128})?$"
# When true, a failed SBOM upload prints the HTTP status code and the form data.
DEBUG = True
def execute_process(cmd, timeout=10):
    """Run a shell command line and hand back its output and exit status.

    The command is interpreted by the shell, so pipes and substitutions work.
    Only standard output is captured (as text); standard error is inherited.

    Args:
        cmd: command line passed to the shell.
        timeout: seconds to wait before the run is aborted.

    Returns:
        A dict with the captured "stdout" text and the "retcode".

    Raises:
        subprocess.CalledProcessError: the command exited with a non-zero status.
        subprocess.TimeoutExpired: the command ran longer than ``timeout``.
    """
    completed = run(
        cmd,
        shell=True,
        check=True,
        text=True,
        stdout=PIPE,
        timeout=timeout,
    )
    return dict(stdout=completed.stdout, retcode=completed.returncode)
def get_existing_projects():
    """Fetch every project known to Dependency-Track, grouped by project name.

    The project list endpoint is paginated, so pages are requested one after
    another until a short (or empty) page signals the end of the list.

    Returns:
        A dict mapping each project name to
        ``{"uuid": <uuid of the first entry seen>, "versions": {version: uuid}}``.
        Entries without a "version" field contribute no version mapping.

    Raises:
        RuntimeError: the API answered with a status other than 200.
    """
    headers = {"X-Api-Key": DT_APIKEY}
    page_size = 100
    page_number = 1
    projects = {}
    while True:
        r = requests.get(
            url=DT_URL + "/v1/project",
            headers=headers,
            params={"pageNumber": page_number, "pageSize": page_size},
            timeout=30,
        )
        # Explicit check instead of assert: asserts vanish under "python -O".
        if r.status_code != 200:
            raise RuntimeError(
                f"Listing projects failed with HTTP status {r.status_code}"
            )
        batch = r.json()
        for p in batch:
            # The first entry seen for a name (and for a version) wins.
            entry = projects.setdefault(
                p["name"], {"uuid": p["uuid"], "versions": {}}
            )
            version = p.get("version")
            if version is not None:
                entry["versions"].setdefault(version, p["uuid"])
        # A page that is not full is the last one.
        if len(batch) < page_size:
            break
        page_number += 1
    return projects
def create_project(name, version, purl):
    """Create a new CONTAINER project in Dependency-Track.

    Args:
        name: project name.
        version: project version.
        purl: package URL recorded on the project.

    Returns:
        The decoded JSON body of the API response (the caller reads its "uuid").

    Raises:
        RuntimeError: the API answered with a status other than 200 or 201.
    """
    print(f"Creating project {name}...")
    headers = {"X-Api-Key": DT_APIKEY}
    # Named "payload" so the imported json module is not shadowed.
    payload = {
        "name": name,
        "version": version,
        "purl": purl,
        "description": "AUTO CREATED",
        "classifier": "CONTAINER",
    }
    r = requests.put(
        url=DT_URL + "/v1/project", headers=headers, json=payload, timeout=30
    )
    # Explicit check instead of assert: asserts vanish under "python -O".
    if r.status_code not in (200, 201):
        raise RuntimeError(
            f"Creating project {name} failed with HTTP status {r.status_code}"
        )
    return r.json()
def add_version(uuid, version, wait_timeout=60):
    """Add a version to an existing project by cloning it, and return its UUID.

    The clone request is processed asynchronously by the server: the response
    carries a token whose state is polled through /v1/event/token/{token}
    until processing has finished. Only then is the version list of the source
    project read to find the UUID of the new version.

    Args:
        uuid: UUID of the project to clone.
        version: version string given to the clone.
        wait_timeout: maximum number of seconds to wait for the clone to finish.

    Returns:
        The UUID of the newly created project version.

    Raises:
        RuntimeError: an API call returned an unexpected status, the clone did
            not finish within ``wait_timeout`` seconds, or the new version is
            missing from the project's version list.
    """
    import time  # local import: only needed for the polling loop below

    print(f"Adding version {version} to project {uuid}...")
    headers = {"X-Api-Key": DT_APIKEY}
    # Clone the bare project only; none of its attached data is copied.
    payload = {
        "includeACL": False,
        "includeAuditHistory": False,
        "includeComponents": False,
        "includePolicyViolations": False,
        "includeProperties": False,
        "includeServices": False,
        "includeTags": False,
        "project": uuid,
        "version": version,
    }
    r = requests.put(
        url=DT_URL + "/v1/project/clone", headers=headers, json=payload, timeout=30
    )
    if r.status_code != 200:
        raise RuntimeError(
            f"Cloning project {uuid} failed with HTTP status {r.status_code}"
        )

    # Wait for the asynchronous clone task to complete.
    token = r.json()["token"]
    deadline = time.monotonic() + wait_timeout
    while True:
        r = requests.get(
            url=f"{DT_URL}/v1/event/token/{token}", headers=headers, timeout=30
        )
        if r.status_code != 200:
            raise RuntimeError(
                f"Polling clone token {token} failed with HTTP status {r.status_code}"
            )
        if not r.json().get("processing", False):
            break
        if time.monotonic() >= deadline:
            raise RuntimeError(
                f"Cloning project {uuid} did not finish within {wait_timeout}s"
            )
        time.sleep(1)

    r = requests.get(url=f"{DT_URL}/v1/project/{uuid}", headers=headers, timeout=30)
    if r.status_code != 200:
        raise RuntimeError(
            f"Fetching project {uuid} failed with HTTP status {r.status_code}"
        )
    for entry in r.json()["versions"]:
        if entry["version"] == version:
            return entry["uuid"]
    raise RuntimeError(f"Version {version} not found on project {uuid} after cloning")
def upload_sbom(uuid, sbom):
    """Upload an SBOM document to a Dependency-Track project.

    The SBOM is sent as a multipart form: the target project UUID in the
    "project" field and the document itself as the "bom" file part.

    Args:
        uuid: UUID of the project (version) receiving the SBOM.
        sbom: SBOM document as a JSON string.

    Raises:
        RuntimeError: the API answered with a status other than 200.
    """
    headers = {"X-Api-Key": DT_APIKEY, "Accept": "application/json"}
    data = {
        "project": uuid,
    }
    files = {"bom": ("bom.json", sbom.encode("utf-8"), "application/json")}
    r = requests.post(
        url=DT_URL + "/v1/bom", headers=headers, data=data, files=files, timeout=120
    )
    if r.status_code != 200:
        if DEBUG:
            print(f"DEBUG: status_code={r.status_code}")
            print(data)
        # Explicit error instead of assert: asserts vanish under "python -O".
        raise RuntimeError(
            f"Uploading SBOM to project {uuid} failed with HTTP status {r.status_code}"
        )
# Command line: an optional --dockerimage restricts the run to a single image
# instead of the images of all running containers.
parser = argparse.ArgumentParser(description="sample argument parser")
parser.add_argument("--dockerimage")
args = parser.parse_args()

# control args: the value is later interpolated into a shell command, so it
# must look like a docker image reference.
if (
    args.dockerimage is not None
    and re.fullmatch(DOCKER_TAG_RE, args.dockerimage) is None
):
    exit("Please specify a valid docker tag to --dockerimage")

if platform != "linux":
    exit("This script is only compatible with Linux")

# Talking to docker requires root or membership of the "docker" group.
try:
    docker_group = grp.getgrnam("docker").gr_gid
except KeyError:
    # No "docker" group on this machine; -1 never matches a real group id.
    docker_group = -1
if geteuid() != 0 and docker_group not in getgroups():
    exit("You must be root or a member of the docker group to use docker")

# max cmdline: room left for arguments once the environment is accounted for.
# execute_process raises on a non-zero exit status, so no retcode check here.
ret = execute_process("expr $(getconf ARG_MAX) - $(env | wc -c)")
ARG_MAX = int(ret["stdout"])
# ARG_MAX is large enough to manage 29k images on the same line,
# which spares us from splitting the docker commands into batches.
if ARG_MAX < 200000:
    exit(f"ARG_MAX is too small ({ARG_MAX}), at least 200000 is required")
# images_ids_tags ends up as a list of dicts {Id, RepoTags, Author, Labels},
# one per image to process. execute_process raises on a non-zero exit status,
# so the docker/jq pipelines below need no separate retcode check.
images_ids_tags = []
running_containers = []
used_images_ids = []
if args.dockerimage is None:
    # retrieving running containers ID
    ret = execute_process("docker ps --format '{{.ID}}' | sort | uniq")
    running_containers = ret["stdout"].splitlines()
    if running_containers:
        # retrieving images ID of running containers
        _cid_line = " ".join(running_containers)
        ret = execute_process(f"docker inspect {_cid_line} | jq -r '.[] | .Image'")
        # Several containers may share one image: keep each ID once, in order.
        used_images_ids = list(dict.fromkeys(ret["stdout"].splitlines()))
        # retrieving tag of used images ID
        _iid_line = " ".join(used_images_ids)
        ret = execute_process(
            f"docker image inspect {_iid_line} | jq '[.[] | {{Id, RepoTags, Author, Labels: .Config.Labels}}]'"
        )
        images_ids_tags = json.loads(ret["stdout"])
    else:
        # "docker inspect" with no argument is an error, so stop here.
        print("No running containers found, nothing to do.")
else:
    ret = execute_process(
        f"docker image inspect {args.dockerimage} | jq '[.[] | {{Id, RepoTags, Author, Labels: .Config.Labels}}]'"
    )
    images_ids_tags = json.loads(ret["stdout"])

# getting all existing projects from dependency-track
projects = get_existing_projects()
def split_docker_tag(tag):
    """Split a docker image reference into registry, image and version.

    Args:
        tag: image reference such as "registry:5000/team/app:1.2" or "nginx".

    Returns:
        A dict with keys "image", "version" (without the leading ":", defaulting
        to "latest" when the reference carries no tag) and "registry" (without
        the trailing "/", or None when the reference carries no registry).

    Raises:
        ValueError: ``tag`` does not match DOCKER_TAG_RE.
    """
    m = re.fullmatch(DOCKER_TAG_RE, tag)
    if m is None:
        # Fail with a clear message rather than an AttributeError on None.
        raise ValueError(f"Not a valid docker tag: {tag!r}")
    version = m.group("version")
    registry = m.group("registry")
    return {
        "image": m.group("image"),
        # The captured groups include their separators; strip them here.
        "version": version[1:] if version is not None else "latest",
        "registry": registry[:-1] if registry is not None else None,
    }
# For each used image: make sure a matching Dependency-Track project and
# version exist, generate the SBOM with syft and upload it.
# Example of a generated package URL: pkg:docker/cassandra@latest
for img in images_ids_tags:
    # An empty (or null) tag list means there is no name to file the image under.
    if not img["RepoTags"]:
        print("Image %s has no tags, passing." % (img["Id"]))
        continue
    # Explicit error instead of assert: asserts vanish under "python -O".
    if len(img["RepoTags"]) != 1:
        raise RuntimeError(
            f"Image {img['Id']} has several tags, exactly one is expected: {img['RepoTags']}"
        )
    repotag = img["RepoTags"][0]
    print(f"Handling image {repotag}")
    dd = split_docker_tag(repotag)
    registry = dd["registry"]
    version = dd["version"]

    purl = f"pkg:docker/{dd['image']}@{version}"
    if registry is not None:
        purl += "?repository_url=" + registry
    project_name = (
        "docker-" + (registry + "/" if registry is not None else "") + dd["image"]
    )

    if project_name not in projects:
        # create project if not exists, and remember it for the next images
        j = create_project(project_name, version, purl)
        projects[project_name] = {
            "uuid": j["uuid"],
            "versions": {version: j["uuid"]},
        }
    elif version not in projects[project_name]["versions"]:
        # create version if not exists
        new_uuid = add_version(projects[project_name]["uuid"], version)
        projects[project_name]["versions"][version] = new_uuid

    # generating SBOM
    res = execute_process(f"syft scan -o cyclonedx-json@1.5 {repotag}", timeout=300)
    # Check the syft result itself (res), not an earlier command's result.
    if res["retcode"] != 0:
        raise RuntimeError(
            f"syft failed for {repotag} with exit status {res['retcode']}"
        )
    sbom = res["stdout"]

    # uploading SBOM
    upload_sbom(projects[project_name]["versions"][version], sbom)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment