Created
January 24, 2024 09:54
-
-
Save crablab/e538237bda772ff24a48cd3fb1466a35 to your computer and use it in GitHub Desktop.
Migrate PagerDuty Event Rules to Event Orchestration in bulk. See: https://support.pagerduty.com/docs/migrate-to-event-orchestration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import requests | |
| import argparse | |
| import logging | |
| PD_KEY = os.getenv('PD_KEY') | |
| parser = argparse.ArgumentParser( | |
| description='Migrate event rules for all services in a PD account') | |
| parser.add_argument('--include', dest='include', | |
| action='append', help='Only migrate these service IDs') | |
| args = parser.parse_args() | |
| logging.basicConfig(level=logging.INFO) | |
| def get_services(): | |
| url = 'https://api.pagerduty.com/services' | |
| headers = { | |
| 'Accept': 'application/json', | |
| 'Content-Type': 'application/json', | |
| 'Authorization': 'Token token={token}'.format(token=PD_KEY), | |
| } | |
| more = True | |
| services = [] | |
| while more == True: | |
| response = requests.get(url, headers=headers, params={ | |
| 'offset': len(services), 'limit': 25}) | |
| if (response.status_code != 200): | |
| logging.error('Received {status} response from PD API'.format( | |
| status=response.status_code)) | |
| decoded_response = response.json() | |
| services.extend(decoded_response['services']) | |
| if (decoded_response['more'] != True): | |
| more = False | |
| logging.info('Retrieved {count} services.'.format(count=len(services))) | |
| return (services) | |
| def is_event_rule_migrated(id: str): | |
| url = 'https://api.pagerduty.com/services/{id}/rules'.format(id=id) | |
| querystring = {'include[]': 'migrated_metadata'} | |
| headers = { | |
| 'Accept': 'application/json', | |
| 'Content-Type': 'application/json', | |
| 'Authorization': 'Token token={token}'.format(token=PD_KEY), | |
| } | |
| response = requests.get(url, headers=headers, params=querystring) | |
| if (response.status_code != 200): | |
| logging.error('Received {status} response from PD API'.format( | |
| status=response.status_code)) | |
| return (response.json()['migrated_status'] == 'completed') | |
| def migrate_event_rules(id: str): | |
| import requests | |
| url = 'https://api.pagerduty.com/services/{id}/rules/convert'.format(id=id) | |
| headers = { | |
| 'Accept': 'application/json', | |
| 'Content-Type': 'application/json', | |
| 'Authorization': 'Token token={token}'.format(token=PD_KEY), | |
| } | |
| response = requests.post(url, headers=headers) | |
| if (response.status_code != 200): | |
| logging.error('Received {status} response from PD API'.format( | |
| status=response.status_code)) | |
| return (response.json()['convert_status'] == 'completed') | |
| def activate_event_orchestration(id: str): | |
| import requests | |
| url = 'https://api.pagerduty.com/event_orchestrations/services/{id}/active'.format( | |
| id=id) | |
| payload = {'active': True} | |
| headers = { | |
| 'Accept': 'application/json', | |
| 'Content-Type': 'application/json', | |
| 'Authorization': 'Token token={token}'.format(token=PD_KEY), | |
| } | |
| response = requests.put(url, json=payload, headers=headers) | |
| if (response.status_code != 200): | |
| logging.error('Received {status} response from PD API'.format( | |
| status=response.status_code)) | |
| return (response.json()['active'] == True) | |
| if __name__ == '__main__': | |
| services = get_services() | |
| logging.info('Found {num} services'.format(num=len(services))) | |
| for service in services: | |
| if args.include != None and service['id'] not in args.include: | |
| logging.info('Skipping service {id}'.format(id=service['id'])) | |
| continue | |
| else: | |
| logging.info('Migrating service {id}'.format(id=service['id'])) | |
| try: | |
| if not is_event_rule_migrated(service['id']): | |
| if (migrate_event_rules(service['id']) != True): | |
| logging.error('Failed to migrate event rules for service {id}'.format( | |
| id=service['id'])) | |
| continue | |
| if (activate_event_orchestration(service['id']) != True): | |
| logging.error('Failed to activate event orchestration for service {id}'.format( | |
| id=service['id'])) | |
| continue | |
| logging.info('Successfully migrated event rules for service {id}'.format( | |
| id=service['id'])) | |
| else: | |
| logging.info( | |
| 'Event rules already migrated for service {id}'.format(id=service['id'])) | |
| except Exception as e: | |
| logging.error('Unable to migrate event rules for service {id} due to: {exception}'.format( | |
| id=service['id'], exception=e)) | |
| continue |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment