Last active
January 3, 2018 14:04
-
-
Save nicoster/9c1c2641388bfc780b55ef1e8bb44fb0 to your computer and use it in GitHub Desktop.
to make gitlab admin life easier.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import requests as httpc | |
| import sys | |
| import json | |
| ''' | |
| API reference: | |
| . https://docs.gitlab.com/ee/api/members.html#edit-a-member-of-a-group-or-project | |
| . https://docs.gitlab.com/ce/api/projects.html#list-all-projects | |
| . https://docs.gitlab.com/ce/api/users.html#list-emails-for-user | |
| ''' | |
| # replace git.example.com with your gitlab host | |
| URL = 'https://git.example.com/api/v4' | |
| # personal access token. see https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html | |
| TOKEN = 'XXXXXXXXXXX' | |
| def get_all_items(type): | |
| i = 0 | |
| items = [] | |
| while True: | |
| i += 1 | |
| r = httpc.get(URL + '/' + type + '?per_page=100&page=' + str(i), headers={'PRIVATE-TOKEN': TOKEN}) | |
| if r.status_code >= 300: | |
| print(r.url, r.reason, r.status_code) | |
| print(r.text) | |
| break | |
| if r.text == '[]': break | |
| items += r.json() | |
| return items | |
| CACHE_PREFIX='/tmp/gitlab_cache_' | |
| def write_cache(type, data): | |
| try: | |
| cache = open(CACHE_PREFIX + type, 'w') | |
| json.dump(data, cache) | |
| cache.close() | |
| except OSError as e: | |
| pass | |
| def read_cache(type): | |
| try: | |
| cache = open(CACHE_PREFIX + type) | |
| data = json.load(cache) | |
| cache.close() | |
| return data | |
| except: | |
| # print(e) | |
| return [] | |
| def get_users(emails): | |
| allusers = read_cache('users') | |
| users = [u for u in allusers if u['email'] in emails] | |
| if len(users) != len(emails): | |
| allusers = get_all_items('users') | |
| write_cache('users', allusers) | |
| users = [u for u in allusers if u['email'] in emails] | |
| if users == []: | |
| print('invalid users, abort') | |
| exit(1) | |
| return users | |
| # access: {developer: 30, master: 40, owner: 50} | |
| def add_user_to_group(emails, access, groupname): | |
| emails = list(map(lambda email: email if '@' in email else email + '@example.com', emails)) | |
| users = get_users(emails) | |
| allgroups = get_all_items('groups') | |
| group = [g for g in allgroups if g['name'] == groupname] | |
| if group == []: | |
| print('invalid group, abort') | |
| exit(2) | |
| if len(group) != 1: | |
| print('more than 1 group of ' + groupname + ', abort') | |
| exit(3) | |
| group = group[0] | |
| for user in users: | |
| r = httpc.post(URL + '/groups/' + str(group['id']) + '/members', headers={'PRIVATE-TOKEN': TOKEN}, data={'user_id': user['id'], 'access_level': access}) | |
| if r.reason == 'Conflict': | |
| r = httpc.put(URL + '/groups/' + str(group['id']) + '/members/' + str(user['id']) + '?access_level=' + str(access), headers={'PRIVATE-TOKEN': TOKEN}) | |
| print(r.url, r.reason) | |
| print(r.text) | |
| def main(cmd, args): | |
| if cmd == 'addgroupuser': | |
| add_user_to_group(args[2:], args[1], args[0]) | |
| elif cmd == 'query': | |
| items = get_all_items(args[0]) | |
| if len(args) > 1: | |
| print(json.dumps([i for i in items if i['name'] in args[1:]])) | |
| else: | |
| print(json.dumps(items)) | |
| if __name__ == '__main__': | |
| if len(sys.argv) < 2: | |
| print("usage: " + sys.argv[0] + " CMD ARG1 ARG2 ..") | |
| print("supported commands:") | |
| print("\taddgroupuser GROUPNAME ACCESS EMAIL1 [EMAIL2 ..]") | |
| print("\tquery groups|projects|groups/304/members|.. [NAME1 NAME2 ..]") | |
| exit(99) | |
| main(sys.argv[1], sys.argv[2:]) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment