Created
January 16, 2026 04:40
-
-
Save searls/1c8dcb51bfa99552811bfdf5fc16ffeb to your computer and use it in GitHub Desktop.
A script to normalize all of your North American phone numbers to start with a +1 so that iOS/iMessage is less confused when you add an overseas SIM card. Requires an app password to iCloud
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import argparse | |
| import base64 | |
| import datetime as dt | |
| import getpass | |
| import os | |
| import re | |
| import sys | |
| import textwrap | |
| import urllib.error | |
| import urllib.parse | |
| import urllib.request | |
| import xml.etree.ElementTree as ET | |
| NS = { | |
| "d": "DAV:", | |
| "c": "urn:ietf:params:xml:ns:carddav", | |
| } | |
| ICLOUD_BASE_URL = "https://contacts.icloud.com" | |
| def _e164_plus_one(ten_digits: str) -> str: | |
| area_code = ten_digits[0:3] | |
| exchange_code = ten_digits[3:6] | |
| subscriber_number = ten_digits[6:10] | |
| return f"+1 ({area_code}) {exchange_code}-{subscriber_number}" | |
| def _digits_only(text: str) -> str: | |
| return "".join(character for character in text if character.isdigit()) | |
| def _now_rev_value() -> str: | |
| return dt.datetime.now(dt.UTC).strftime("%Y%m%dT%H%M%SZ") | |
| def _unfold_vcard_lines(vcard_text: str) -> list[str]: | |
| raw_lines = vcard_text.replace("\r\n", "\n").replace("\r", "\n").split("\n") | |
| lines: list[str] = [] | |
| for raw_line in raw_lines: | |
| if not raw_line: | |
| lines.append(raw_line) | |
| continue | |
| if raw_line.startswith((" ", "\t")) and lines: | |
| lines[-1] = lines[-1] + raw_line[1:] | |
| continue | |
| lines.append(raw_line) | |
| return lines | |
| def _fold_vcard_lines(lines: list[str]) -> str: | |
| return "\r\n".join(lines).rstrip("\r\n") + "\r\n" | |
| def _extract_display_name(vcard_lines: list[str], fallback: str) -> str: | |
| for line in vcard_lines: | |
| if line.startswith("FN:"): | |
| return line.split(":", 1)[1].strip() | |
| return fallback | |
| def _card_has_us_address(vcard_lines: list[str]) -> bool: | |
| for line in vcard_lines: | |
| if not line.startswith("ADR"): | |
| continue | |
| value = line.split(":", 1)[1] if ":" in line else "" | |
| parts = value.split(";") | |
| country = parts[6] if len(parts) >= 7 else value | |
| normalized_country = country.strip().lower() | |
| if normalized_country in {"us", "usa", "united states", "united states of america"}: | |
| return True | |
| if "united states" in normalized_country: | |
| return True | |
| return False | |
| def _normalize_tel_value( | |
| value: str, | |
| *, | |
| only_eleven: bool, | |
| require_us_address_for_ten: bool, | |
| has_us_address: bool, | |
| normalize_existing_plus_one: bool, | |
| ) -> str | None: | |
| stripped_value = value.strip() | |
| digits = _digits_only(stripped_value) | |
| if stripped_value.startswith("+"): | |
| if normalize_existing_plus_one and digits.startswith("1") and len(digits) == 11: | |
| return _e164_plus_one(digits[1:]) | |
| return None | |
| if len(digits) == 11 and digits.startswith("1"): | |
| return _e164_plus_one(digits[1:]) | |
| if only_eleven: | |
| return None | |
| if len(digits) != 10: | |
| return None | |
| if require_us_address_for_ten and not has_us_address: | |
| return None | |
| if digits[0] not in "23456789": | |
| return None | |
| return _e164_plus_one(digits) | |
| def _update_vcard( | |
| vcard_text: str, | |
| *, | |
| fallback_name: str, | |
| only_eleven: bool, | |
| require_us_address_for_ten: bool, | |
| normalize_existing_plus_one: bool, | |
| ) -> tuple[str, list[tuple[str, str]], str]: | |
| lines = _unfold_vcard_lines(vcard_text) | |
| display_name = _extract_display_name(lines, fallback_name) | |
| has_us_address = _card_has_us_address(lines) | |
| changes: list[tuple[str, str]] = [] | |
| updated_lines: list[str] = [] | |
| saw_rev = False | |
| for line in lines: | |
| if line.startswith("REV:"): | |
| saw_rev = True | |
| updated_lines.append(line) | |
| continue | |
| if not line.startswith("TEL"): | |
| updated_lines.append(line) | |
| continue | |
| if ":" not in line: | |
| updated_lines.append(line) | |
| continue | |
| left, right = line.split(":", 1) | |
| new_value = _normalize_tel_value( | |
| right, | |
| only_eleven=only_eleven, | |
| require_us_address_for_ten=require_us_address_for_ten, | |
| has_us_address=has_us_address, | |
| normalize_existing_plus_one=normalize_existing_plus_one, | |
| ) | |
| if new_value is None or new_value == right: | |
| updated_lines.append(line) | |
| continue | |
| updated_lines.append(f"{left}:{new_value}") | |
| changes.append((right, new_value)) | |
| if changes: | |
| new_rev_line = f"REV:{_now_rev_value()}" | |
| updated_with_rev: list[str] = [] | |
| replaced_existing_rev = False | |
| for line in updated_lines: | |
| if line.startswith("REV:"): | |
| updated_with_rev.append(new_rev_line) | |
| replaced_existing_rev = True | |
| else: | |
| updated_with_rev.append(line) | |
| if not replaced_existing_rev: | |
| inserted = False | |
| output_lines: list[str] = [] | |
| for line in updated_with_rev: | |
| if not inserted and line.startswith("FN:"): | |
| output_lines.append(line) | |
| output_lines.append(new_rev_line) | |
| inserted = True | |
| else: | |
| output_lines.append(line) | |
| if inserted: | |
| updated_with_rev = output_lines | |
| else: | |
| end_index = None | |
| for index, line in enumerate(updated_with_rev): | |
| if line == "END:VCARD": | |
| end_index = index | |
| break | |
| if end_index is None: | |
| updated_with_rev.append(new_rev_line) | |
| else: | |
| updated_with_rev.insert(end_index, new_rev_line) | |
| updated_lines = updated_with_rev | |
| return _fold_vcard_lines(updated_lines), changes, display_name | |
| def _auth_header(username: str, password: str) -> str: | |
| userpass = f"{username}:{password}".encode("utf-8") | |
| return "Basic " + base64.b64encode(userpass).decode("ascii") | |
| def _dav_request( | |
| *, | |
| method: str, | |
| url: str, | |
| auth: str, | |
| headers: dict[str, str] | None = None, | |
| body: bytes | None = None, | |
| max_redirects: int = 5, | |
| ) -> tuple[int, dict[str, str], bytes, str]: | |
| current_url = url | |
| current_method = method | |
| current_body = body | |
| for _ in range(max_redirects + 1): | |
| request_headers = { | |
| "Authorization": auth, | |
| "User-Agent": "plus_1ify_icloud_contacts_phone_numbers/1.0", | |
| } | |
| if headers: | |
| request_headers.update(headers) | |
| request = urllib.request.Request( | |
| current_url, | |
| data=current_body, | |
| headers=request_headers, | |
| method=current_method, | |
| ) | |
| try: | |
| with urllib.request.urlopen(request, timeout=60) as response: | |
| response_body = response.read() | |
| response_headers = {k.lower(): v for k, v in response.headers.items()} | |
| if response.status in {301, 302, 303, 307, 308}: | |
| location = response.headers.get("Location") | |
| if not location: | |
| return response.status, response_headers, response_body, current_url | |
| current_url = urllib.parse.urljoin(current_url, location) | |
| if response.status == 303: | |
| current_method = "GET" | |
| current_body = None | |
| continue | |
| return response.status, response_headers, response_body, current_url | |
| except urllib.error.HTTPError as error: | |
| if error.code in {301, 302, 303, 307, 308}: | |
| location = error.headers.get("Location") | |
| if not location: | |
| raise | |
| current_url = urllib.parse.urljoin(current_url, location) | |
| if error.code == 303: | |
| current_method = "GET" | |
| current_body = None | |
| continue | |
| raise | |
| raise RuntimeError(f"Too many redirects starting from {url}") | |
| def _parse_multistatus(xml_bytes: bytes) -> ET.Element: | |
| try: | |
| return ET.fromstring(xml_bytes) | |
| except ET.ParseError as error: | |
| raise RuntimeError(f"Failed to parse DAV XML: {error}") from error | |
| def _href_from_prop(element: ET.Element, xpath: str) -> str | None: | |
| href = element.findtext(xpath, namespaces=NS) | |
| return href.strip() if href else None | |
| def _discover_addressbooks(*, base_url: str, auth: str) -> list[tuple[str, str]]: | |
| base_url = base_url.rstrip("/") | |
| discovery_urls = [ | |
| urllib.parse.urljoin(base_url + "/", ".well-known/carddav"), | |
| base_url + "/co", | |
| base_url + "/", | |
| ] | |
| propfind_discovery = textwrap.dedent( | |
| """\ | |
| <?xml version="1.0" encoding="utf-8"?> | |
| <d:propfind xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:carddav"> | |
| <d:prop> | |
| <d:current-user-principal /> | |
| <d:principal-URL /> | |
| <c:addressbook-home-set /> | |
| </d:prop> | |
| </d:propfind> | |
| """ | |
| ).encode("utf-8") | |
| principal_url = None | |
| home_url = None | |
| last_status: int | None = None | |
| last_final_url: str | None = None | |
| last_body: bytes = b"" | |
| for discovery_url in discovery_urls: | |
| status, _, body, final_url = _dav_request( | |
| method="PROPFIND", | |
| url=discovery_url, | |
| auth=auth, | |
| headers={"Depth": "0", "Content-Type": "application/xml; charset=utf-8"}, | |
| body=propfind_discovery, | |
| ) | |
| last_status = status | |
| last_final_url = final_url | |
| last_body = body | |
| if status not in {207, 200}: | |
| continue | |
| try: | |
| multistatus = _parse_multistatus(body) | |
| except Exception: | |
| continue | |
| for response in multistatus.findall("d:response", namespaces=NS): | |
| if not principal_url: | |
| current_user_href = _href_from_prop(response, ".//d:current-user-principal/d:href") | |
| if current_user_href: | |
| principal_url = urllib.parse.urljoin(final_url, current_user_href) | |
| if not principal_url: | |
| principal_href = _href_from_prop(response, ".//d:principal-URL/d:href") | |
| if principal_href: | |
| principal_url = urllib.parse.urljoin(final_url, principal_href) | |
| if not home_url: | |
| home_href = _href_from_prop(response, ".//c:addressbook-home-set/d:href") | |
| if home_href: | |
| home_url = urllib.parse.urljoin(final_url, home_href) | |
| if principal_url or home_url: | |
| break | |
| if principal_url or home_url: | |
| break | |
| if home_url is None and principal_url is not None: | |
| propfind_home_set = textwrap.dedent( | |
| """\ | |
| <?xml version="1.0" encoding="utf-8"?> | |
| <d:propfind xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:carddav"> | |
| <d:prop> | |
| <c:addressbook-home-set /> | |
| </d:prop> | |
| </d:propfind> | |
| """ | |
| ).encode("utf-8") | |
| status, _, body, principal_final_url = _dav_request( | |
| method="PROPFIND", | |
| url=principal_url, | |
| auth=auth, | |
| headers={"Depth": "0", "Content-Type": "application/xml; charset=utf-8"}, | |
| body=propfind_home_set, | |
| ) | |
| if status not in {207, 200}: | |
| raise RuntimeError(f"Unexpected PROPFIND status {status} from {principal_final_url}") | |
| multistatus = _parse_multistatus(body) | |
| home_href = None | |
| for response in multistatus.findall("d:response", namespaces=NS): | |
| home_href = _href_from_prop(response, ".//c:addressbook-home-set/d:href") | |
| if home_href: | |
| break | |
| if not home_href: | |
| raise RuntimeError("Failed to discover addressbook-home-set href") | |
| home_url = urllib.parse.urljoin(principal_final_url, home_href) | |
| if home_url is None: | |
| status_text = str(last_status) if last_status is not None else "unknown" | |
| url_text = last_final_url or "(none)" | |
| snippet = last_body.decode("utf-8", errors="replace")[:400] | |
| raise RuntimeError(f"Failed to discover principal/home-set (status={status_text}, url={url_text}, body={snippet!r})") | |
| propfind_addressbooks = textwrap.dedent( | |
| """\ | |
| <?xml version="1.0" encoding="utf-8"?> | |
| <d:propfind xmlns:d="DAV:"> | |
| <d:prop> | |
| <d:displayname /> | |
| <d:resourcetype /> | |
| </d:prop> | |
| </d:propfind> | |
| """ | |
| ).encode("utf-8") | |
| status, _, body, home_final_url = _dav_request( | |
| method="PROPFIND", | |
| url=home_url, | |
| auth=auth, | |
| headers={"Depth": "1", "Content-Type": "application/xml; charset=utf-8"}, | |
| body=propfind_addressbooks, | |
| ) | |
| if status not in {207, 200}: | |
| raise RuntimeError(f"Unexpected PROPFIND status {status} from {home_final_url}") | |
| multistatus = _parse_multistatus(body) | |
| addressbooks: list[tuple[str, str]] = [] | |
| for response in multistatus.findall("d:response", namespaces=NS): | |
| href = response.findtext("d:href", namespaces=NS) | |
| if not href: | |
| continue | |
| href_url = urllib.parse.urljoin(home_final_url, href.strip()) | |
| displayname = response.findtext(".//d:displayname", namespaces=NS) or href.strip().strip("/") | |
| if response.find(".//d:resourcetype/c:addressbook", namespaces=NS) is None: | |
| continue | |
| addressbooks.append((displayname.strip(), href_url)) | |
| if not addressbooks: | |
| raise RuntimeError("No addressbooks found in addressbook-home-set") | |
| return addressbooks | |
| def _addressbook_query(*, addressbook_url: str, auth: str, limit: int | None) -> list[dict[str, str]]: | |
| report_body = textwrap.dedent( | |
| """\ | |
| <?xml version="1.0" encoding="utf-8"?> | |
| <c:addressbook-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:carddav"> | |
| <d:prop> | |
| <d:getetag /> | |
| <c:address-data /> | |
| </d:prop> | |
| </c:addressbook-query> | |
| """ | |
| ).encode("utf-8") | |
| status, _, body, final_url = _dav_request( | |
| method="REPORT", | |
| url=addressbook_url, | |
| auth=auth, | |
| headers={"Depth": "1", "Content-Type": "application/xml; charset=utf-8"}, | |
| body=report_body, | |
| ) | |
| if status != 207: | |
| raise RuntimeError(f"Unexpected REPORT status {status} from {final_url}") | |
| multistatus = _parse_multistatus(body) | |
| resources: list[dict[str, str]] = [] | |
| for response in multistatus.findall("d:response", namespaces=NS): | |
| href = response.findtext("d:href", namespaces=NS) | |
| if not href: | |
| continue | |
| href_url = urllib.parse.urljoin(final_url, href.strip()) | |
| if href_url.rstrip("/") == final_url.rstrip("/"): | |
| continue | |
| etag = response.findtext(".//d:getetag", namespaces=NS) or "" | |
| address_data = response.findtext(".//c:address-data", namespaces=NS) or "" | |
| if not address_data: | |
| continue | |
| resources.append({"href": href_url, "etag": etag, "vcard": address_data}) | |
| if limit is not None and len(resources) >= limit: | |
| break | |
| return resources | |
| def _put_vcard(*, href: str, etag: str, vcard: str, auth: str) -> None: | |
| headers = { | |
| "Content-Type": "text/vcard; charset=utf-8", | |
| } | |
| if etag: | |
| headers["If-Match"] = etag | |
| status, _, _, final_url = _dav_request( | |
| method="PUT", | |
| url=href, | |
| auth=auth, | |
| headers=headers, | |
| body=vcard.encode("utf-8"), | |
| ) | |
| if status not in {200, 201, 204}: | |
| raise RuntimeError(f"Unexpected PUT status {status} for {final_url}") | |
| def main(argv: list[str]) -> int: | |
| epilog = textwrap.dedent( | |
| """\ | |
| Requirements: | |
| - iCloud Contacts enabled | |
| - An app-specific password (Apple ID -> Sign-In and Security -> App-Specific Passwords) | |
| Environment: | |
| - ICLOUD_USERNAME: optional default for --username | |
| - ICLOUD_APP_PASSWORD: optional; if not set, you'll be prompted | |
| Safety: | |
| - Default is a dry run; no changes are written | |
| - Use --commit to write to iCloud (server-side) | |
| Examples: | |
| export ICLOUD_USERNAME="you@icloud.com" | |
| export ICLOUD_APP_PASSWORD="xxxx-xxxx-xxxx-xxxx" | |
| plus_1ify_icloud_contacts_phone_numbers --limit 25 | |
| plus_1ify_icloud_contacts_phone_numbers --commit | |
| """ | |
| ) | |
| parser = argparse.ArgumentParser( | |
| prog="plus_1ify_icloud_contacts_phone_numbers", | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| description="Normalize iCloud Contacts phone numbers to +1 (AAA) BBB-CCCC.", | |
| epilog=epilog, | |
| usage="%(prog)s [--limit N] [--only-11] [--require-us-address-for-10] [--no-normalize-plus-one] [--addressbook-url URL] [--commit]", | |
| ) | |
| parser.add_argument("--username", default=None) | |
| parser.add_argument("--addressbook-url", default=None) | |
| parser.add_argument("--limit", type=int, default=None) | |
| parser.add_argument("--only-11", action="store_true", help="Only convert 11-digit numbers starting with 1") | |
| parser.add_argument("--require-us-address-for-10", action="store_true", help="Only convert 10-digit numbers if ADR country is US/United States") | |
| parser.add_argument("--no-normalize-plus-one", action="store_true", help="Do not rewrite existing +1 numbers into '+1 (AAA) BBB-CCCC'") | |
| parser.add_argument("--commit", action="store_true", help="Commit changes (default is dry run)") | |
| args = parser.parse_args(argv) | |
| username = args.username or os.environ.get("ICLOUD_USERNAME") | |
| if not username: | |
| try: | |
| username = input("iCloud username: ").strip() | |
| except (EOFError, KeyboardInterrupt): | |
| print("\nAborted.", file=sys.stderr) | |
| return 130 | |
| if not username: | |
| print("Missing iCloud username.", file=sys.stderr) | |
| return 2 | |
| password = os.environ.get("ICLOUD_APP_PASSWORD") or getpass.getpass(f"App-specific password for {username}: ") | |
| auth = _auth_header(username, password) | |
| normalize_existing_plus_one = not args.no_normalize_plus_one | |
| dry_run = not args.commit | |
| if args.addressbook_url: | |
| addressbook_url = args.addressbook_url | |
| else: | |
| try: | |
| addressbooks = _discover_addressbooks(base_url=ICLOUD_BASE_URL, auth=auth) | |
| except Exception as error: | |
| print(f"CardDAV discovery failed: {error}", file=sys.stderr) | |
| print( | |
| "\nTry providing the addressbook URL explicitly. For iCloud this often looks like:\n" | |
| " https://pXX-contacts.icloud.com/<DSID>/carddavhome/card/\n" | |
| "or:\n" | |
| " https://contacts.icloud.com/<DSID>/carddavhome/card/\n" | |
| "\nYou can then run:\n" | |
| " plus_1ify_icloud_contacts_phone_numbers --addressbook-url \"...\"\n", | |
| file=sys.stderr, | |
| ) | |
| return 1 | |
| _, addressbook_url = addressbooks[0] | |
| resources = _addressbook_query(addressbook_url=addressbook_url, auth=auth, limit=args.limit) | |
| changed = 0 | |
| skipped_conflicts = 0 | |
| for resource in resources: | |
| href = resource["href"] | |
| etag = resource["etag"] | |
| vcard = resource["vcard"] | |
| updated_vcard, changes, display_name = _update_vcard( | |
| vcard, | |
| fallback_name=href, | |
| only_eleven=args.only_11, | |
| require_us_address_for_ten=args.require_us_address_for_10, | |
| normalize_existing_plus_one=normalize_existing_plus_one, | |
| ) | |
| if not changes: | |
| continue | |
| changed += 1 | |
| for old, new in changes: | |
| print(f"{display_name} : {old} -> {new}") | |
| if dry_run: | |
| continue | |
| try: | |
| _put_vcard(href=href, etag=etag, vcard=updated_vcard, auth=auth) | |
| except urllib.error.HTTPError as error: | |
| if error.code == 412: | |
| skipped_conflicts += 1 | |
| print(f"SKIP (etag conflict): {display_name}", file=sys.stderr) | |
| continue | |
| if error.code == 403: | |
| print(f"SKIP (forbidden): {display_name}", file=sys.stderr) | |
| continue | |
| raise | |
| if dry_run: | |
| print(f"DRY RUN: would update {changed} contacts. Re-run with --commit to apply.") | |
| else: | |
| print(f"DONE: updated {changed} contacts.") | |
| if skipped_conflicts: | |
| print(f"Skipped due to conflicts: {skipped_conflicts}", file=sys.stderr) | |
| return 0 | |
| if __name__ == "__main__": | |
| raise SystemExit(main(sys.argv[1:])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment