Skip to content

Instantly share code, notes, and snippets.

@jo-migo
Last active November 3, 2016 22:37
Show Gist options
  • Select an option

  • Save jo-migo/481b475309e40f1d1fefe45236c10891 to your computer and use it in GitHub Desktop.

Select an option

Save jo-migo/481b475309e40f1d1fefe45236c10891 to your computer and use it in GitHub Desktop.
Batch Update/Delete Tool Using V2 REST API
import ast
import uuid
import json
import sys
import bravado
from distutils import util
from pyramid import settings as p_settings
from bravado.client import SwaggerClient
DELETE_MAP, EDIT_MAP, GET_MAP, ID_TO_PAGE_ID, ID_TO_PROJ_ID, AUTH_OPTIONS = ({} for _ in range(6))
OPERATION, ENTITY, CLIENT = ("" for _ in range(3))
def main():
load_swagger()
global AUTH_OPTIONS, OPERATION, ENTITY
token = get_token(prompt='Enter OAuth Token: ')
AUTH_OPTIONS = {'headers': {'Authorization': token}}
OPERATION = ask_until_satisfied('EDIT or DELETE: ', ['edit', 'delete'])
method = ask_until_satisfied('ID or PROPERTY: ', ['id', 'property'])
if OPERATION == 'edit':
ENTITY = ask_until_satisfied('Name of entity to perform batch updates on (ex. campaign, pageevent, etc.): ', EDIT_MAP.keys())
else:
ENTITY = ask_until_satisfied('Name of entity to perform batch deletion on (ex. campaign, experiment, etc.): ', DELETE_MAP.keys())
if method == 'id':
perform_changes_by_id()
elif method == 'property':
perform_changes_by_property()
def get_token(prompt):
return "Bearer {}".format(input(prompt))
def load_swagger():
global CLIENT
CLIENT = SwaggerClient.from_url('https://api.optimizely.com/v2/swagger.json', config={'also_return_response': True})
# To use public API:
CLIENT.swagger_spec.api_url = 'http://api.optimizely.com'
global DELETE_MAP, EDIT_MAP, GET_MAP
DELETE_MAP = {
'experiment': [CLIENT.Experiments.delete_experiment, 'experiment_id'],
'page': [CLIENT.Pages.delete_page, 'page_id'],
'campaign': [CLIENT.Campaigns.delete_campaign, 'campaign_id'],
'pageevent': [CLIENT.Events.delete_page_event, 'event_id'],
'customevent': [CLIENT.Events.delete_custom_event, 'event_id']
}
EDIT_MAP = {
'project': [CLIENT.Projects.update_project, 'project_id'],
'customevent': [CLIENT.Events.update_custom_event, 'event_id'],
'page': [CLIENT.Pages.update_page, 'page_id'],
'pageevent': [CLIENT.Events.update_page_event, 'event_id'],
'audience': [CLIENT.Audiences.update_audience, 'audience_id'],
'campaign': [CLIENT.Campaigns.update_campaign, 'campaign_id'],
'experiment': [CLIENT.Experiments.update_experiment, 'experiment_id']
}
# Have to perform special check for click/custom events because get_event gets both
# types but update and delete are type-specific
GET_MAP = {
'project': CLIENT.Projects.list_projects,
'customevent': CLIENT.Events.list_events,
'page': CLIENT.Pages.list_pages,
'pageevent': CLIENT.Events.list_events,
'audience': CLIENT.Audiences.list_audiences,
'campaign': CLIENT.Campaigns.list_campaigns,
'experiment': CLIENT.Experiments.list_experiments
}
def filter_events(event_list, page_ids):
target_events = []
event_type = ENTITY.rstrip('event')
for event in event_list:
if event.event_type == event_type:
if event.event_type == 'custom' or (event.event_type == 'click' and int(event.page_id) in page_ids):
target_events.append(event)
return target_events
def display_projects():
# Go through projects, typing NEXT to see the next page if it exists
print_projects = util.strtobool(input("Would you like to see your projects? [y/n]: "))
i = 1
while print_projects:
projects = CLIENT.Projects.list_projects(page=i, _request_options=AUTH_OPTIONS)
projects, http_resp = projects.result()
print('%-15s %-50s' %("Project ID", "Project Name"))
for p in projects:
print('%-15s %-50s'%(p.id, p.name))
print('\n\n')
if 'next' in (http_resp.headers.get('Link') or []):
print_projects = util.strtobool(input("Would you like to display your next {} projects? [y/n]: ".format(25)))
else:
print_projects = False
i += 1
def display_pages(project_ids):
print_pages = util.strtobool(input("Would you like to see your pages? [y/n]: "))
i = 1
while print_pages:
pages = CLIENT.Pages.list_pages(project_id=project_id, page=i, _request_options=AUTH_OPTIONS)
pages, http_resp = pages.result()
print('%-15s %-50s %-15s' %("Page ID", "Page Name", "Project ID"))
for p in pages:
print('%-15s %-50s %-15s' %(p.id, p.name, project_id))
print('\n\n')
if 'next' in (http_resp.headers.get('Link') or []):
print_pages = util.strtobool(input("Would you like to display your next {} pages? [y/n]: ".format(25)))
else:
print_pages = False
i += 1
def list_entities_under_projects(project_ids, page_ids):
print_projects = util.strtobool(input("Would you like to see your relevant {}s? [y/n]: ".format(ENTITY)))
get_function = GET_MAP[ENTITY]
if ENTITY == 'pageevent':
print('%-15s %-50s %-15s %-15s' %("{} ID".format(ENTITY.title()), "{} Name".format(ENTITY.title()), "Project ID", "Page ID"))
if ENTITY != 'pageevent':
print('%-15s %-50s %-15s' %("{} ID".format(ENTITY.title()), "{} Name".format(ENTITY.title()), "Project ID"))
for project_id in project_ids:
print_next = True
i = 1
while print_next:
object_list, http_resp = get_function(project_id=project_id, per_page=50, page=i, _request_options=AUTH_OPTIONS).result()
if ENTITY == 'pageevent':
filtered_object_list = []
for page_id in page_ids:
eligible_events = filter_results(object_list, [('page_id', page_id), ('event_type', 'click')])
filtered_object_list += eligible_events
for obj in eligible_events:
ID_TO_PAGE_ID[obj.id] = page_id
print('%-15s %-50s %-15s %-15s'%(obj.id, obj.name, project_id, page_id))
object_list = filtered_object_list
else:
if ENTITY == 'customevent':
object_list = filter_results(object_list, [('event_type', 'custom')])
for obj in object_list:
ID_TO_PROJ_ID[obj.id] = project_id
print('%-15s %-50s %-15s'%(obj.id, obj.name, project_id))
i += 1
if 'next' in (http_resp.headers.get('Link') or []):
print_next = util.strtobool(input("Would you like to display your next {} {}s? [y/n]: ".format(25, ENTITY)))
else:
print_next = False
print('\n\n')
def perform_changes_by_id():
page_ids = None
display_projects()
if ENTITY != 'project':
project_ids = input("Project IDs under which you'll be {}ing {}s by ID, separated by whitespace: ".format(OPERATION, ENTITY))
project_ids = list(map(int, project_ids.split()))
if ENTITY == 'pageevent':
display_pages(project_ids)
page_ids = input("Page IDs on which you'll be {}ing {}s by ID: ".format(OPERATION, ENTITY))
page_ids = list(map(int, page_ids.split()))
list_entities_under_projects(project_ids, page_ids)
ids = input('IDs of entities to be {} separated by whitespace: '.format('edited' if OPERATION=='edit' else 'deleted'))
id_list = list(map(int, ids.split()))
if OPERATION == 'edit':
updates = input('JSON Dict of Changes to apply to selected entities: ')
try:
json_changes = json.loads(updates)
except ValueError as error:
sys.exit('Invalid JSON: ', error.message)
if OPERATION == 'delete':
for entity_id in id_list:
make_deletion(entity_id)
else:
for entity_id in id_list:
make_update(entity_id, json_changes)
def perform_changes_by_property():
# Search for items to edit by property values (i.e. edit all experiments with status==active)
page_ids = None
if ENTITY != 'project':
display_projects()
project_ids = input("Project IDs under which you'll be {}ing {}s by property, separated by whitespace: ".format(OPERATION, ENTITY))
project_ids = list(map(int, project_ids.split()))
if ENTITY == 'pageevent':
display_pages(project_ids)
page_ids = input("Page IDs on which you'll be {}ing {}s by property, separated by whitespaces: ".format(OPERATION, ENTITY))
page_ids = list(map(int, page_ids.split()))
list_entities_under_projects(project_ids, page_ids)
property_def = input("Equality filter for entities to {} i.e. <property> = <property_val>: ".format(OPERATION))
property_tuple = create_property_filter(property_def)
if OPERATION == 'edit':
operation_map = EDIT_MAP
updates = input('JSON Dict of Changes to apply to selected entities: ')
try:
json_changes = json.loads(updates)
except ValueError as error:
sys.exit('Invalid JSON: ', error.message)
else:
operation_map = DELETE_MAP
if ENTITY in operation_map:
get_function = GET_MAP[ENTITY]
if ENTITY == 'project':
# this is just a list of ENTITY IDs
ids_to_hit = get_project_ids_to_change(get_function, property_tuple)
else:
ids_to_hit = get_ids_to_change(project_ids, get_function, property_tuple, page_ids)
if OPERATION == 'edit':
for target_id in ids_to_hit:
make_update(target_id, json_changes)
else:
for target_id in ids_to_hit:
make_deletion(target_id)
if not ids_to_hit:
sys.exit("No entities matching that property were found")
def make_update(target_id, json_changes):
edit_function = EDIT_MAP[ENTITY][0]
edit_kwarg_name = EDIT_MAP[ENTITY][1]
if ENTITY == 'pageevent':
page_id = ID_TO_PAGE_ID[target_id]
result, _ = edit_function(page_id=page_id, **{edit_kwarg_name: target_id}, body=json_changes, _request_options=AUTH_OPTIONS).result()
elif ENTITY == 'customevent':
project_id = ID_TO_PROJ_ID[target_id]
result, _ = edit_function(project_id=project_id, **{edit_kwarg_name: target_id}, body=json_changes, _request_options=AUTH_OPTIONS).result()
else:
result, _ = edit_function(**{edit_kwarg_name: target_id}, body=json_changes, _request_options=AUTH_OPTIONS).result()
print(result)
def make_deletion(target_id):
delete_function = DELETE_MAP[ENTITY][0]
delete_kwarg_name = DELETE_MAP[ENTITY][1]
if ENTITY == 'pageevent':
page_id = ID_TO_PAGE_ID[target_id]
result, _ = delete_function(page_id=page_id, **{delete_kwarg_name: target_id}, body=json_changes, _request_options=AUTH_OPTIONS).result()
elif ENTITY == 'customevent':
project_id = ID_TO_PROJ_ID[target_id]
result, _ = delete_function(project_id=project_id, **{delete_kwarg_name: target_id}, body=json_changes, _request_options=AUTH_OPTIONS).result()
else:
result, _ = delete_function(**{delete_kwarg_name: target_id}, _request_options=AUTH_OPTIONS).result()
print("Successfully deleted: {} {}!".format(ENTITY, target_id))
def create_property_filter(property_def):
equality_list = property_def.split("=")
if equality_list[1].strip()[0] == "{":
value = json.loads(equality_list[1].strip())
elif equality_list[1].strip()[0] == "[":
value = ast.literal_eval(equality_list[1].strip())
else:
value = equality_list[1].strip()
return (equality_list[0].lower().strip(), value)
def get_ids_to_change(project_ids, get_function, property_tuple, page_ids):
ids_to_hit = []
for project_id in project_ids:
next_exists = True
i = 1
while next_exists:
object_list, http_resp = get_function(project_id=project_id, per_page=100, page=i, _request_options=AUTH_OPTIONS).result()
if ENTITY == 'pageevent' or ENTITY == 'customevent':
object_list = filter_events(object_list, page_ids)
for item in object_list:
if clean_and_match_input(getattr(item, str(property_tuple[0])), property_tuple[1]):
ids_to_hit.append(item.id)
i += 1
if 'next' not in (http_resp.headers.get('Link') or []):
next_exists = False
return ids_to_hit
def get_project_ids_to_change(get_function, property_tuple):
ids_to_hit = []
next_exists = True
i = 1
while next_exists:
object_list, http_resp = get_function(per_page=100, page=i, _request_options=AUTH_OPTIONS).result()
for item in object_list:
if clean_and_match_input(getattr(item, str(property_tuple[0])), property_tuple[1]):
ids_to_hit.append(item.id)
i += 1
if 'next' not in (http_resp.headers.get('Link') or []):
next_exists = False
return ids_to_hit
def clean_and_match_input(actual_attribute, attr_input):
if isinstance(actual_attribute, str):
attr_input = attr_input.strip().lower()
actual_attribute = actual_attribute.strip().lower()
elif isinstance(actual_attribute, bool):
attr_input = p_settings.asbool(attr_input)
elif isinstance(actual_attribute, int):
attr_input = int(attr_input)
return attr_input == actual_attribute
def filter_results(input_list, filter_list):
# filter_list is a list of tuples where each is (property, value)
# Returns anything that has property == value for all the filter tuples in the list
return [x for x in input_list if meets_criteria(x, filter_list)]
def meets_criteria(entity_instance, filter_list):
for property_filter in filter_list:
if getattr(entity_instance, str(property_filter[0])) != property_filter[1]:
return False
return True
def ask_until_satisfied(question, choices):
# prompt for input and continue asking for input until user gives input contained in list of choices or types 'EXIT'
while True:
answer = input(question)
if answer.lower().strip() in choices:
return answer.lower().strip()
elif answer == "EXIT":
sys.exit("SEE YA")
print("I'm sorry, that's not a valid choice.\n Please choose from the following or type 'EXIT':\n")
for p in choices:
print(p)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment