Skip to content

Instantly share code, notes, and snippets.

@gene1wood
Created March 30, 2016 18:08
Show Gist options
  • Select an option

  • Save gene1wood/d0b6d1f030ab5747dad1837f53f9473f to your computer and use it in GitHub Desktop.

Select an option

Save gene1wood/d0b6d1f030ab5747dad1837f53f9473f to your computer and use it in GitHub Desktop.
Example script that searches CloudTrail log files to either compress each record into a single line, find calls made from certain networks, or find specific API actions.
#!/usr/bin/env python
import sys
import json
import gzip
# Python 2 only: sudo pip install py2-ipaddress
# (on Python 3.3+ the ipaddress module is part of the standard library)
from ipaddress import IPv4Address, IPv4Network
# Module-level list; none of the code visible in this file reads from or
# appends to it.
ips = []
def _identity_label(record):
    """Describe who made the call in a CloudTrail record as ``name:type``.

    Returns a ``(label, recognized)`` tuple. ``recognized`` is False when
    the identity could not be interpreted, which tells the caller to dump
    the raw record so the unexpected shape can be inspected.

    Raises KeyError if ``userIdentity`` is present but has no ``type``, or
    if an ``IAMUser`` identity has no ``userName``.
    """
    unknown = "unknown:unknown"
    if 'userIdentity' not in record:
        return unknown, False

    user_identity = record['userIdentity']
    identity_type = user_identity['type']

    if identity_type == "IAMUser":
        return "%s:%s" % (user_identity['userName'], identity_type), True

    if identity_type == "AssumedRole":
        if 'sessionContext' in user_identity:
            try:
                issuer = user_identity['sessionContext']['sessionIssuer']['userName']
            except (KeyError, TypeError):
                # sessionContext is present but lacks a usable sessionIssuer.
                return unknown, False
            return "%s:%s" % (issuer, identity_type), True
        if 'arn' in user_identity:
            return "%s:%s" % (user_identity['arn'], identity_type), True
        # Neither sessionContext nor arn: report it as unknown instead of
        # leaving the label unassigned.
        return unknown, False

    if identity_type == "Root":
        return "%s:%s" % ('Root', identity_type), True

    if identity_type == "Unknown":
        # An explicitly unknown identity is expected; no need to dump it.
        return unknown, True

    return unknown, False


def show_short(record, filename, networks=()):
    """Print a one-line summary of a CloudTrail record.

    The line has the form
    ``<filename> <eventTime> <identity>@<sourceIPAddress> <eventSource>:<eventName>``.
    When the caller's identity cannot be interpreted, the raw record is
    printed first and the identity is shown as ``unknown:unknown``.

    Args:
        record: One decoded entry from the ``Records`` list of a CloudTrail
            log file.
        filename: Name of the log file the record came from; echoed in the
            output line.
        networks: Optional iterable of IPv4 networks in CIDR notation. When
            non-empty, the record is only shown if its ``sourceIPAddress``
            is an IPv4 address inside at least one of them.

    Returns:
        False if the record was filtered out by ``networks``, otherwise
        None.

    Raises:
        KeyError: If the record lacks one of the fields used in the
            summary line.
        ValueError: If an entry of ``networks`` is not a valid IPv4 network.
    """
    if networks:
        try:
            record_ip = IPv4Address(record['sourceIPAddress'])
        except (KeyError, TypeError, ValueError):
            # sourceIPAddress is missing or is not an IPv4 literal (for
            # example an AWS service host name), so it cannot match.
            return False
        # Match when the address is in ANY listed network; requiring all of
        # them would make a list of disjoint networks match nothing.
        if not any(record_ip in IPv4Network(network) for network in networks):
            return False

    identity, recognized = _identity_label(record)
    if not recognized:
        print(record)
    print("%s %s %s@%s %s:%s" % (
        filename,
        record['eventTime'],
        identity,
        record['sourceIPAddress'],
        record['eventSource'],
        record['eventName'],
    ))
def show_long(record, filename, event_name=None):
    """Pretty-print a whole CloudTrail record as indented, key-sorted JSON.

    The record is tagged in place with a ``filename`` key before anything
    else happens, so the caller's dict is modified even when the record is
    filtered out.

    Args:
        record: One decoded entry from a CloudTrail ``Records`` list.
        filename: Name of the log file the record came from.
        event_name: If given, only records whose ``eventName`` equals this
            value are printed.

    Returns:
        False when the record is skipped because of ``event_name``,
        otherwise None.
    """
    record['filename'] = filename

    wanted = event_name is None or record['eventName'] == event_name
    if not wanted:
        return False

    rendered = json.dumps(
        record,
        sort_keys=True,
        indent=4,
        separators=(',', ': '),
    )
    print(rendered)
# Script driver: read the gzipped CloudTrail file named by the first
# command-line argument and print every record it contains.
log_path = sys.argv[1]

with gzip.open(log_path) as log_file:
    trail = json.load(log_file)

for record in trail['Records']:
    # Alternative views; swap one in for the show_long call below:
    # show_short(record, sys.argv[1], networks=['203.0.113.0/24'])
    # show_short(record, sys.argv[1])
    # show_long(record, sys.argv[1], 'CreateBucket')
    show_long(record, log_path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment