Example script for searching CloudTrail log files: print each record as a single summary line, filter for calls from specific networks, or filter for specific API actions.
#!/usr/bin/env python
"""Search CloudTrail log files: one-line summaries, source-network filtering, or API-action filtering."""
import gzip
import json
import sys

# ipaddress is in the standard library from Python 3.3 onwards; on Python 2 install
# the backport first:  sudo pip install py2-ipaddress
from ipaddress import IPv4Address, IPv4Network

ips = []  # collected addresses; currently not referenced by the functions below
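def _identity_string(record):
    """Return ``"<name>:<type>"`` describing the caller recorded in *record*.

    Falls back to ``"unknown:unknown"`` when the ``userIdentity`` block is missing or
    has a shape this script does not know how to summarise; in that case the raw
    record is also printed so the unrecognised shape can be inspected.
    """
    user_identity = record.get('userIdentity')
    if not user_identity:
        print(record)
        return "unknown:unknown"
    identity_type = user_identity.get('type')
    if identity_type == "IAMUser":
        return "%s:%s" % (user_identity.get('userName', 'unknown'), identity_type)
    if identity_type == "AssumedRole":
        if 'sessionContext' in user_identity:
            try:
                # Report the issuing role's name rather than the temporary session name
                issuer_name = user_identity['sessionContext']['sessionIssuer']['userName']
            except (KeyError, TypeError):
                print(record)
                return "unknown:unknown"
            return "%s:%s" % (issuer_name, identity_type)
        if 'arn' in user_identity:
            return "%s:%s" % (user_identity['arn'], identity_type)
        # Neither sessionContext nor arn present: treat like any other unrecognised shape
        print(record)
        return "unknown:unknown"
    if identity_type == "Root":
        return "%s:%s" % ('Root', identity_type)
    if identity_type == "Unknown":
        return "unknown:unknown"
    print(record)
    return "unknown:unknown"


def show_short(record, filename, networks=None):
    """Print a one-line summary of a CloudTrail *record*, optionally filtered by source network.

    Args:
        record: one entry from the ``Records`` list of a CloudTrail log file.
        filename: name of the file the record came from; prefixed to the output line.
        networks: optional list of IPv4 CIDR strings (e.g. ``['203.0.113.0/24']``).
            When given, only records whose ``sourceIPAddress`` lies inside at least
            one of the networks are printed. Records whose source is not an IPv4
            literal (CloudTrail also puts service names in this field) are skipped.

    Returns:
        False when the record was filtered out, None after it was printed.
    """
    # Avoid a mutable default argument shared across calls
    if networks is None:
        networks = []
    if networks:
        try:
            record_ip = IPv4Address(record['sourceIPAddress'])
        except (ValueError, KeyError):
            # Not an IPv4 literal, or no sourceIPAddress field at all
            return False
        # Match when the address is inside ANY of the requested networks; requiring
        # membership in every network would only ever match nested ranges
        if not any(record_ip in IPv4Network(network) for network in networks):
            return False
    identity = _identity_string(record)
    print("%s %s %s@%s %s:%s" % (
        filename,
        record.get('eventTime'),
        identity,
        record.get('sourceIPAddress', 'unknown'),
        record.get('eventSource'),
        record.get('eventName'),
    ))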
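def show_long(record, filename, event_name=None):
    """Pretty-print a full CloudTrail *record* as JSON, optionally restricted to one API action.

    Args:
        record: one entry from the ``Records`` list of a CloudTrail log file. A
            ``filename`` key is added to it before printing.
        filename: name of the file the record came from.
        event_name: when given, only records whose ``eventName`` equals it are printed.

    Returns:
        False when the record was filtered out, None after it was printed.
    """
    # Filter first so records that are not printed are left unmodified
    if event_name is not None and record.get('eventName') != event_name:
        return False
    record['filename'] = filename
    print(json.dumps(record, sort_keys=True, indent=4, separators=(',', ': ')))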
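# Entry point: expects a single gzip-compressed CloudTrail log file on the command line.
if len(sys.argv) < 2:
    sys.exit("usage: %s <cloudtrail-log.json.gz>" % sys.argv[0])

with gzip.open(sys.argv[1]) as f:
    j = json.load(f)

# A CloudTrail log file is one JSON object whose 'Records' list holds the events
records = j.get('Records')
if records is None:
    sys.exit("%s: no 'Records' list found; is this a CloudTrail log file?" % sys.argv[1])

for record in records:
    # Pick ONE output mode; the commented-out calls show the alternatives.
    # show_short(record, sys.argv[1], networks=['203.0.113.0/24'])
    # show_short(record, sys.argv[1])
    # show_long(record, sys.argv[1], 'CreateBucket')
    show_long(record, sys.argv[1])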