Created
June 4, 2025 04:22
-
-
Save SoftPoison/54796aff469f92be59db72447e2dd7ce to your computer and use it in GitHub Desktop.
Converts users and groups collected by ROADrecon into JSON files that can be ingested into BloodHound Legacy.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sqlite3 | |
| import json | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
# Object identifier written into the "ContainedBy" entry (ObjectType
# "Container") of every user and group this script exports.
CONTAINER = "F9250CCA-DA7B-431C-9F4F-F942ED3D6536"
@dataclass()
class RoadGroup:
    """A group row read from the roadrecon ``Groups`` table.

    Fields are declared in the same order as the columns selected by
    ``get_groups`` so that a database row can be unpacked straight into the
    constructor.
    """

    objectId: str
    description: str
    displayName: str
    onPremisesSecurityIdentifier: str
    # Raw column value from the database row; presumably ISO-8601 text, or
    # None when the column is unset. A datetime instance is accepted as well.
    createdDateTime: str

    # "whencreated" value this exporter has always emitted for groups; kept as
    # the fallback when the row carries no usable creation timestamp.
    _DEFAULT_WHENCREATED = 1070889094

    def _whencreated(self) -> int:
        """Return ``createdDateTime`` as epoch seconds, or the legacy default."""
        created = self.createdDateTime
        if not isinstance(created, datetime):
            try:
                created = datetime.fromisoformat(created)
            except (TypeError, ValueError):
                # Missing or unparseable value: keep the historical constant
                # rather than failing the whole export.
                return self._DEFAULT_WHENCREATED
        if created.tzinfo is None:
            # NOTE(review): naive values are read as UTC so the result does not
            # depend on the timezone of the machine running the export --
            # confirm that roadrecon stores UTC wall-clock times.
            return int((created - datetime(1970, 1, 1)).total_seconds())
        return int(created.timestamp())

    def to_bloodhound(self, all_users: list['RoadUser'], group_user_links: list[tuple[str, str]], all_groups: list['RoadGroup'], group_childgroup_links: list[tuple[str, str]], domain_name: str, domain_sid: str) -> dict:
        """Build the BloodHound group object for this group.

        Args:
            all_users: Users whose ``objectId`` may appear in the user links.
            group_user_links: ``(group objectId, user objectId)`` pairs.
            all_groups: Groups whose ``objectId`` may appear in the group links.
            group_childgroup_links: ``(group objectId, child group objectId)``
                pairs.
            domain_name: DNS name of the on-premises domain.
            domain_sid: SID of the on-premises domain.

        Returns:
            A dict in the shape written to ``aad_groups.json``. ``Members``
            lists user members first, then child groups, each in link order.
            Links that point at an unknown user or group are skipped.
        """
        # objectId -> SID lookups. setdefault keeps the first occurrence, which
        # is what a front-to-back scan of the lists would have found.
        user_sids = {}
        for user in all_users:
            user_sids.setdefault(user.objectId, user.onPremisesSecurityIdentifier)
        group_sids = {}
        for group in all_groups:
            group_sids.setdefault(group.objectId, group.onPremisesSecurityIdentifier)

        members = []
        for group_id, user_id in group_user_links:
            if group_id == self.objectId and user_id in user_sids:
                members.append({
                    'ObjectIdentifier': user_sids[user_id],
                    'ObjectType': 'User',
                })
        for group_id, childgroup_id in group_childgroup_links:
            if group_id == self.objectId and childgroup_id in group_sids:
                members.append({
                    'ObjectIdentifier': group_sids[childgroup_id],
                    'ObjectType': 'Group',
                })

        # BloodHound data uses fully upper-cased domain, name and DN values;
        # str.capitalize() would only upper-case the first character.
        domain = domain_name.upper()
        name = self.displayName.upper()

        return {
            "Properties": {
                "domain": domain,
                "name": name + '@' + domain,
                "distinguishedname": f"CN={name},CN=USERS,DC=" + ',DC='.join(domain.split('.')),
                "domainsid": domain_sid,
                "samaccountname": self.displayName,
                "isaclprotected": False,
                "description": self.description,
                "whencreated": self._whencreated(),
                "admincount": False
            },
            "Members": members,
            "Aces": [],
            "ObjectIdentifier": self.onPremisesSecurityIdentifier,
            "IsDeleted": False,
            "IsACLProtected": False,
            "ContainedBy": {
                "ObjectIdentifier": CONTAINER,
                "ObjectType": "Container"
            }
        }
@dataclass()
class RoadUser:
    """A user row read from the roadrecon ``Users`` table.

    Fields are declared in the same order as the columns selected by
    ``get_users`` so that a database row can be unpacked straight into the
    constructor.
    """

    displayName: str
    onPremisesSecurityIdentifier: str
    objectId: str
    onPremisesDistinguishedName: str
    accountEnabled: bool
    createdDateTime: str
    lastPasswordChangeDateTime: str

    @staticmethod
    def _to_epoch(value) -> int:
        """Convert an ISO-8601 timestamp string to whole epoch seconds.

        Returns -1 when ``value`` is None or empty, the same sentinel this
        export already uses for ``lastlogon``. A malformed non-empty string
        still raises ``ValueError``.
        """
        if not value:
            return -1
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is None:
            # NOTE(review): naive values are read as UTC so the result does not
            # depend on the timezone of the machine running the export --
            # confirm that roadrecon stores UTC wall-clock times.
            return int((parsed - datetime(1970, 1, 1)).total_seconds())
        return int(parsed.timestamp())

    def to_bloodhound(self, domain_name: str, domain_sid: str) -> dict:
        """Build the BloodHound user object for this user.

        Args:
            domain_name: DNS name of the on-premises domain.
            domain_sid: SID of the on-premises domain.

        Returns:
            A dict in the shape written to ``aad_users.json``. Only identity,
            timestamps and the enabled flag come from the row; the remaining
            properties are fixed placeholder values.
        """
        # BloodHound data uses fully upper-cased domain and name values;
        # str.capitalize() would only upper-case the first character.
        domain = domain_name.upper()

        # SQLite has no boolean type, so the flag arrives as 0/1; emit a real
        # JSON boolean while leaving a missing value as null.
        enabled = None if self.accountEnabled is None else bool(self.accountEnabled)

        return {
            "Properties": {
                "domain": domain,
                "name": self.displayName.upper() + '@' + domain,
                "distinguishedname": self.onPremisesDistinguishedName,
                "domainsid": domain_sid,
                "samaccountname": self.displayName,
                "isaclprotected": False,
                "description": "",
                "whencreated": self._to_epoch(self.createdDateTime),
                "sensitive": False,
                "dontreqpreauth": False,
                "passwordnotreqd": False,
                "unconstraineddelegation": False,
                "pwdneverexpires": True,
                "enabled": enabled,
                "trustedtoauth": False,
                "smartcardrequired": False,
                "encryptedtextpwdallowed": False,
                "usedeskeyonly": False,
                "logonscriptenabled": False,
                "lockedout": False,
                "passwordcantchange": False,
                "passwordexpired": False,
                "lastlogon": -1,
                "lastlogontimestamp": -1,
                "pwdlastset": self._to_epoch(self.lastPasswordChangeDateTime),
                "serviceprincipalnames": [],
                "hasspn": False,
                "displayname": self.displayName,
                "email": None,
                "title": None,
                "homedirectory": None,
                "userpassword": None,
                "unixpassword": None,
                "unicodepassword": None,
                "sfupassword": None,
                "logonscript": None,
                "useraccountcontrol": 66050,
                "profilepath": None,
                "admincount": None,
                "supportedencryptiontypes": None,
                "sidhistory": []
            },
            "AllowedToDelegate": [],
            # RID 513 appended to the domain SID.
            "PrimaryGroupSID": f"{domain_sid}-513",
            "HasSIDHistory": [],
            "SPNTargets": [],
            "UnconstrainedDelegation": False,
            "DomainSID": domain_sid,
            "Aces": [],
            "ObjectIdentifier": self.onPremisesSecurityIdentifier,
            "IsDeleted": False,
            "IsACLProtected": False,
            "ContainedBy": {
                "ObjectIdentifier": CONTAINER,
                "ObjectType": "Container"
            }
        }
def get_groups(con: sqlite3.Connection) -> list[RoadGroup]:
    """Load every group that has an on-premises SID.

    Args:
        con: Open connection to a roadrecon SQLite database.

    Returns:
        One ``RoadGroup`` per row of ``Groups`` whose
        ``onPremisesSecurityIdentifier`` is not NULL.
    """
    cur = con.cursor()
    # Column order must match the field order of RoadGroup, because each row
    # is unpacked positionally into the constructor.
    cur.execute('''
        select objectId,
               description,
               displayName,
               onPremisesSecurityIdentifier,
               createdDateTime
        from Groups
        where onPremisesSecurityIdentifier is not NULL
    ''')
    # A cursor is its own row iterator, so no manual fetchone() loop is needed.
    return [RoadGroup(*row) for row in cur]
def get_users(con: sqlite3.Connection) -> list[RoadUser]:
    """Load every user that has an on-premises SID.

    Args:
        con: Open connection to a roadrecon SQLite database.

    Returns:
        One ``RoadUser`` per row of ``Users`` whose
        ``onPremisesSecurityIdentifier`` is not NULL.
    """
    cur = con.cursor()
    # Column order must match the field order of RoadUser, because each row
    # is unpacked positionally into the constructor.
    cur.execute('''
        select displayName,
               onPremisesSecurityIdentifier,
               objectId,
               onPremisesDistinguishedName,
               accountEnabled,
               createdDateTime,
               lastPasswordChangeDateTime
        from Users
        where onPremisesSecurityIdentifier is not NULL
    ''')
    # A cursor is its own row iterator, so no manual fetchone() loop is needed.
    return [RoadUser(*row) for row in cur]
def get_group_user_links(con: sqlite3.Connection) -> list[tuple[str, str]]:
    """Return every row of the ``lnk_group_member_user`` table.

    Args:
        con: Open connection to a roadrecon SQLite database.

    Returns:
        All rows of the table as tuples, in the table's own column order.
        Callers unpack each row as ``(group objectId, user objectId)``.
    """
    cur = con.cursor()
    cur.execute('select * from lnk_group_member_user')
    return cur.fetchall()
def get_group_childgroup_links(con: sqlite3.Connection) -> list[tuple[str, str]]:
    """Return every row of the ``lnk_group_member_group`` table.

    Args:
        con: Open connection to a roadrecon SQLite database.

    Returns:
        All rows of the table as tuples, in the table's own column order.
        Callers unpack each row as ``(group objectId, child group objectId)``.
    """
    cur = con.cursor()
    cur.execute('select * from lnk_group_member_group')
    return cur.fetchall()
def _bloodhound_payload(object_type: str, data: list) -> str:
    """Serialise ``data`` with the meta header used for both output files."""
    return json.dumps({
        'data': data,
        'meta': {
            "methods": 262885,
            "type": object_type,
            "count": len(data),
            "version": 6,
            "collectorversion": "2.5.8.0"
        }
    })


def main(db: str, domain_name: str, domain_sid: str):
    """Convert a roadrecon database into BloodHound user and group files.

    Reads users, groups and membership links from ``db`` and writes
    ``aad_users.json`` and ``aad_groups.json`` into the current working
    directory, overwriting existing files.

    Args:
        db: Path to the roadrecon SQLite database.
        domain_name: DNS name of the on-premises domain.
        domain_sid: SID of the on-premises domain.
    """
    # Using a sqlite3 connection as a context manager only commits or rolls
    # back the transaction; it does not close the connection, so close it
    # explicitly once everything has been read.
    con = sqlite3.connect(db)
    try:
        users = get_users(con)
        groups = get_groups(con)
        group_user_links = get_group_user_links(con)
        group_childgroup_links = get_group_childgroup_links(con)
    finally:
        con.close()

    # Serialise both documents before touching the filesystem so a conversion
    # error does not leave only one of the two files written.
    bh_users = _bloodhound_payload(
        "users",
        [u.to_bloodhound(domain_name, domain_sid) for u in users],
    )
    bh_groups = _bloodhound_payload(
        "groups",
        [g.to_bloodhound(users, group_user_links, groups, group_childgroup_links, domain_name, domain_sid) for g in groups],
    )

    with open('aad_users.json', 'w') as f:
        f.write(bh_users)

    with open('aad_groups.json', 'w') as f:
        f.write(bh_groups)
if __name__ == '__main__':
    # todo: merge with existing bloodhound data
    import sys

    # Expect exactly three arguments: database path, domain name, domain SID.
    if len(sys.argv) != 4:
        # Usage errors go to stderr; sys.exit is used because the bare exit()
        # helper is only installed by the site module.
        print("Usage: python3 road2bh.py <roadrecon.db> <domain.example.com> <S-1-5-21-1234567890-1234567890-1234567890>", file=sys.stderr)
        sys.exit(1)

    main(sys.argv[1], sys.argv[2], sys.argv[3])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment