Last active
June 7, 2025 09:09
-
-
Save Kenny2github/05602faa447abb6ebec1a9bfc7c93cec to your computer and use it in GitHub Desktop.
A Python 3 script to copy block lists from one blog to another.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| import re | |
| import pytumblr | |
| from requests_oauthlib import OAuth1Session | |
| def yes_no(prompt='?'): | |
| while 1: | |
| response = input(prompt + ' (y/n) ').strip().casefold() | |
| if response.startswith(('y', 't')): | |
| return True | |
| if response.startswith(('n', 'f')): | |
| return False | |
| def choose(options, indent=0): | |
| for i, opt in enumerate(options, start=1): | |
| print(f'{" "*indent}{i:3}.', opt) | |
| while 1: | |
| response = input(f'Choose one (1-{len(options)}): ').strip() | |
| try: | |
| return int(response) - 1 | |
| except ValueError: | |
| continue | |
| def multichoose(options, indent=0): | |
| for i, opt in enumerate(options, start=1): | |
| print(f'{" "*indent}{i:3}.', opt) | |
| while 1: | |
| response = input(f'Choose one or more e.g. 1,3-5,9 (default 1-{len(options)}): ').strip() | |
| if not response: | |
| return list(range(len(options))) | |
| choices = [] | |
| try: | |
| for m in re.finditer(r'(\d+)(?:-(\d+))?', response): | |
| if m.group(2): | |
| start, stop = m.groups() | |
| choices.append(list(range(int(start)-1, int(stop)))) | |
| else: | |
| choices.append([int(m.group(1)) - 1]) | |
| except ValueError: | |
| continue | |
| else: | |
| return sum(choices, start=[]) | |
| class BlockingClient(pytumblr.TumblrRestClient): | |
| def blocks(self, blogname, **kwargs): | |
| """ | |
| Gets the list of blocks | |
| :param blogname: a string, the url of the blog blocking others | |
| :returns: a dict created from the JSON response | |
| """ | |
| url = f'/v2/blog/{blogname}/blocks' | |
| return self.send_api_request('get', url, kwargs, ['limit', 'offset']) | |
| def bulk_block(self, blogname, **kwargs): | |
| """ | |
| Blocks a list of blogs | |
| :param blogname: a string, the url of the blog to block others from | |
| :param blocked_tumblelogs: a string, a comma-separated list of blogs to | |
| block; or an iterable of blogs to block | |
| :param force: a boolean, whether to bypass Post+ subscriptions | |
| :returns: a dict created from the JSON response | |
| """ | |
| if 'blocked_tumblelogs' in kwargs and not isinstance(kwargs['blocked_tumblelogs'], str): | |
| kwargs['blocked_tumblelogs'] = ','.join(kwargs['blocked_tumblelogs']) | |
| url = f'/v2/blog/{blogname}/blocks/bulk' | |
| return self.send_api_request('post', url, kwargs, ['blocked_tumblelogs', 'force']) | |
| def make_client(): | |
| # This function modified from: | |
| # https://github.com/tumblr/pytumblr/blob/master/interactive_console.py#L25-L59 | |
| print(f'{1:4}. Go to https://www.tumblr.com/oauth/apps') | |
| print(f'{2:4}. Register an application or choose an existing one.\n' | |
| f'{"":6}If registering, use https://example.com as your "Redirect URI"\n' | |
| f'{"":6}for convenience.') | |
| consumer_key = input(f'{3:4}. Paste the "OAuth Consumer Key" here: ').strip() | |
| consumer_secret = input(f'{4:4}. Show the secret key and paste it here: ').strip() | |
| request_token_url = 'http://www.tumblr.com/oauth/request_token' | |
| authorize_url = 'http://www.tumblr.com/oauth/authorize' | |
| access_token_url = 'http://www.tumblr.com/oauth/access_token' | |
| # STEP 1: Obtain request token | |
| oauth_session = OAuth1Session(consumer_key, client_secret=consumer_secret) | |
| fetch_response = oauth_session.fetch_request_token(request_token_url) | |
| resource_owner_key = fetch_response.get('oauth_token') | |
| resource_owner_secret = fetch_response.get('oauth_token_secret') | |
| # STEP 2: Authorize URL + Response | |
| full_authorize_url = oauth_session.authorization_url(authorize_url) | |
| # Redirect to authentication page | |
| print(f'{5:4}. Go to:', full_authorize_url) | |
| print(f'{6:4}. Click "Allow". You will be redirected to a website (like example.com/?...)') | |
| redirect_response = input(f'{7:4}. Paste the full URL you were redirected to here:\n').strip() | |
| # Retrieve oauth verifier | |
| oauth_response = oauth_session.parse_authorization_response(redirect_response) | |
| verifier = oauth_response.get('oauth_verifier') | |
| # STEP 3: Request final access token | |
| oauth_session = OAuth1Session( | |
| consumer_key, | |
| client_secret=consumer_secret, | |
| resource_owner_key=resource_owner_key, | |
| resource_owner_secret=resource_owner_secret, | |
| verifier=verifier | |
| ) | |
| oauth_tokens = oauth_session.fetch_access_token(access_token_url) | |
| return BlockingClient( | |
| consumer_key, | |
| consumer_secret, | |
| oauth_tokens.get('oauth_token'), | |
| oauth_tokens.get('oauth_token_secret'), | |
| ) | |
| if __name__ == '__main__': | |
| clients = [] | |
| step = 1 | |
| while 1: | |
| print(f'{step}. Sign into a Tumblr ACCOUNT to copy blocks from/to.') | |
| clients.append(make_client()) | |
| if not yes_no('Is there another Tumblr account in which your blog(s) live(s)?'): | |
| break | |
| step += 1 | |
| blogs = [client.info()['user']['blogs'] for client in clients] | |
| call_counts = [1] * len(clients) | |
| primaries = [[blog for blog in client_blogs if blog['primary']][0] for client_blogs in blogs] | |
| blogs_flat = [(blog, i) for i, client_blogs in enumerate(blogs) for blog in client_blogs] | |
| blog_descs = [f'{blog["name"]} (sideblog of {primaries[i]["name"]})' for blog, i in blogs_flat] | |
| while 1: | |
| print(f'{step}. Choose (a) blog(s) to copy blocks FROM:') | |
| srcs = multichoose(blog_descs) | |
| step += 1 | |
| print(f'{step}. Choose a blog to copy blocks TO:') | |
| dst = choose(blog_descs) | |
| step += 1 | |
| dst_blog, dst = blogs_flat[dst] | |
| blocks = set() | |
| print('Fetching blocks. Due to API rate limits, this only fetches 20 blocks per second.') | |
| print('At any point while fetching, press Ctrl/Control+C to stop fetching and move on\n' | |
| "to fetching the next blog's blocks.") | |
| for src in srcs: | |
| src_blog, src = blogs_flat[src] | |
| print(f' Fetching blocks for {src_blog["name"]}', end='', flush=True) | |
| offset = 0 | |
| block_count = len(blocks) | |
| while 1: | |
| if call_counts[src] >= 999 + (src == dst): | |
| print('\n\nERROR: The API rate limit of 1,000 calls per hour has been\n' | |
| f'reached for the {primaries[src]["name"]!r} account. This corresponds\n' | |
| 'to 20,000 total blocks across all blogs on that account. If you have\n' | |
| 'more blocks than that, this script cannot help you. Aborting block-\n' | |
| 'fetching for this account now.') | |
| break | |
| try: | |
| r = clients[src].blocks( | |
| src_blog["uuid"], | |
| offset=offset, | |
| limit=20, | |
| )['blocked_tumblelogs'] | |
| call_counts[src] += 1 | |
| blocks |= {blog['uuid'] for blog in r} | |
| if len(r) < 20: | |
| break | |
| offset += 20 | |
| time.sleep(1) | |
| print('.', end='', flush=True) | |
| except KeyboardInterrupt: | |
| break | |
| print(f'\n Fetched {len(blocks) - block_count} new blocks.') | |
| print(f'Blocking {len(blocks)} blogs from {dst_blog["name"]}... ', end='', flush=True) | |
| clients[dst].bulk_block( | |
| dst_blog["uuid"], | |
| blocked_tumblelogs=blocks, | |
| force=True, | |
| ) | |
| call_counts[dst] += 1 | |
| print('Done.') | |
| if not yes_no('Would you like to copy another blocklist?'): | |
| break |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment