SharePoint Permissions Audit

'''
# SharePoint permissions audit

Creates perms.csv, designed to be pivoted in Excel, containing all unique
(non-inherited) permissions on all SharePoint folders across all sites in
the tenant. Pivot on the URL field to get all users/groups with permission
to a given folder, or pivot on the User/Group field to get all the folders
a given user/group has permissions on.

You'll need to create an App Registration - follow this guide:
https://learn.microsoft.com/en-us/entra/identity-platform/quickstart-register-app?tabs=client-secret
Add the Sites.FullControl.All application permission and grant admin consent.
Copy the tenant ID and client ID, then create a secret and copy its value.
Set all three as environment variables (see below).

The script will hit some edge-case errors, for example with hidden sites or
with folders that get deleted between indexing and fetching. The assumption
is that these won't have any permissions you care about anyway, so they are
logged and ignored.
'''
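
# Example setup - a sketch only: the script file name below and the
# placeholder values are assumed, not part of the original gist.
#
#   export TENANT_ID=<directory (tenant) ID of the App Registration>
#   export CLIENT_ID=<application (client) ID of the App Registration>
#   export CLIENT_SECRET=<the secret's value, not its ID>
#   python sharepoint_perms_audit.py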

import asyncio
import msal
import typing as t
import aiohttp
import aiofiles
import os
import urllib.parse
from datetime import datetime, timedelta

try:
    CLIENT_ID = os.environ['CLIENT_ID']
    CLIENT_SECRET = os.environ['CLIENT_SECRET']
    TENANT_ID = os.environ['TENANT_ID']
except KeyError:
    print('Please set environment variables CLIENT_ID, CLIENT_SECRET, and TENANT_ID.')
    print('Application must have Sites.FullControl.All permission.')
    exit(1)

SCOPES = ['https://graph.microsoft.com/.default']
BASE_URL = 'https://graph.microsoft.com/v1.0'

async def get_msal_token(tenant_id: str, client_id: str, client_secret: str) -> str:
    app = msal.ConfidentialClientApplication(
        client_id,
        authority=f'https://login.microsoftonline.com/{tenant_id}',
        client_credential=client_secret,
    )
    result = app.acquire_token_for_client(scopes=SCOPES)
    assert result is not None
    try:
        return result['access_token']
    except KeyError:
        raise Exception(
            'Couldn\'t get access_token from Azure AD:\n' +
            f'{result.get("error")}\n' +
            f'{result.get("error_description")}\n' +
            f'Correlation ID: {result.get("correlation_id")}'
        )

class HttpException(Exception):
    def __init__(self, endpoint: str, status: int, response_text: str, response_obj: aiohttp.ClientResponse):
        super().__init__(f'{status}: {endpoint}')
        self.endpoint = endpoint
        self.status = status
        self.response_text = response_text
        self.response_obj = response_obj

    def __str__(self):
        return super().__str__() + '\n\nResponse:\n' + self.response_text

# When Graph rate-limits us (HTTP 429), this is set to the earliest time the
# next request may be sent; every request sleeps until then first.
wait_until: datetime | None = None

async def invoke_graph_api(
    access_token: str,
    endpoint: str,
    method: str,
    data: t.Optional[str] = None,
    raise_on_error: bool = True,
    extra_headers: dict[str, str] | None = None,
):
    global wait_until
    if not extra_headers:
        extra_headers = {}
    if wait_until:
        secs = (wait_until - datetime.now()).total_seconds()
        await asyncio.sleep(secs)
    async with session.request(
        method,
        f'{BASE_URL}/{endpoint}',
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json',
            **extra_headers
        },
        data = data,
    ) as resp:
        await resp.read()
        if resp.status == 429:
            # Prefer the Retry-After header, then the retryAfterSeconds field
            # in the body; if neither is present, assume 15 seconds.
            resp_json = await resp.json()
            assumed = False
            try:
                timeout = int(resp.headers['Retry-After'])
            except KeyError:
                try:
                    timeout = resp_json['retryAfterSeconds']
                except KeyError:
                    assumed = True
                    timeout = 15
            print(f'Rate limited for {"(assumed) " if assumed else ""}{timeout} seconds...')
            wait_until = datetime.now() + timedelta(seconds=timeout)
            return await invoke_graph_api(access_token, endpoint, method, data, raise_on_error, extra_headers)
        if raise_on_error and resp.status >= 300:
            raise HttpException(endpoint, resp.status, await resp.text(), resp)
        return resp

async def list_all(token: str, endpoint: str) -> list[t.Any]:
    # Follow @odata.nextLink pagination and return every object the endpoint yields.
    objs = []
    skiptoken = None
    while True:
        url = endpoint
        if skiptoken:
            url += f'{"&" if "?" in endpoint else "?"}$skiptoken={skiptoken}'
        try:
            resp = await (await invoke_graph_api(token, url, 'GET')).json()
        except HttpException as exc:
            # Sometimes things get removed when we're in the middle of accessing
            # them and it causes strange errors; just log them and forget about it.
            if exc.status == 403:
                print(f'Forbidden: {endpoint}')
                break
            if exc.status == 404:
                print(f'Not found: {endpoint}')
                break
            if exc.status == 400:
                print(f'Bad request: {endpoint}, {(await exc.response_obj.json())["error"]["message"]}')
                break
            raise
        objs += resp['value']
        try:
            # @odata.nextLink is a full URL; pull $skiptoken out of its query string.
            next_link = resp['@odata.nextLink']
            skiptoken = urllib.parse.parse_qs(urllib.parse.urlparse(next_link).query)['$skiptoken'][0]
        except (KeyError, IndexError):
            break
    return objs

async def get_sites(token: str):
    return await list_all(token, '/sites/?search=*&$select=id,webUrl')

async def get_libraries(token: str, site_id: str):
    return await list_all(token, f'/sites/{site_id}/drives?$select=id,webUrl')

async def update_perm(token: str, site_id: str, drive: dict[str, t.Any], folder: dict[str, t.Any] | None):
    # folder=None means the root of the drive (document library) itself.
    if folder is None:
        folder_comp = 'root'
    else:
        folder_comp = f'items/{folder["id"]}'
    for perm in await list_all(token, f'/sites/{site_id}/drives/{drive["id"]}/{folder_comp}/permissions'):
        # Only record permissions assigned directly on this folder, not inherited ones.
        if 'inheritedFrom' in perm and perm['inheritedFrom']:
            continue
        if folder is None:
            weburl = drive['webUrl']
        else:
            weburl = folder['webUrl']
        for role in perm['roles']:
            try:
                await csvf.write(f'"{weburl}","{perm["grantedTo"]["user"]["displayName"]}",{role}\n')
            except KeyError:
                pass

async def get_folders(token: str, site_id: str, drive: dict[str, t.Any], folder: dict[str, t.Any] | None):
    # Record this folder's unique permissions and return its subfolders.
    if folder is None:
        folder_comp = 'root'
    else:
        folder_comp = f'items/{folder["id"]}'
    drive_id = drive['id']
    _, lst = await asyncio.gather(
        update_perm(token, site_id, drive, folder),
        list_all(token, f'/sites/{site_id}/drives/{drive_id}/{folder_comp}/children?$filter=folder ne null&$select=id,webUrl'),
    )
    return lst

async def main():
    token = await get_msal_token(TENANT_ID, CLIENT_ID, CLIENT_SECRET)
    print('Got token')

    sites = await get_sites(token)
    print(f'Got {len(sites)} sites')

    site_drives = await asyncio.gather(*[
        get_libraries(token, site['id'])
        for site in sites
    ])
    drives = [
        (site['id'], drive)
        for site, drives in zip(sites, site_drives)
        for drive in drives
    ]
    print(f'Got {len(drives)} drives')

    # Breadth-first walk: start at every drive's root, then descend one folder
    # level per iteration until no subfolders remain.
    folders = [
        (site_id, drive, None)
        for site_id, drive in drives
    ]
    i = 0
    while len(folders) > 0:
        subfolders = await asyncio.gather(*[
            get_folders(token, site_id, drive, folder)
            for site_id, drive, folder in folders
        ])
        # Pair each child list with the (site_id, drive) of the folder it was
        # listed from, so the next level queries the correct drive.
        folders = [
            (site_id, drive, subfolder)
            for (site_id, drive, _), subs in zip(folders, subfolders)
            for subfolder in subs
        ]
        print(f'Folder level {i}: Got {len(folders)} folders across {len(set(drive["id"] for _, drive, _ in folders))} drives')
        i += 1

async def _main():
    global csvf
    global session
    async with aiofiles.open('perms.csv', 'w') as csvf, aiohttp.ClientSession() as session:
        await csvf.write('URL,User/Group,Permission\n')
        return await main()

if __name__ == '__main__':
    asyncio.run(_main())
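
For a quick look at the results without opening Excel, here is a minimal sketch (not part of the script above) that reads perms.csv with the standard library and groups the rows both ways the docstring describes. The folder URL and display name used in the lookups are placeholders.

import csv
from collections import defaultdict

by_url = defaultdict(list)        # folder URL -> [(user/group, role), ...]
by_principal = defaultdict(list)  # user/group -> [(folder URL, role), ...]

with open('perms.csv', newline='') as f:
    for row in csv.DictReader(f):
        by_url[row['URL']].append((row['User/Group'], row['Permission']))
        by_principal[row['User/Group']].append((row['URL'], row['Permission']))

# Everyone with unique permissions on one folder (placeholder URL):
for who, role in by_url.get('https://contoso.sharepoint.com/sites/Example/Shared Documents/Finance', []):
    print(who, role)

# Every folder one user/group (placeholder name) holds unique permissions on:
for url, role in by_principal.get('Jane Citizen', []):
    print(url, role)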