@maxcutlyp
Created November 26, 2024 08:58
SharePoint Permissions Audit
'''
# SharePoint permissions audit
Creates perms.csv, designed to be pivoted in Excel, containing all unique
(non-inherited) permissions on all SharePoint folders across all sites in
the tenant. Pivot on the URL field to get all users/groups with permission
to a given folder, or pivot on the User/Group field to get all the folders
a given user/group has permissions on.
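A typical row might look like this (hypothetical values):
    "https://contoso.sharepoint.com/sites/HR/Shared%20Documents/Payroll","Payroll Owners",write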
You'll need to create an App Registration - follow this guide:
https://learn.microsoft.com/en-us/entra/identity-platform/quickstart-register-app?tabs=client-secret
Add the Sites.FullControl.All application permission and grant admin consent.
Copy the tenant ID and client ID, create a client secret and copy its value,
then set all three as environment variables (see below).
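For example (bash, placeholder values):
    export TENANT_ID='<directory (tenant) ID>'
    export CLIENT_ID='<application (client) ID>'
    export CLIENT_SECRET='<client secret value>'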
Expect some edge-case errors, for example from hidden sites or from folders
deleted between indexing and fetching. The assumption is that these won't
hold any permissions you care about, so they are logged and ignored.
'''
import asyncio
import msal
import typing as t
import aiohttp
import aiofiles
import os
import urllib.parse
from datetime import datetime, timedelta
try:
    CLIENT_ID = os.environ['CLIENT_ID']
    CLIENT_SECRET = os.environ['CLIENT_SECRET']
    TENANT_ID = os.environ['TENANT_ID']
except KeyError:
    print('Please set environment variables CLIENT_ID, CLIENT_SECRET, and TENANT_ID.')
    print('Application must have Sites.FullControl.All permission.')
    exit(1)
SCOPES = ['https://graph.microsoft.com/.default']
BASE_URL = 'https://graph.microsoft.com/v1.0'
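# get_msal_token acquires an app-only token via the client-credentials flow;
# the .default scope requests whichever application permissions have been
# granted to this app registration (Sites.FullControl.All here).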
async def get_msal_token(tenant_id: str, client_id: str, client_secret: str) -> str:
    app = msal.ConfidentialClientApplication(
        client_id,
        authority=f'https://login.microsoftonline.com/{tenant_id}',
        client_credential=client_secret,
    )
    result = app.acquire_token_for_client(scopes=SCOPES)
    assert result is not None
    try:
        return result['access_token']
    except KeyError:
        raise Exception(
            'Couldn\'t get access_token from Azure AD:\n' +
            f'{result.get("error")}\n' +
            f'{result.get("error_description")}\n' +
            f'Correlation ID: {result.get("correlation_id")}'
        )
class HttpException(Exception):
    def __init__(self, endpoint: str, status: int, response_text: str, response_obj: aiohttp.ClientResponse):
        super().__init__(f'{status}: {endpoint}')
        self.endpoint = endpoint
        self.status = status
        self.response_text = response_text
        self.response_obj = response_obj

    def __str__(self):
        return super().__str__() + '\n\nResponse:\n' + self.response_text
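# Shared backoff deadline: when Graph returns 429, invoke_graph_api sets this
# and every subsequent request sleeps until it has passed before retrying.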
wait_until: datetime | None = None
async def invoke_graph_api(
    access_token: str,
    endpoint: str,
    method: str,
    data: t.Optional[str] = None,
    raise_on_error: bool = True,
    extra_headers: dict[str, str] | None = None,
):
    global wait_until
    if not extra_headers:
        extra_headers = {}
    if wait_until:
        secs = (wait_until - datetime.now()).total_seconds()
        await asyncio.sleep(secs)
    async with session.request(
        method,
        f'{BASE_URL}/{endpoint}',
        headers={
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json',
            **extra_headers
        },
        data=data,
    ) as resp:
        await resp.read()
        if resp.status == 429:
            resp_json = await resp.json()
            assumed = False
            try:
                timeout = int(resp.headers['Retry-After'])
            except KeyError:
                try:
                    timeout = resp_json['retryAfterSeconds']
                except KeyError:
                    assumed = True
                    timeout = 15
            print(f"Rate limited for {'(assumed) ' if assumed else ''}{timeout} seconds...")
            wait_until = datetime.now() + timedelta(seconds=timeout)
            return await invoke_graph_api(access_token, endpoint, method, data, raise_on_error, extra_headers)
        if raise_on_error and resp.status >= 300:
            raise HttpException(endpoint, resp.status, await resp.text(), resp)
        return resp
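# Fetches every page of a Graph collection by re-requesting the endpoint with
# the $skiptoken from each response's @odata.nextLink until none is returned.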
async def list_all(token: str, endpoint: str) -> list[t.Any]:
    objs = []
    skiptoken = None
    while True:
        if skiptoken:
            sep = '&' if '?' in endpoint else '?'
            url = f'{endpoint}{sep}$skiptoken={skiptoken}'
        else:
            url = endpoint
        try:
            resp = await (await invoke_graph_api(token, url, 'GET')).json()
        except HttpException as exc:
            # Sometimes things get removed when we're in the middle of accessing
            # them and it causes strange errors; just log them and forget about it.
            if exc.status == 403:
                print(f'Forbidden: {endpoint}')
                break
            if exc.status == 404:
                print(f'Not found: {endpoint}')
                break
            if exc.status == 400:
                print(f"Bad request: {endpoint}, {(await exc.response_obj.json())['error']['message']}")
                break
            raise
        objs += resp['value']
        try:
            # Pull $skiptoken out of the nextLink's query string; if there is
            # no nextLink, this was the last page.
            next_link = resp['@odata.nextLink']
            skiptoken = urllib.parse.parse_qs(urllib.parse.urlparse(next_link).query)['$skiptoken'][0]
        except (KeyError, IndexError):
            break
    return objs
async def get_sites(token: str):
    return await list_all(token, '/sites/?search=*&$select=id,webUrl')
async def get_libraries(token: str, site_id: str):
    return await list_all(token, f'/sites/{site_id}/drives?$select=id,webUrl')
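# Writes one CSV row per role for each permission defined directly on the
# given folder (or drive root); inherited permissions are skipped.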
async def update_perm(token: str, site_id: str, drive: dict[str, t.Any], folder: dict[str, t.Any] | None):
    if folder is None:
        folder_comp = 'root'
    else:
        folder_comp = f"items/{folder['id']}"
    for perm in await list_all(token, f"/sites/{site_id}/drives/{drive['id']}/{folder_comp}/permissions"):
        if 'inheritedFrom' in perm and perm['inheritedFrom']:
            continue
        if folder is None:
            weburl = drive['webUrl']
        else:
            weburl = folder['webUrl']
        for role in perm['roles']:
            try:
                name = perm['grantedTo']['user']['displayName']
            except KeyError:
                continue
            await csvf.write(f'"{weburl}","{name}",{role}\n')
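# Audits one folder's permissions and returns its immediate subfolders, so the
# caller can walk each drive's folder tree one level at a time.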
async def get_folders(token: str, site_id: str, drive: dict[str, t.Any], folder: dict[str, t.Any] | None):
    if folder is None:
        folder_comp = 'root'
    else:
        folder_comp = f"items/{folder['id']}"
    drive_id = drive['id']
    _, lst = await asyncio.gather(
        update_perm(token, site_id, drive, folder),
        list_all(token, f'/sites/{site_id}/drives/{drive_id}/{folder_comp}/children?$filter=folder ne null&$select=id,webUrl'),
    )
    return lst
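# Breadth-first traversal: start from every drive root, then repeatedly expand
# the current level of folders until no subfolders remain.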
async def main():
    token = await get_msal_token(TENANT_ID, CLIENT_ID, CLIENT_SECRET)
    print('Got token')
    sites = await get_sites(token)
    print(f'Got {len(sites)} sites')
    site_drives = await asyncio.gather(*[
        get_libraries(token, site['id'])
        for site in sites
    ])
    drives = [
        (site['id'], drive)
        for site, site_drive_list in zip(sites, site_drives)
        for drive in site_drive_list
    ]
    print(f'Got {len(drives)} drives')
    folders = [
        (site_id, drive, None)
        for site_id, drive in drives
    ]
    i = 0
    while len(folders) > 0:
        subfolders = await asyncio.gather(*[
            get_folders(token, site_id, drive, folder)
            for site_id, drive, folder in folders
        ])
        # Pair each folder that was just expanded with its own subfolders so the
        # next level keeps the correct site/drive for every entry.
        folders = [
            (site_id, drive, folder)
            for (site_id, drive, _), subs in zip(folders, subfolders)
            for folder in subs
        ]
        print(f"Folder level {i}: Got {len(folders)} folders across {len(set(drive['id'] for _, drive, _ in folders))} drives")
        i += 1
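# Opens the shared CSV file and aiohttp session used by the rest of the script,
# writes the header row, then runs the audit.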
async def _main():
    global csvf
    global session
    async with aiofiles.open('perms.csv', 'w') as csvf, aiohttp.ClientSession() as session:
        await csvf.write('URL,User/Group,Permission\n')
        return await main()


if __name__ == '__main__':
    asyncio.run(_main())