-
-
Save foxwill/6578e2fe9f25e4270b94303c8e8225e5 to your computer and use it in GitHub Desktop.
| import requests | |
| class OneLogin(object): | |
| def __init__(self, shard='US'): | |
| """ | |
| Specify the shard of the system being used (us or eu) | |
| :param shard: us or eu | |
| :return: | |
| """ | |
| self.shard = shard.lower() | |
| if 'us' in self.shard: | |
| self.base_url = 'https://api.us.onelogin.com' | |
| elif 'eu' in self.shard: | |
| self.base_url = 'https://api.eu.onelogin.com' | |
| elif 'shadow01' in self.shard: | |
| self.base_url = 'https://oapi01-shadow01.use1.onlgn.net' | |
| def set_attributes(self, kwargs): | |
| for kwarg_key, kwarg_value in kwargs.iteritems(): | |
| setattr(self, kwarg_key, kwarg_value) | |
| def handle_error(self, **kwargs): | |
| error = {} | |
| for k,v in kwargs.iteritems(): | |
| error[k] = v | |
| return error | |
| class Token(OneLogin): | |
| """ | |
| Create the token object to be used for calling the OneLogin API. | |
| """ | |
| def __init__(token, **kwargs): | |
| for key in ('client_id', 'client_secret', 'shard'): | |
| if key in kwargs: | |
| setattr(token, key, kwargs[key]) | |
| token.session = requests.session() | |
| token.session.headers = {'Content-Type': 'application/json'} | |
| oauth_endpoint = '/auth/oauth2' | |
| try: | |
| OneLogin.__init__(token, token.shard) | |
| except: | |
| if token.client_id == '0': | |
| raise ValueError('Client_ID is required') | |
| elif token.client_secret == '0': | |
| raise ValueError('Client_Secret is required') | |
| token.target = token.base_url + oauth_endpoint | |
| token.get_token() | |
| def get_token(token): | |
| """ | |
| Get a new OAUTH token | |
| :return: | |
| """ | |
| authorization = 'client_id: %s, client_secret: %s' % (token.client_id, | |
| token.client_secret) | |
| token.session.headers.update({'Authorization':authorization}) | |
| r = token.session.post(token.target + '/token', verify=False, | |
| json={'grant_type':'client_credentials'}) | |
| if r.status_code != 200: | |
| print token.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': token.target + '/token', | |
| 'headers':token.session.headers}) | |
| return False | |
| else: | |
| token.set_attributes({ | |
| 'access_token':r.json()['data'][0]['access_token'], | |
| 'refresh_token':r.json()['data'][0]['refresh_token'], | |
| 'token_created_at':r.json()['data'][0]['created_at'], | |
| 'token_expires_at':r.json()['data'][0]['expires_in']}) | |
| return True | |
| def refresh_the_token(token): | |
| """ | |
| Refresh the current OAUTH token | |
| :return: | |
| """ | |
| r = token.session.post(token.target + '/token', verify=False, | |
| json={ | |
| 'grant_type':'refresh_token', | |
| 'refresh_token':token.refresh_token, | |
| 'access_token':token.access_token}) | |
| if r.status_code != 200: | |
| print token.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': token.target + '/token', | |
| 'headers':token.session.headers}) | |
| return False | |
| else: | |
| token.set_attributes({ | |
| 'access_token':r.json()['data'][0]['access_token'], | |
| 'refresh_token':r.json()['data'][0]['refresh_token'], | |
| 'created_at':r.json()['data'][0]['created_at'], | |
| 'expires_in':r.json()['data'][0]['expires_in'] | |
| }) | |
| return True | |
| def revoke_the_token(token): | |
| """ | |
| Revoke the current OAUTH token | |
| :return: | |
| """ | |
| r = token.session.post(token.target + '/revoke', verify=False, | |
| json={ | |
| 'grant_type':'revoke_token', | |
| 'access_token':token.access_token, | |
| 'client_id':token.client_id, | |
| 'client_secret':token.client_secret | |
| }) | |
| if r.status_code != 200: | |
| print token.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': token.target + '/revoke', | |
| 'headers':token.session.headers}) | |
| return False | |
| else: | |
| return True | |
| def check_token_expiration(self): | |
| """ | |
| TODO: Calculate expiration time of token, if expired, call refresh_token | |
| to update access_token | |
| :return: | |
| """ | |
| def check_rate_limit(token): | |
| """ | |
| check rate limit | |
| :return: | |
| """ | |
| if token.access_token: | |
| authorization = 'Bearer:%s' % token.access_token | |
| token.session.headers.update({'Authorization':authorization}) | |
| else: | |
| return 'Access Token not found' | |
| r = token.session.get(token.base_url + '/auth/rate_limit', verify=False) | |
| if r.status_code != 200: | |
| print token.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': token.target + '/revoke', | |
| 'headers':token.session.headers}) | |
| return False | |
| else: | |
| return r | |
| class User(Token): | |
| """ | |
| """ | |
| def __init__(user, token): | |
| """ | |
| :param kwargs: | |
| :return: | |
| """ | |
| # user.set_attributes(kwargs) | |
| user.session = requests.session() | |
| user.session.headers = {'Content-Type': 'application/json'} | |
| user.user_endpoint = '/api/1/users' | |
| try: | |
| user.base_url = token.base_url | |
| user.session.headers.update({'Authorization': 'Bearer:%s' % token.access_token}) | |
| except: | |
| raise ValueError('Token not found, have you initialized the Token yet?') | |
| # try: | |
| # Token.__init__(user, **{'client_id':user.client_id, | |
| # 'client_secret':user.client_secret, | |
| # 'shard':user.shard}) | |
| # except: | |
| # if user.client_id == '0': | |
| # raise ValueError('client_id is required') | |
| # elif user.client_secret == '0': | |
| # raise ValueError('client_secret is required') | |
| # try: | |
| # user.session.headers.update({'Authorization': 'Bearer:%s' % user.access_token}) | |
| # except: | |
| # raise ValueError('invalid access_token, please check your client_id' | |
| # ' and client_secret, did you specify the shard') | |
| def get_all_users(user, sort=False, fields=''): | |
| """ | |
| Get all users, specify sort and fields to filter results | |
| :param sort: Sort results by ID, use 1 to sort asc, 2 for desc, default is no sort | |
| :param fields: specify fields to include in result 'lastname, firstname, email' | |
| :return: Dictionary of user's with each page of response corresponding to key, | |
| 1,2,etc | |
| """ | |
| count = 0 | |
| next_page = 1 | |
| response = {} | |
| original_user_base_url = user.base_url | |
| while next_page != 0: | |
| if sort == 0: | |
| r = user.session.get(user.base_url + user.user_endpoint + ('?' if '?' not in user.user_endpoint else '&') + | |
| '&fields=%s' % str(fields), verify=False) | |
| elif sort == 1: | |
| r = user.session.get(user.base_url + user.user_endpoint + ('?' if '?' not in user.user_endpoint else '&') + | |
| 'sort=id&fields=%s' % str(fields), verify=False) | |
| else: | |
| r = user.session.get(user.base_url + user.user_endpoint + ('?' if '?' not in user.user_endpoint else '&') + | |
| 'sort=-id&fields=%s' % str(fields), verify=False) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.base_url + user.user_endpoint, | |
| 'headers':user.session.headers}) | |
| next_page == 0 | |
| return False | |
| else: | |
| if r.json()['pagination']['next_link'] == None: | |
| next_page == 0 | |
| response[count] = r.json() | |
| return response | |
| else: | |
| user.user_endpoint = original_user_base_url + '?after_cursor=' + r.json()['pagination']['after_cursor'] | |
| response[count] = r.json() | |
| count += 1 | |
| def get_user_by_id(user, id): | |
| r = user.session.get(user.base_url + user.user_endpoint + '/%s' % str(id), verify=False) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.target + '/token', | |
| 'headers':user.session.headers}) | |
| return False | |
| else: | |
| return r.json() | |
| def get_apps_for_user(user, id): | |
| r = user.session.get(user.base_url + user.user_endpoint + '/%s/apps' % str(id), verify=False) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.base_url + user.user_endpoint, | |
| 'headers':user.session.headers}) | |
| exit() | |
| else: | |
| return r.json() | |
| def get_roles_for_user(user, id): | |
| r = user.session.get(user.base_url + user.user_endpoint + '/%s/roles' % str(id), verify=False) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.base_url + user.user_endpoint, | |
| 'headers':user.session.headers}) | |
| exit() | |
| else: | |
| return r.json() | |
| def get_custom_attributes(user): | |
| r = user.session.get(user.base_url + user.user_endpoint + '/custom_attributes', verify=False) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.base_url + user.user_endpoint, | |
| 'headers':user.session.headers}) | |
| exit() | |
| else: | |
| return r.json() | |
| def user_exists(user,email): | |
| r = user.session.get(user.base_url + user.user_endpoint + '?email=%s&fields=id,email,username' % email) | |
| if 'email' not in r.content: | |
| return False | |
| else: | |
| return True | |
| def search_users(user, | |
| s_field='', | |
| s_string='', | |
| sort=False, | |
| r_sort=False, | |
| since='', | |
| until='', | |
| fields=''): | |
| """ | |
| Search for Users | |
| :param s_field: search field (ex. email) | |
| :param s_string: search query (ex. *@onelogin.com) | |
| :param sort: Enable sorting by ID (True/False) | |
| :param r_sort: Sort by ID in reverse (True/False) | |
| :param since: Example: 2016-01-01T00:00:00.001Z | |
| :param until: Example: 2016-03-01T00:00:00.001Z | |
| :param fields: fields in response (Ex. firstname, email, username) | |
| :return: | |
| """ | |
| count = 0 | |
| next_page = True | |
| response = {} | |
| original_user_base_url = user.base_url | |
| while next_page: | |
| if not sort and not r_sort: | |
| search_terms = str(s_field) + '=' + str(s_string) + '&fields=' + str(fields) + '&since=' + str(since) + '&until=' + str(until) | |
| r = user.session.get((user.base_url + user.user_endpoint + ('?' if '?' not in user.user_endpoint else '&') + search_terms), verify=False) | |
| elif sort and not r_sort: | |
| search_terms = 'sort=id&' + str(s_field) + '=' + str(s_string) + \ | |
| '&fields=' + str(fields) + '&since=' + str(since) + \ | |
| '&until=' + str(until) | |
| r = user.session.get(user.base_url + | |
| user.user_endpoint + | |
| ('?' if '?' not in user.user_endpoint else '&') + | |
| str(search_terms)) | |
| elif sort and r_sort: | |
| search_terms = ('sort=-id&' + str(s_field) + '=' + str(s_string) + | |
| '&fields=' + str(fields) + '&since=' + str(since) + | |
| '&until=' + str(until)) | |
| r = user.session.get(user.base_url + | |
| user.user_endpoint + | |
| ('?' if '?' not in user.user_endpoint else '&') + | |
| str(search_terms)) | |
| elif not sort and r_sort: | |
| search_terms = 'sort=-id&' + str(s_field) + '=' + str(s_string) + \ | |
| '&fields=' + str(fields) + '&since=' + str(since) + \ | |
| '&until=' + str(until) | |
| r = user.session.get(user.base_url + | |
| user.user_endpoint + | |
| ('?' if '?' not in user.user_endpoint else '&') + | |
| str(search_terms)) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.base_url + user.user_endpoint, | |
| 'headers':user.session.headers}) | |
| return False | |
| else: | |
| if r.json()['pagination']['next_link'] == None: | |
| user.next_page = False | |
| response[count] = r.json() | |
| return response | |
| else: | |
| user.user_endpoint = original_user_base_url + '?after_cursor=' + r.json()['pagination']['after_cursor'] | |
| print "..searching....page = {0}".format(count) | |
| response[count] = r.json() | |
| count += 1 | |
| def create_user(user, **kwargs): | |
| """ | |
| Create a user via the API, will not update custom attributes (use | |
| set_user_custom_attributes) | |
| :param kwargs: required: firstname, lastname, email, username -- optional: | |
| directory_id, distinguished_name, external_id, group_id, invalid_login_attempts, | |
| locale_code, manager_ad_id, member_of, notes, openid_name, phone, samaccountname, | |
| userprincipalname | |
| :return: | |
| """ | |
| if 'firstname' not in kwargs: | |
| raise ValueError('firstname value is required to create user') | |
| if 'lastname' not in kwargs: | |
| raise ValueError('lastname is required to create user') | |
| if 'username' not in kwargs: | |
| raise ValueError('username is required to create user') | |
| if 'email' not in kwargs: | |
| raise ValueError('email is required to create user') | |
| payload = {} | |
| for k,v in kwargs.iteritems(): | |
| payload[k] = v | |
| r = user.session.post(user.base_url + user.user_endpoint, json=payload) | |
| if r.status_code not in (200,201): | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.base_url + user.user_endpoint, | |
| 'headers':user.session.headers, | |
| 'payload':payload}) | |
| return False | |
| else: | |
| return r.json() | |
| def create_session_login_token(user, username_or_email, password, subdomain, return_to_url='', ip_address='', browser_id=''): | |
| """ | |
| Create login onelogin_session token for use with other applications and user login | |
| :param username_or_email: testuser@acme.com | |
| :param password: 12345 | |
| :param subdomain: testcorp | |
| :param return_to_url: //NOT IMPLEMENTED// | |
| :param ip_address: //NOT IMPLEMENTED// | |
| :param browser_id: //NOT IMPLEMENTED// | |
| :return: dictionary that contains session_token | |
| """ | |
| user.session_endpoint = '/api/1/login/auth' | |
| payload = { | |
| 'username_or_email':username_or_email, | |
| 'password':password, | |
| 'subdomain':subdomain | |
| } | |
| r = user.session.post(user.base_url + user.session_endpoint, json=payload) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.base_url + user.session_endpoint, | |
| 'headers':user.session.headers, | |
| 'payload':payload}) | |
| return False | |
| else: | |
| return r.json() | |
| def update_user(user, id, **kwargs): | |
| """ | |
| :param id: | |
| :param kwargs: | |
| :return: | |
| """ | |
| payload = {} | |
| user.update_user_endpoint = '/api/1/users/%s' % id | |
| for k,v in kwargs.iteritems(): | |
| payload[k] = v | |
| r = user.session.put(user.base_url + user.update_user_endpoint, json=payload) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.base_url + user.update_user_endpoint, | |
| 'headers':user.session.headers, | |
| 'payload':payload}) | |
| return False | |
| else: | |
| return r.json() | |
| def add_roles_to_user(user, id, role_id_array): | |
| """ | |
| :param id: | |
| :param role_id_array: | |
| :return: | |
| """ | |
| user.add_roles_endpoint = '/api/1/users/%s/add_roles' % id | |
| payload = { | |
| 'role_id_array':role_id_array | |
| } | |
| r = user.session.put(user.base_url + user.add_roles_endpoint, json=payload) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.base_url + user.add_roles_endpoint, | |
| 'headers':user.session.headers, | |
| 'payload':payload}) | |
| return False | |
| else: | |
| return r.json() | |
| def remove_roles_from_user(user, id, role_id_array): | |
| """ | |
| :param id: | |
| :param role_id_array: | |
| :return: | |
| """ | |
| user.remove_roles_endpoint = '/api/1/users/%s/remove_roles' % id | |
| payload = { | |
| 'role_id_array':role_id_array | |
| } | |
| r = user.session.put(user.base_url + user.remove_roles_endpoint, json=payload) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.base_url + user.add_roles_endpoint, | |
| 'headers':user.session.headers, | |
| 'payload':payload}) | |
| return False | |
| else: | |
| return r.json() | |
| def set_password_with_cleartext(user, id, password, password_confirmation): | |
| """ | |
| :param id: | |
| :param password: | |
| :param confirmation: | |
| :return: | |
| """ | |
| user.set_clear_password_endpoint = '/api/1/users/set_password_clear_text/%s' % id | |
| payload = { | |
| 'password':password, | |
| 'password_confirmation':password_confirmation | |
| } | |
| r = user.session.put(user.base_url + user.set_clear_password_endpoint, json=payload) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.target + '/token', | |
| 'headers':user.session.headers, | |
| 'payload':payload}) | |
| return False | |
| else: | |
| return r.json() | |
| def set_password_with_salt_sha256(user, id, password, password_confirmation, | |
| password_salt, password_algorithm='salt+sha256'): | |
| user.set_clear_password_endpoint = '/api/1/users/set_password_using_salt/%s' % id | |
| payload = { | |
| 'password':password, | |
| 'password_confirmation':password_confirmation, | |
| 'password_algorithm':password_algorithm, | |
| 'password_salt':password_salt | |
| } | |
| r = user.session.put(user.base_url + user.set_clear_password_endpoint, json=payload) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.target + '/token', | |
| 'headers':user.session.headers, | |
| 'payload':payload}) | |
| return False | |
| else: | |
| return r.json() | |
| def set_user_custom_attributes(user, id, **kwargs): | |
| """ | |
| :param id: | |
| :param kwargs: | |
| :return: | |
| """ | |
| user.update_attributes_endpoint = '/api/1/users/%s/set_custom_attributes' % id | |
| payload = {} | |
| for k,v in kwargs.iteritems(): | |
| payload[k] = v | |
| r = user.session.put(user.base_url + user.update_attributes_endpoint, json=payload) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.target + '/token', | |
| 'headers':user.session.headers, | |
| 'payload':payload}) | |
| return False | |
| else: | |
| return r.json() | |
| def log_user_out(user, id): | |
| """ | |
| :return: | |
| """ | |
| user.log_user_out_endpoint = '/api/1/users/%s/logout' % id | |
| r = user.session.put(user.base_url + user.log_user_out_endpoint) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.target + '/token', | |
| 'headers':user.session.headers, | |
| 'payload':payload}) | |
| return False | |
| else: | |
| return r.json() | |
| def lock_user(user, id, locked_until=0): | |
| """ | |
| :param id: | |
| :param locked_until: | |
| :return: | |
| """ | |
| user.lock_user_endpoint = '/api/1/users/%s/lock_user' % id | |
| r = user.session.put(user.base_url + user.lock_user_endpoint, | |
| json={'locked_until':locked_until}) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.target + '/token', | |
| 'headers':user.session.headers, | |
| 'payload':payload}) | |
| return False | |
| else: | |
| return r.json() | |
| def delete_user(user, id): | |
| """ | |
| Delete an existing user | |
| :return: | |
| """ | |
| user.user_delete_endpoint = '/api/1/users/%s' % id | |
| r = user.session.delete(user.base_url + user.user_delete_endpoint) | |
| if r.status_code != 200: | |
| print user.handle_error(**{'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': user.target + '/token', | |
| 'headers':user.session.headers, | |
| 'payload':payload}) | |
| return False | |
| else: | |
| return r.json() | |
| class Role(Token): | |
| """ | |
| TODO: Call Role API to list, create, destory, update, and search Roles | |
| """ | |
| def __init__(role, token): | |
| """ | |
| TODO: Initialize the Role Object | |
| :return: | |
| """ | |
| role.session = requests.session() | |
| role.session.headers = {'Content-Type': 'application/json'} | |
| role.roles_endpoint = '/api/1/roles' | |
| try: | |
| role.base_url = token.base_url | |
| role.session.headers.update({'Authorization': 'Bearer:%s' % token.access_token}) | |
| except: | |
| raise ValueError('Token not found, have you initialized the Token yet?') | |
| def get_roles(role, id=0, name=0): | |
| """ | |
| :return: | |
| """ | |
| r = role.session.get(role.base_url + role.roles_endpoint + '?%s%s' % | |
| ('&id=' + str(id),'&name=' + str(name))) | |
| if r.status_code != 200: | |
| print role.handle_error(**{ | |
| 'status_code':r.status_code, | |
| 'message_body':r.text, | |
| 'url': role.base_url + | |
| role.roles_endpoint + | |
| '?%s%s' % ('&id=' + str(id), '&name=' + str(name)), | |
| 'headers':role.session.headers}) | |
| exit() | |
| else: | |
| return r.json() | |
| class Group(Token): | |
| """ | |
| TODO: Call Group API to list, create, destory, and search roles | |
| """ | |
| def __init__(group, token): | |
| """ | |
| TODO: Initialize the Group Object | |
| :return: | |
| """ | |
| group.session = requests.session() | |
| group.session.headers = {'Content-Type': 'application/json'} | |
| group.group_endpoint = '/api/1/groups' | |
| try: | |
| group.base_url = token.base_url | |
| group.session.headers.update({'Authorization': 'Bearer:%s' % token.access_token}) | |
| except: | |
| raise ValueError('Token not found, have you initialized the Token yet?') | |
| def create_group(self): | |
| """ | |
| TODO: Create a new Group | |
| :return: | |
| """ | |
| def update_group(self): | |
| """ | |
| TODO: Update an existing Group | |
| :return: | |
| """ | |
| def delete_group(self): | |
| """ | |
| TODO: Delete an existing Group | |
| :return: | |
| """ | |
| def search_group(self): | |
| """ | |
| TODO: Search for an existing Group | |
| """ | |
| class Events(Token): | |
| """ | |
| TODO: Events | |
| """ | |
| def __init__(events, token): | |
| events.session = requests.session() | |
| events.session.headers = {'Content-Type': 'application/json'} | |
| events.events_endpoint = '/api/1/events' | |
| try: | |
| events.base_url = token.base_url | |
| events.session.headers.update({'Authorization': 'Bearer:%s' % token.access_token}) | |
| except: | |
| raise ValueError('Token not found, have you initialized the Token yet?') | |
| def get_events(events, query=''): | |
| """ | |
| :param query: | |
| :return: | |
| """ | |
| events.query = query[1:] if query.startswith("?") or query.startswith("&") else query | |
| count = 0 | |
| response = {} | |
| events.next_page = True | |
| original_event_base_url = events.events_endpoint | |
| while events.next_page: | |
| r = events.session.get(events.base_url + events.events_endpoint + ('?' if '?' not in events.events_endpoint else '&') + events.query, verify=False) | |
| if r.json()['pagination']['next_link'] == None: | |
| events.next_page = False | |
| response[count] = r.json() | |
| return response | |
| else: | |
| events.events_endpoint = original_event_base_url + '?after_cursor=' + r.json()['pagination']['after_cursor'] | |
| print "..searching....page = {0}".format(count) | |
| response[count] = r.json() | |
| count += 1 | |
| def search_events(events, **kwargs): | |
| query = '' | |
| for key in ('client_id', 'client_secret', 'directory_id', 'event_type_id', 'id', | |
| 'resolution', 'since', 'until', 'user_id'): | |
| if key in kwargs: | |
| query += '&%s=%s' % (key, kwargs[key]) | |
| if query.startswith("&"): | |
| query = query[1:] | |
| result = events.get_events(query) | |
| return result |
Updated, thanks for the report!
@foxwill, could you suggest how to create_user than create_app (or edit_app) than assign_user_to_app?
@dmakhno sorry for the late reply, must have missed this request.
For anyone else looking, the best way to do this would be to create a role in the account that has the app needed, then you can assign this role via the update_user or add_role_to_user
update from a colleague to fix a bug in get_all_users (https://gist.github.com/foxwill/6578e2fe9f25e4270b94303c8e8225e5#file-onelogin-py-L218)
Working with a large directory exposed what i think is a change in returned pagination URL,
It seems to have changed from a relative URL, to a full URL:
https://api.eu.onelogin.com/api/1/users?fields=&after_cursor=YWNjb3VudF9pZD..252335625
I propose a fix, this one for users class, events class might need it as well:
diff --git a/OneLogin.py b/OneLogin.py
index 4b78668..e4269bc 100644
--- a/OneLogin.py
+++ b/OneLogin.py
@@ -191,7 +193,7 @@ class User(Token):
count = 0
next_page = 1
response = {}
- original_user_base_url = user.base_url
+ original_user_endpoint = user.user_endpoint
while next_page != 0:
if sort == 0:
r = user.session.get(user.base_url + user.user_endpoint + ('?' if '?' not in user.user_endpoint else '&') +
@@ -215,7 +217,7 @@ class User(Token):
response[count] = r.json()
return response
else:
- user.user_endpoint = original_user_base_url + '?after_cursor=' + r.json()['pagination']['after_cursor']
+ user.user_endpoint = original_user_endpoint + '?after_cursor=' + r.json()['pagination']['after_cursor']
response[count] = r.json()
count += 1
@@ -293,7 +295,7 @@ class User(Token):
count = 0
next_page = True
response = {}
- original_user_base_url = user.base_url
+ original_user_endpoint = user.user_endpoint
while next_page:
if not sort and not r_sort:
search_terms = str(s_field) + '=' + str(s_string) + '&fields=' + str(fields) + '&since=' + str(since) + '&until=' + str(until)
@@ -335,7 +337,7 @@ class User(Token):
response[count] = r.json()
return response
else:
- user.user_endpoint = original_user_base_url + '?after_cursor=' + r.json()['pagination']['after_cursor']
+ user.user_endpoint = original_user_user_endpoint + '?after_cursor=' + r.json()['pagination']['after_cursor']
print "..searching....page = {0}".format(count)
response[count] = r.json()
count += 1for some reason in python 3.7 I am getting an an error importing or running this script:
File "OneLogin.py", line 62 print token.handle_error(**{'status_code':r.status_code, ^ SyntaxError: invalid syntax
am i missing something here?
for some reason in python 3.7 I am getting an an error importing or running this script:
File "OneLogin.py", line 62 print token.handle_error(**{'status_code':r.status_code, ^ SyntaxError: invalid syntaxam i missing something here?
Yeah, with python 3, print is now a function so you need to encapsulate in parens
@caffeinatedsecurity what do you mean by encapsulate in parens ? Can you give an example how those lines needs to be changed ?
@caffeinatedsecurity what do you mean by encapsulate in parens ? Can you give an example how those lines needs to be changed ?
print(token.handle_error(**{'status_code':r.status_code, 'message_body':r.text, 'url': token.target + '/token', 'headers':token.session.headers}))
many thanks! I have completley overseen to solve it with the starting brackets ! Thanks for precise answer - highly appreciated
I want to use these functions in my restService.py. So What I did:
1- Put this code into file OneLogin.py OK
=> I remplaced 'client_id' and 'client_secret' by mind in line 35 of OneLogin.py of couse.
2- Create my file restService.py in the same folder OK
3- In my restService.py I have the following code:
from flask import Flask
import requests
from OneLogin import OneLogin, Token, User
@app.route("/api/test")
def getAllUsers():
myOneLoginObject = OneLogin()
print(myOneLoginObject) #output = https://api.us.onelogin.com
#Now I want to use the function get_token in class Token(OneLogin), then call get_all_users in class User(Token)
#How can I do ????
if __name__ == '__main__':
app.run(host="127.0.0.1", port=8081, debug=True)I just want to generate a token in order to call OneLogin API and then just get all users to test if it's working good.
The real trouble here, is that I don't really know how to code...
So please how can i achieve correctly what I want.
Thanks
@foxwill
I had to include
import requestsat the top of this script.