Skip to content

Instantly share code, notes, and snippets.

@ohong
Created November 13, 2019 01:23
Show Gist options
  • Select an option

  • Save ohong/660930c5c003c68bd88b361fcdb90b73 to your computer and use it in GitHub Desktop.

Select an option

Save ohong/660930c5c003c68bd88b361fcdb90b73 to your computer and use it in GitHub Desktop.
import json
import requests
import time
try:
from urllib.parse import urljoin, urlencode
except ImportError:
from urllib import urlencode
from urlparse import urljoin
ALLOWED_METHODS = {'post'}
DEFAULT_TIMEOUT = 600 # 10 minutes
try:
from json.decoder import JSONDecodeError
except ImportError:
# json parsing throws a ValueError in python2
JSONDecodeError = ValueError
def _requests_http_request(url, method, data, headers, timeout=DEFAULT_TIMEOUT):
normalized_method = method.lower()
if normalized_method in ALLOWED_METHODS:
return getattr(requests, normalized_method)(
url,
json=data,
headers=headers,
timeout=timeout,
)
else:
raise Exception(
'Invalid request method {}'.format(method)
)
def item_remove(access_token):
data = {
'client_id': 'clientid',
'secret': 'secret',
'access_token': access_token,
}
headers = {'Plaid-Version': '2018-05-22'}
method = 'POST'
response = _requests_http_request(
urljoin('https://production.plaid.com', '/item/remove'),
method,
data or {},
headers or {},
DEFAULT_TIMEOUT)
try:
response_body = json.loads(response.text)
except JSONDecodeError:
print(
'error_message: ' + response.text +
' error_type: ' + 'API_ERROR' +
' error_code: ' + 'INTERNAL_SERVER_ERROR'
)
return response_body
access_tokens = open('access_tokens.txt')
results = {}
for access_token in access_tokens:
response = item_remove(access_token)
time.sleep(1)
results[access_token] = response
filename = "results.json"
f= open(filename,"w+")
f.write(json.dumps(results))
f.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment