Skip to content

Instantly share code, notes, and snippets.

@tembleking
Created July 7, 2021 08:51
Show Gist options
  • Select an option

  • Save tembleking/99c3087c9628f603966a76618c16a8c7 to your computer and use it in GitHub Desktop.

Select an option

Save tembleking/99c3087c9628f603966a76618c16a8c7 to your computer and use it in GitHub Desktop.
Exports Policies and Rules in CSV format from Sysdig Secure
#!/usr/bin/env python
import csv
import os
from multiprocessing.pool import ThreadPool
from sdcclient import SdSecureClient
from sdcclient.secure import PolicyClientV2
severity_mapping = ["High"] * 4 + ["Medium"] * 2 + ["Low", "Info"]
def retrieve_connection_details_from_env_vars():
url = os.getenv("SDC_SECURE_URL")
token = os.getenv("SDC_SECURE_TOKEN")
ssl_validation = os.getenv("SDC_SSL_VERIFY", True)
if url is None:
raise ValueError("The SDC_SECURE_URL environment variable must be set.")
if token is None:
raise ValueError("The SDC_SECURE_TOKEN environment variable must be set.")
return url, token, ssl_validation
def write_to_csv(policies, rules_by_name):
headers = ["Runtime Policy Severity",
"Runtime Policy Name",
"Runtime Policy Description",
"Falco Rule Name",
"Falco Rule Condition",
"Falco Rule Output",
"Falco Rule Description",
"Falco Rule Tags"]
all_rows = []
for policy in policies:
rules_in_this_policy = [rules_by_name[rule_name] for rule_name in policy["ruleNames"]]
for rule in rules_in_this_policy:
all_rows.append(
[severity_mapping[policy["severity"]],
policy["name"],
policy["description"],
rule["name"],
rule["details"]["condition"]["condition"],
rule["details"]["output"],
rule["description"],
",".join(rule["tags"])]
)
with open('policies_and_rules.csv', 'w', newline='') as csvfile:
writer = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_MINIMAL)
writer.writerow(headers)
writer.writerows(all_rows)
def retrieve_rule(client: SdSecureClient, rule_id):
ok, rule = client.get_rule_id(rule_id)
if not ok:
raise RuntimeError(rule)
return rule
def retrieve_rules(client: SdSecureClient):
ok, rules = client.list_rules()
if not ok:
raise RuntimeError(rules)
pool = ThreadPool()
all_rules_info = [pool.apply_async(retrieve_rule, (client, id)) for rule in rules for id in rule["ids"]]
pool.close()
all_rules_info = [info.get() for info in all_rules_info]
return all_rules_info
def main():
url, token, ssl_validation = retrieve_connection_details_from_env_vars()
policy_client = PolicyClientV2(sdc_url=url, token=token, ssl_verify=ssl_validation)
rule_client = SdSecureClient(sdc_url=url, token=token, ssl_verify=ssl_validation)
rules = retrieve_rules(rule_client)
rules_by_name = {rule["name"]: rule for rule in rules}
ok, policies = policy_client.list_policies()
if not ok:
raise RuntimeError(policies)
write_to_csv(policies, rules_by_name)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment