Skip to content

Instantly share code, notes, and snippets.

@delannoy
Last active May 26, 2021 15:29
Show Gist options
  • Select an option

  • Save delannoy/91580a3f2a3c57a47c9143a86b3cb8bd to your computer and use it in GitHub Desktop.

Select an option

Save delannoy/91580a3f2a3c57a47c9143a86b3cb8bd to your computer and use it in GitHub Desktop.
Pandas wrapper for the Slack API
#!/usr/bin/env python3
import json, os, pandas, requests, sys, typing
def getToken(jsonFilePath:str) -> str:
    '''Read the "token" key from the json file at {jsonFilePath}.

    If the file does not exist, a template holding the placeholder value
    "api_key" is created so the user only has to fill it in. An existing file
    that cannot be read or parsed is left untouched (it may contain a real
    token next to a typo).

    Returns:
        The token string stored under the "token" key.

    Raises:
        SystemExit: (status 1) if the file is missing, unreadable, malformed,
            has no "token" key, or still contains the placeholder.
    '''
    placeholder = 'api_key'
    token = None
    try:
        with open(jsonFilePath, 'r') as jsonFile:
            data = json.load(jsonFile)
        # guard against valid json that is not an object (e.g. a list)
        token = data.get('token') if isinstance(data, dict) else None
    except FileNotFoundError:
        # first run: write a template for the user to edit
        with open(jsonFilePath, 'w') as jsonFile:
            jsonFile.write('{\n"token": "api_key"\n}')
    except (OSError, ValueError) as err:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses
        print(f"Could not read '{jsonFilePath}': {err}")
    if isinstance(token, str) and token and token != placeholder:
        return token
    print(f"Please specify 'api key/token' in '{jsonFilePath}'")
    print("A Slack API key can be obtained from: [https://api.slack.com/apps]")
    sys.exit(1)
def getRequest(method:str, limit:int=1000, **kwargs) -> typing.Dict[str,typing.Any]:
    '''Query the Slack Web API with the given "method" and pass any "kwargs" as request "params".

    The token is read from 'token.json' (via 'getToken') unless a "token"
    keyword argument is supplied. It is sent in the "Authorization" header
    rather than in the query string, so it does not end up in URLs and logs.

    Returns:
        The decoded json body of the response. No pagination is performed:
        only the first page (at most {limit} items) is returned.
    '''
    timeoutSeconds = 30 # without a timeout 'requests' may block forever on a stalled connection
    token = kwargs.pop('token', None) or getToken('token.json')
    url = f"https://slack.com/api/{method}"
    params = {'limit':limit, **kwargs}
    headers = {'Authorization':f'Bearer {token}'}
    return requests.get(url=url, params=params, headers=headers, timeout=timeoutSeconds).json()
def flattenCol(dfCol:pandas.Series, prefix:typing.Optional[str]=None) -> typing.Union[pandas.Series, pandas.DataFrame]:
    '''Flatten {dfCol} recursively.

    - If {dfCol} holds any 'list' entries, each list position becomes its own
      column named '{dfCol.name}_<position>', and every new column is flattened again.
    - If {dfCol} holds any 'dict' entries, each key becomes its own column named
      '{dfCol.name}_<key>' (via 'pandas.json_normalize'), and every new column is flattened again.
    - Otherwise each value is converted to a number where possible and {prefix},
      if given, is prepended to {dfCol.name} (series/column name).

    The index of {dfCol} is kept on the result, so flattened columns can be
    concatenated side by side (axis=1) without misalignment.

    Returns:
        A DataFrame for list/dict columns with content, a Series otherwise.
    '''
    def fillNan(col:pandas.Series, obj:typing.Union[typing.List,typing.Dict]) -> pandas.Series:
        '''replace 'NaN' values with {obj} [https://stackoverflow.com/a/62689667/13019084]'''
        return col.fillna({i:obj for i in col.index})

    def emptyToNone(col:pandas.Series) -> pandas.Series:
        '''replace zero-length entries with None [https://stackoverflow.com/a/44825004/13019084]'''
        return col.where(col.str.len() > 0, None)

    def concatFlatCols(df:pandas.DataFrame) -> pandas.DataFrame:
        '''apply the parent function, 'flattenCol', to all columns in {df} and concatenate the result'''
        return pandas.concat([flattenCol(df[col]) for col in df], axis=1)

    def toNumeric(value:typing.Any) -> typing.Any:
        '''return {value} as a number if it can be parsed as one, unchanged otherwise
        (stands in for the deprecated "errors='ignore'" option of 'pandas.to_numeric')'''
        try:
            return pandas.to_numeric(value)
        except (ValueError, TypeError):
            return value

    if any(isinstance(row, list) for row in dfCol):
        # fill any 'NaN' values with an empty list, expand the lists into columns
        # and prepend {dfCol.name} to each child's column name
        # [https://stackoverflow.com/a/44821357/13019084]
        dfCol = fillNan(dfCol, [])
        listDF = pandas.DataFrame(dfCol.values.tolist(), index=dfCol.index).add_prefix(f'{dfCol.name}_')
        return concatFlatCols(listDF) if not listDF.empty else emptyToNone(dfCol)
    elif any(isinstance(row, dict) for row in dfCol):
        # fill any 'NaN' values with an empty dict, flatten via 'pandas.json_normalize'
        # and prepend {dfCol.name} to each child's column name
        dfCol = fillNan(dfCol, {})
        if all(dfCol.str.len() == 0):
            dfCol = emptyToNone(dfCol)
        dictDF = pandas.json_normalize(dfCol.tolist()).add_prefix(f'{dfCol.name}_')
        dictDF.index = dfCol.index # 'json_normalize' numbers rows from 0; restore the caller's index
        try:
            return concatFlatCols(dictDF) if not dictDF.empty else emptyToNone(dfCol)
        except Exception:
            # best effort: if a child column cannot be flattened further, keep the one-level result
            return dictDF
    else:
        # Convert to numbers where possible and prepend {prefix} to the column name.
        # Note that an integer array will be cast as float if any elements are NaN
        # [https://pandas.pydata.org/pandas-docs/stable/user_guide/gotchas.html#support-for-integer-na]
        dfCol = dfCol.apply(toNumeric)
        return dfCol.rename(f'{prefix}_{dfCol.name}') if prefix else dfCol
def usersList() -> pandas.DataFrame:
    '''Return the workspace members reported by 'users.list' as a DataFrame (one row per member).

    The frame is empty when the response carries no "members" key
    (e.g. when the API call was rejected).
    '''
    # [https://api.slack.com/methods/users.list]
    resp = getRequest('users.list')
    return pandas.DataFrame(resp.get('members'))
def channelsList() -> pandas.DataFrame:
    '''Return the conversations reported by 'conversations.list' as a flattened DataFrame.

    Nested list/dict columns are expanded with 'flattenCol'. An empty frame is
    returned when the response carries no channels.
    '''
    # [https://api.slack.com/methods/conversations.list]
    resp = getRequest('conversations.list')
    ch = pandas.DataFrame(resp.get('channels'))
    if ch.empty:
        # 'pandas.concat' raises on an empty list of objects
        return ch
    return pandas.concat([flattenCol(ch[col]) for col in ch.columns], axis=1)
def filesList(downloadFiles:bool=True) -> pandas.DataFrame:
    '''Return the files reported by 'files.list' as a DataFrame and optionally download them.

    When {downloadFiles} is true, every file that has a "url_private" is fetched
    with 'wget' into '~/cmsbril/<id>.<name>', using the cookies in 'slack.cookie'
    and the user agent from the 'USERAGENT' environment variable (if set).
    Downloads are best effort: a failing 'wget' does not abort the listing.

    Returns:
        The un-flattened file records, one row per file.
    '''
    # [https://api.slack.com/methods/files.list]
    resp = getRequest("files.list")
    records = resp.get('files') or []
    if downloadFiles and records:
        import os, pathlib, subprocess, time
        userAgent = os.environ.get('USERAGENT')
        cookieFile = 'slack.cookie'
        outDir = pathlib.Path.home().joinpath('cmsbril')
        outDir.mkdir(parents=True, exist_ok=True) # wget does not create the parent directory of --output-document
        for record in records:
            fileID, fileName, fileURL = record.get('id'), record.get('name'), record.get('url_private')
            if not fileURL:
                continue # nothing to download for this record
            # file names come from the API (i.e. from other users): drop any directory part
            safeName = pathlib.Path(str(fileName)).name
            outFile = outDir.joinpath(f'{fileID}.{safeName}')
            # NOTE(review): --no-check-certificate disables TLS verification; kept from the original
            cmd = ['wget', '--quiet', '--show-progress', '--no-glob', '--no-check-certificate', '--trust-server-names']
            if userAgent:
                cmd.append(f'--user-agent={userAgent}')
            # --output-document will force overwrite if filename clobber
            cmd += [f'--load-cookies={cookieFile}', f'--output-document={outFile}', str(fileURL)]
            try:
                # argument list (no shell): names and urls cannot inject shell commands
                subprocess.run(cmd, check=False)
            except OSError as err:
                print(f"Could not run 'wget': {err}")
                break
            time.sleep(2) # pause between downloads
    return pandas.DataFrame(resp.get('files'))
def imList(user:str) -> str:
    '''Return the id of the direct-message conversation with the user named {user}.

    Raises:
        IndexError: if no member is named {user}, or if there is no
            direct-message conversation with that member.
    '''
    # [https://api.slack.com/methods/users.list]
    members = getRequest('users.list').get('members') or []
    uID = next((member.get('id') for member in members if member.get('name') == user), None)
    if uID is None:
        raise IndexError(f"no Slack user named '{user}'")
    # [https://api.slack.com/methods/users.conversations]
    channels = getRequest('users.conversations', types='im,mpim').get('channels') or []
    imID = next((channel.get('id') for channel in channels if channel.get('user') == uID), None)
    if imID is None:
        raise IndexError(f"no direct-message conversation with '{user}'")
    return imID
def imHistory(user:str) -> pandas.DataFrame:
    '''Return the messages exchanged with {user} in a direct-message conversation, one row per message.'''
    # [https://api.slack.com/methods/conversations.history]
    history = getRequest('conversations.history', channel=imList(user))
    messages = history.get('messages')
    return pandas.DataFrame(messages)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment