Python script to export or import Mastodon filters
#!/usr/bin/env python3
# Export or import Mastodon filters
#
# This script exports or imports filters from your Mastodon account.
# To use it, you first need to create an "Application" in your
# Mastodon account with read:filters and write:filters scopes; if you
# just want to export, you only need read:filters. Go to Preferences
# -> Development in the Mastodon web UI to do this. Then click on the
# newly created application and copy its access token, since that's
# what you'll need for this script.
#
# You can pass the access token into the script via the
# MASTODON_ACCESS_TOKEN environment variable or the --access-token
# command-line flag; if neither of those is set, the script will look
# for the token for the specified account in <config dir>/<script
# name>/keys. You can use the save-access-token subcommand to save a
# token in that file.
#
# If you try to export or import without specifying an account, and
# you don't specify an access token on the command line or in the
# environment variable, and there's a single access token stored in
# the keys file, then the account associated with that access token is
# used automatically.
#
# Author: Jonathan Kamens <jik@kamens.us>
# Available at:
# https://gist.github.com/jikamens/6ffc52a7f85439046c32e05c7a67a906
#
# This script is released into the public domain. Do whatever you want
# with it.
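#
# Requires the third-party "platformdirs" and "requests" packages
# (e.g. "pip install platformdirs requests").
#
# Example invocations (illustrative only; the script filename and
# account below are placeholders, not anything the script mandates):
#   MASTODON_ACCESS_TOKEN=... ./mastodon-filters.py save-access-token \
#       --account user@example.social
#   ./mastodon-filters.py export filters.json
#   ./mastodon-filters.py import --dryrun filters.json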
import argparse
from contextlib import contextmanager
from copy import deepcopy
import datetime
import json
import os
from platformdirs import user_config_dir
import re
import requests
import sys
import time
from urllib.parse import quote_plus
def parse_args():
    global_parser = argparse.ArgumentParser(add_help=False)
    global_parser.add_argument('--verbose', action='store_true')
    global_parser.add_argument('--account', action='store')
    global_parser.add_argument('--access-token', action='store')
    parser = argparse.ArgumentParser(
        description='Export or import Mastodon filters',
        parents=[global_parser])
    subparsers = parser.add_subparsers(required=True)
    save_access_token_subparser = subparsers.add_parser(
        'save-access-token', parents=[global_parser])
    save_access_token_subparser.set_defaults(func=do_save_access_token)
    export_subparser = subparsers.add_parser(
        'export', parents=[global_parser])
    export_subparser.set_defaults(func=do_export)
    export_subparser.add_argument('filename', nargs='?')
    import_subparser = subparsers.add_parser(
        'import', parents=[global_parser])
    import_subparser.set_defaults(func=do_import)
    import_subparser.add_argument('--dryrun', action='store_true')
    import_subparser.add_argument('filename', nargs='?')
    args = parser.parse_args()
    if args.account and '@' not in args.account:
        parser.error('Account must be user@host')
    return args
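# The saved-token ("keys") file lives in the per-user config directory
# chosen by platformdirs, in a subdirectory named after the script with
# any ".py" suffix stripped (on Linux this is typically
# ~/.config/<script name>/keys; the exact location varies by platform).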
def get_access_token_file(args):
    program = re.sub(r'\.py$', '', os.path.basename(__file__), flags=re.I)
    config_dir = user_config_dir(program)
    os.makedirs(config_dir, mode=0o700, exist_ok=True)
    return os.path.join(config_dir, 'keys')
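# Token lookup order: the --access-token flag, then the
# MASTODON_ACCESS_TOKEN environment variable, then the keys file, which
# stores one "user@host:access_token" entry per line. Returns a
# (token, account) tuple.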
def get_access_token(args, use_config=True):
    if args.access_token:
        return (args.access_token, args.account)
    if token := os.getenv("MASTODON_ACCESS_TOKEN"):
        return (token, args.account)
    if use_config:
        token_file = get_access_token_file(args)
        try:
            with open(token_file, 'r') as f:
                first_token = None
                for line in f:
                    (account, token) = line.strip().split(':', 1)
                    if args.account:
                        if account == args.account:
                            return (token, account)
                    elif first_token:
                        sys.exit('Account not specified and there are '
                                 'multiple saved accounts')
                    else:
                        first_token = token
                if first_token:
                    return (first_token, account)
        except FileNotFoundError:
            sys.exit(f'Access token not specified and '
                     f'{token_file} does not exist')
    sys.exit('No access token specified')
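# Rewrite the keys file with the token for the specified account,
# preserving entries for other accounts. The new contents are written
# to a temporary file with mode 0600 and renamed into place so the
# update is atomic.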
def do_save_access_token(args):
    if not args.account:
        sys.exit('You must specify account when saving access token')
    access_token, _account = get_access_token(args, use_config=False)
    access_token_file = get_access_token_file(args)
    contents = ''
    try:
        with open(access_token_file, 'r') as f:
            for line in f:
                (account, token) = line.strip().split(':', 1)
                if account == args.account:
                    continue
                contents += f'{account}:{token}\n'
    except FileNotFoundError:
        pass
    contents += f'{args.account}:{access_token}\n'
    new_file = f'{access_token_file}.new'
    with open(new_file, 'w') as f:
        os.fchmod(f.fileno(), 0o600)
        f.write(contents)
    os.rename(new_file, access_token_file)
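# Fetch the account's filters via GET /api/v2/filters and return the
# decoded JSON (a list of filter objects, each with its keywords). If a
# token/account pair isn't supplied, it is resolved via
# get_access_token().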
def get_filters(args, access_token=None, account=None):
    if (not access_token) != (not account):
        sys.exit(
            'Internal error: get_filters: token w/o account or vice versa')
    elif not access_token:
        access_token, account = get_access_token(args)
    if not account:
        sys.exit('Account not specified')
    _username, server = account.split('@', 1)
    response = requests.get(
        f'https://{server}/api/v2/filters',
        headers={'Authorization': f'Bearer {access_token}'})
    response.raise_for_status()
    return response.json()
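# Context manager that yields stdin/stdout when no filename is given
# (or the filename is "-"), and otherwise opens and closes the named
# file.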
@contextmanager
def get_file(args, mode):
    if not args.filename or args.filename == '-':
        yield sys.stdin if mode == 'r' else sys.stdout
    else:
        f = open(args.filename, mode)
        try:
            yield f
        finally:
            f.close()
def do_export(args):
    filters = get_filters(args)
    with get_file(args, 'w') as output_file:
        print(json.dumps(filters, indent=2), file=output_file)
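# Reduce a filter to a canonical JSON string so that filters can be
# compared across servers: drop server-assigned ids (on the filter and
# its keywords), sort the context, status, and keyword lists, and
# convert expires_at to a minute-granularity timestamp.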
def canonicalize_filter(f):
    f = deepcopy(f)
    f.pop('id', None)
    f['context'].sort()
    f['statuses'].sort()
    for keyword in f['keywords']:
        keyword.pop('id', None)
    f['keywords'].sort(key=lambda k: k['keyword'])
    if expires_at := f.get('expires_at', None):
        f['expires_at'] = round(
            datetime.datetime.fromisoformat(expires_at).timestamp() / 60)
    return json.dumps(f, sort_keys=True)
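# Build the form-encoded body for POST /api/v2/filters, using the
# bracketed parameter names the filters API expects for arrays and
# nested attributes (context[], keywords_attributes[][keyword], ...).
# For example, a filter titled "Spam" with context ["home"], action
# "hide", and one whole-word keyword "crypto" would (illustratively)
# encode as a single line:
#   title=Spam&context[]=home&filter_action=hide&
#   keywords_attributes[][keyword]=crypto&
#   keywords_attributes[][whole_word]=true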
def make_request(f, expires_in=None):
    req = []
    req.append(('title', f['title']))
    for c in f['context']:
        req.append(('context[]', c))
    req.append(('filter_action', f['filter_action']))
    if expires_in:
        req.append(('expires_in', str(expires_in)))
    for keyword in f['keywords']:
        req.append(('keywords_attributes[][keyword]', keyword['keyword']))
        if keyword.get('whole_word', None) is not None:
            req.append(('keywords_attributes[][whole_word]',
                        str(keyword['whole_word']).lower()))
    return '&'.join(f'{k}={quote_plus(v)}' for k, v in req)
def do_import(args):
    # 1. Read filters being imported to confirm the JSON is valid
    #    before we waste time reading data from the server.
    # 2. Get existing filters.
    # 3. Canonicalize and index existing filters.
    # 4. For each filter being imported, canonicalize it, skip it if
    #    it already exists, and import it otherwise.
    with get_file(args, 'r') as input_file:
        new_filters = json.load(input_file)
    access_token, account = get_access_token(args)
    old_filters = get_filters(args, access_token=access_token,
                              account=account)
    indexed_filters = {canonicalize_filter(f): f for f in old_filters}
    for filter in new_filters:
        if canonicalize_filter(filter) in indexed_filters:
            if args.verbose:
                print(f'Skipping existing filter {filter["title"]}')
            continue
        expires_in = None
        if filter['expires_at']:
            expires_in = int(
                datetime.datetime.fromisoformat(
                    filter['expires_at']).timestamp() -
                time.time())
            if expires_in < 0:
                if args.verbose:
                    print(f'Skipping expired filter {filter["title"]}')
                continue
        data = make_request(filter, expires_in)
        if args.verbose:
            print(f'Importing filter {filter["title"]}')
        if args.dryrun:
            continue
        _username, server = account.split('@', 1)
        response = requests.post(
            f'https://{server}/api/v2/filters',
            headers={'Authorization': f'Bearer {access_token}'},
            data=data)
        if response.status_code != 200:
            try:
                error = response.json()['error']
                sys.exit(f'Error importing filter {filter["title"]}: '
                         f'{error}')
            except Exception:
                response.raise_for_status()
def main():
    args = parse_args()
    args.func(args)
if __name__ == '__main__':
    main()