Skip to content

Instantly share code, notes, and snippets.

@searls
Created January 16, 2026 04:40
Show Gist options
  • Select an option

  • Save searls/1c8dcb51bfa99552811bfdf5fc16ffeb to your computer and use it in GitHub Desktop.

Select an option

Save searls/1c8dcb51bfa99552811bfdf5fc16ffeb to your computer and use it in GitHub Desktop.
A script to normalize all of your North American phone numbers to start with a +1 so that iOS/iMessage is less confused when you add an overseas SIM card. Requires an app password to iCloud
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import datetime as dt
import getpass
import os
import re
import sys
import textwrap
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
NS = {
"d": "DAV:",
"c": "urn:ietf:params:xml:ns:carddav",
}
ICLOUD_BASE_URL = "https://contacts.icloud.com"
def _e164_plus_one(ten_digits: str) -> str:
area_code = ten_digits[0:3]
exchange_code = ten_digits[3:6]
subscriber_number = ten_digits[6:10]
return f"+1 ({area_code}) {exchange_code}-{subscriber_number}"
def _digits_only(text: str) -> str:
return "".join(character for character in text if character.isdigit())
def _now_rev_value() -> str:
return dt.datetime.now(dt.UTC).strftime("%Y%m%dT%H%M%SZ")
def _unfold_vcard_lines(vcard_text: str) -> list[str]:
raw_lines = vcard_text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
lines: list[str] = []
for raw_line in raw_lines:
if not raw_line:
lines.append(raw_line)
continue
if raw_line.startswith((" ", "\t")) and lines:
lines[-1] = lines[-1] + raw_line[1:]
continue
lines.append(raw_line)
return lines
def _fold_vcard_lines(lines: list[str]) -> str:
return "\r\n".join(lines).rstrip("\r\n") + "\r\n"
def _extract_display_name(vcard_lines: list[str], fallback: str) -> str:
for line in vcard_lines:
if line.startswith("FN:"):
return line.split(":", 1)[1].strip()
return fallback
def _card_has_us_address(vcard_lines: list[str]) -> bool:
for line in vcard_lines:
if not line.startswith("ADR"):
continue
value = line.split(":", 1)[1] if ":" in line else ""
parts = value.split(";")
country = parts[6] if len(parts) >= 7 else value
normalized_country = country.strip().lower()
if normalized_country in {"us", "usa", "united states", "united states of america"}:
return True
if "united states" in normalized_country:
return True
return False
def _normalize_tel_value(
value: str,
*,
only_eleven: bool,
require_us_address_for_ten: bool,
has_us_address: bool,
normalize_existing_plus_one: bool,
) -> str | None:
stripped_value = value.strip()
digits = _digits_only(stripped_value)
if stripped_value.startswith("+"):
if normalize_existing_plus_one and digits.startswith("1") and len(digits) == 11:
return _e164_plus_one(digits[1:])
return None
if len(digits) == 11 and digits.startswith("1"):
return _e164_plus_one(digits[1:])
if only_eleven:
return None
if len(digits) != 10:
return None
if require_us_address_for_ten and not has_us_address:
return None
if digits[0] not in "23456789":
return None
return _e164_plus_one(digits)
def _update_vcard(
vcard_text: str,
*,
fallback_name: str,
only_eleven: bool,
require_us_address_for_ten: bool,
normalize_existing_plus_one: bool,
) -> tuple[str, list[tuple[str, str]], str]:
lines = _unfold_vcard_lines(vcard_text)
display_name = _extract_display_name(lines, fallback_name)
has_us_address = _card_has_us_address(lines)
changes: list[tuple[str, str]] = []
updated_lines: list[str] = []
saw_rev = False
for line in lines:
if line.startswith("REV:"):
saw_rev = True
updated_lines.append(line)
continue
if not line.startswith("TEL"):
updated_lines.append(line)
continue
if ":" not in line:
updated_lines.append(line)
continue
left, right = line.split(":", 1)
new_value = _normalize_tel_value(
right,
only_eleven=only_eleven,
require_us_address_for_ten=require_us_address_for_ten,
has_us_address=has_us_address,
normalize_existing_plus_one=normalize_existing_plus_one,
)
if new_value is None or new_value == right:
updated_lines.append(line)
continue
updated_lines.append(f"{left}:{new_value}")
changes.append((right, new_value))
if changes:
new_rev_line = f"REV:{_now_rev_value()}"
updated_with_rev: list[str] = []
replaced_existing_rev = False
for line in updated_lines:
if line.startswith("REV:"):
updated_with_rev.append(new_rev_line)
replaced_existing_rev = True
else:
updated_with_rev.append(line)
if not replaced_existing_rev:
inserted = False
output_lines: list[str] = []
for line in updated_with_rev:
if not inserted and line.startswith("FN:"):
output_lines.append(line)
output_lines.append(new_rev_line)
inserted = True
else:
output_lines.append(line)
if inserted:
updated_with_rev = output_lines
else:
end_index = None
for index, line in enumerate(updated_with_rev):
if line == "END:VCARD":
end_index = index
break
if end_index is None:
updated_with_rev.append(new_rev_line)
else:
updated_with_rev.insert(end_index, new_rev_line)
updated_lines = updated_with_rev
return _fold_vcard_lines(updated_lines), changes, display_name
def _auth_header(username: str, password: str) -> str:
userpass = f"{username}:{password}".encode("utf-8")
return "Basic " + base64.b64encode(userpass).decode("ascii")
def _dav_request(
*,
method: str,
url: str,
auth: str,
headers: dict[str, str] | None = None,
body: bytes | None = None,
max_redirects: int = 5,
) -> tuple[int, dict[str, str], bytes, str]:
current_url = url
current_method = method
current_body = body
for _ in range(max_redirects + 1):
request_headers = {
"Authorization": auth,
"User-Agent": "plus_1ify_icloud_contacts_phone_numbers/1.0",
}
if headers:
request_headers.update(headers)
request = urllib.request.Request(
current_url,
data=current_body,
headers=request_headers,
method=current_method,
)
try:
with urllib.request.urlopen(request, timeout=60) as response:
response_body = response.read()
response_headers = {k.lower(): v for k, v in response.headers.items()}
if response.status in {301, 302, 303, 307, 308}:
location = response.headers.get("Location")
if not location:
return response.status, response_headers, response_body, current_url
current_url = urllib.parse.urljoin(current_url, location)
if response.status == 303:
current_method = "GET"
current_body = None
continue
return response.status, response_headers, response_body, current_url
except urllib.error.HTTPError as error:
if error.code in {301, 302, 303, 307, 308}:
location = error.headers.get("Location")
if not location:
raise
current_url = urllib.parse.urljoin(current_url, location)
if error.code == 303:
current_method = "GET"
current_body = None
continue
raise
raise RuntimeError(f"Too many redirects starting from {url}")
def _parse_multistatus(xml_bytes: bytes) -> ET.Element:
try:
return ET.fromstring(xml_bytes)
except ET.ParseError as error:
raise RuntimeError(f"Failed to parse DAV XML: {error}") from error
def _href_from_prop(element: ET.Element, xpath: str) -> str | None:
href = element.findtext(xpath, namespaces=NS)
return href.strip() if href else None
def _discover_addressbooks(*, base_url: str, auth: str) -> list[tuple[str, str]]:
base_url = base_url.rstrip("/")
discovery_urls = [
urllib.parse.urljoin(base_url + "/", ".well-known/carddav"),
base_url + "/co",
base_url + "/",
]
propfind_discovery = textwrap.dedent(
"""\
<?xml version="1.0" encoding="utf-8"?>
<d:propfind xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:carddav">
<d:prop>
<d:current-user-principal />
<d:principal-URL />
<c:addressbook-home-set />
</d:prop>
</d:propfind>
"""
).encode("utf-8")
principal_url = None
home_url = None
last_status: int | None = None
last_final_url: str | None = None
last_body: bytes = b""
for discovery_url in discovery_urls:
status, _, body, final_url = _dav_request(
method="PROPFIND",
url=discovery_url,
auth=auth,
headers={"Depth": "0", "Content-Type": "application/xml; charset=utf-8"},
body=propfind_discovery,
)
last_status = status
last_final_url = final_url
last_body = body
if status not in {207, 200}:
continue
try:
multistatus = _parse_multistatus(body)
except Exception:
continue
for response in multistatus.findall("d:response", namespaces=NS):
if not principal_url:
current_user_href = _href_from_prop(response, ".//d:current-user-principal/d:href")
if current_user_href:
principal_url = urllib.parse.urljoin(final_url, current_user_href)
if not principal_url:
principal_href = _href_from_prop(response, ".//d:principal-URL/d:href")
if principal_href:
principal_url = urllib.parse.urljoin(final_url, principal_href)
if not home_url:
home_href = _href_from_prop(response, ".//c:addressbook-home-set/d:href")
if home_href:
home_url = urllib.parse.urljoin(final_url, home_href)
if principal_url or home_url:
break
if principal_url or home_url:
break
if home_url is None and principal_url is not None:
propfind_home_set = textwrap.dedent(
"""\
<?xml version="1.0" encoding="utf-8"?>
<d:propfind xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:carddav">
<d:prop>
<c:addressbook-home-set />
</d:prop>
</d:propfind>
"""
).encode("utf-8")
status, _, body, principal_final_url = _dav_request(
method="PROPFIND",
url=principal_url,
auth=auth,
headers={"Depth": "0", "Content-Type": "application/xml; charset=utf-8"},
body=propfind_home_set,
)
if status not in {207, 200}:
raise RuntimeError(f"Unexpected PROPFIND status {status} from {principal_final_url}")
multistatus = _parse_multistatus(body)
home_href = None
for response in multistatus.findall("d:response", namespaces=NS):
home_href = _href_from_prop(response, ".//c:addressbook-home-set/d:href")
if home_href:
break
if not home_href:
raise RuntimeError("Failed to discover addressbook-home-set href")
home_url = urllib.parse.urljoin(principal_final_url, home_href)
if home_url is None:
status_text = str(last_status) if last_status is not None else "unknown"
url_text = last_final_url or "(none)"
snippet = last_body.decode("utf-8", errors="replace")[:400]
raise RuntimeError(f"Failed to discover principal/home-set (status={status_text}, url={url_text}, body={snippet!r})")
propfind_addressbooks = textwrap.dedent(
"""\
<?xml version="1.0" encoding="utf-8"?>
<d:propfind xmlns:d="DAV:">
<d:prop>
<d:displayname />
<d:resourcetype />
</d:prop>
</d:propfind>
"""
).encode("utf-8")
status, _, body, home_final_url = _dav_request(
method="PROPFIND",
url=home_url,
auth=auth,
headers={"Depth": "1", "Content-Type": "application/xml; charset=utf-8"},
body=propfind_addressbooks,
)
if status not in {207, 200}:
raise RuntimeError(f"Unexpected PROPFIND status {status} from {home_final_url}")
multistatus = _parse_multistatus(body)
addressbooks: list[tuple[str, str]] = []
for response in multistatus.findall("d:response", namespaces=NS):
href = response.findtext("d:href", namespaces=NS)
if not href:
continue
href_url = urllib.parse.urljoin(home_final_url, href.strip())
displayname = response.findtext(".//d:displayname", namespaces=NS) or href.strip().strip("/")
if response.find(".//d:resourcetype/c:addressbook", namespaces=NS) is None:
continue
addressbooks.append((displayname.strip(), href_url))
if not addressbooks:
raise RuntimeError("No addressbooks found in addressbook-home-set")
return addressbooks
def _addressbook_query(*, addressbook_url: str, auth: str, limit: int | None) -> list[dict[str, str]]:
report_body = textwrap.dedent(
"""\
<?xml version="1.0" encoding="utf-8"?>
<c:addressbook-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:carddav">
<d:prop>
<d:getetag />
<c:address-data />
</d:prop>
</c:addressbook-query>
"""
).encode("utf-8")
status, _, body, final_url = _dav_request(
method="REPORT",
url=addressbook_url,
auth=auth,
headers={"Depth": "1", "Content-Type": "application/xml; charset=utf-8"},
body=report_body,
)
if status != 207:
raise RuntimeError(f"Unexpected REPORT status {status} from {final_url}")
multistatus = _parse_multistatus(body)
resources: list[dict[str, str]] = []
for response in multistatus.findall("d:response", namespaces=NS):
href = response.findtext("d:href", namespaces=NS)
if not href:
continue
href_url = urllib.parse.urljoin(final_url, href.strip())
if href_url.rstrip("/") == final_url.rstrip("/"):
continue
etag = response.findtext(".//d:getetag", namespaces=NS) or ""
address_data = response.findtext(".//c:address-data", namespaces=NS) or ""
if not address_data:
continue
resources.append({"href": href_url, "etag": etag, "vcard": address_data})
if limit is not None and len(resources) >= limit:
break
return resources
def _put_vcard(*, href: str, etag: str, vcard: str, auth: str) -> None:
headers = {
"Content-Type": "text/vcard; charset=utf-8",
}
if etag:
headers["If-Match"] = etag
status, _, _, final_url = _dav_request(
method="PUT",
url=href,
auth=auth,
headers=headers,
body=vcard.encode("utf-8"),
)
if status not in {200, 201, 204}:
raise RuntimeError(f"Unexpected PUT status {status} for {final_url}")
def main(argv: list[str]) -> int:
epilog = textwrap.dedent(
"""\
Requirements:
- iCloud Contacts enabled
- An app-specific password (Apple ID -> Sign-In and Security -> App-Specific Passwords)
Environment:
- ICLOUD_USERNAME: optional default for --username
- ICLOUD_APP_PASSWORD: optional; if not set, you'll be prompted
Safety:
- Default is a dry run; no changes are written
- Use --commit to write to iCloud (server-side)
Examples:
export ICLOUD_USERNAME="you@icloud.com"
export ICLOUD_APP_PASSWORD="xxxx-xxxx-xxxx-xxxx"
plus_1ify_icloud_contacts_phone_numbers --limit 25
plus_1ify_icloud_contacts_phone_numbers --commit
"""
)
parser = argparse.ArgumentParser(
prog="plus_1ify_icloud_contacts_phone_numbers",
formatter_class=argparse.RawDescriptionHelpFormatter,
description="Normalize iCloud Contacts phone numbers to +1 (AAA) BBB-CCCC.",
epilog=epilog,
usage="%(prog)s [--limit N] [--only-11] [--require-us-address-for-10] [--no-normalize-plus-one] [--addressbook-url URL] [--commit]",
)
parser.add_argument("--username", default=None)
parser.add_argument("--addressbook-url", default=None)
parser.add_argument("--limit", type=int, default=None)
parser.add_argument("--only-11", action="store_true", help="Only convert 11-digit numbers starting with 1")
parser.add_argument("--require-us-address-for-10", action="store_true", help="Only convert 10-digit numbers if ADR country is US/United States")
parser.add_argument("--no-normalize-plus-one", action="store_true", help="Do not rewrite existing +1 numbers into '+1 (AAA) BBB-CCCC'")
parser.add_argument("--commit", action="store_true", help="Commit changes (default is dry run)")
args = parser.parse_args(argv)
username = args.username or os.environ.get("ICLOUD_USERNAME")
if not username:
try:
username = input("iCloud username: ").strip()
except (EOFError, KeyboardInterrupt):
print("\nAborted.", file=sys.stderr)
return 130
if not username:
print("Missing iCloud username.", file=sys.stderr)
return 2
password = os.environ.get("ICLOUD_APP_PASSWORD") or getpass.getpass(f"App-specific password for {username}: ")
auth = _auth_header(username, password)
normalize_existing_plus_one = not args.no_normalize_plus_one
dry_run = not args.commit
if args.addressbook_url:
addressbook_url = args.addressbook_url
else:
try:
addressbooks = _discover_addressbooks(base_url=ICLOUD_BASE_URL, auth=auth)
except Exception as error:
print(f"CardDAV discovery failed: {error}", file=sys.stderr)
print(
"\nTry providing the addressbook URL explicitly. For iCloud this often looks like:\n"
" https://pXX-contacts.icloud.com/<DSID>/carddavhome/card/\n"
"or:\n"
" https://contacts.icloud.com/<DSID>/carddavhome/card/\n"
"\nYou can then run:\n"
" plus_1ify_icloud_contacts_phone_numbers --addressbook-url \"...\"\n",
file=sys.stderr,
)
return 1
_, addressbook_url = addressbooks[0]
resources = _addressbook_query(addressbook_url=addressbook_url, auth=auth, limit=args.limit)
changed = 0
skipped_conflicts = 0
for resource in resources:
href = resource["href"]
etag = resource["etag"]
vcard = resource["vcard"]
updated_vcard, changes, display_name = _update_vcard(
vcard,
fallback_name=href,
only_eleven=args.only_11,
require_us_address_for_ten=args.require_us_address_for_10,
normalize_existing_plus_one=normalize_existing_plus_one,
)
if not changes:
continue
changed += 1
for old, new in changes:
print(f"{display_name} : {old} -> {new}")
if dry_run:
continue
try:
_put_vcard(href=href, etag=etag, vcard=updated_vcard, auth=auth)
except urllib.error.HTTPError as error:
if error.code == 412:
skipped_conflicts += 1
print(f"SKIP (etag conflict): {display_name}", file=sys.stderr)
continue
if error.code == 403:
print(f"SKIP (forbidden): {display_name}", file=sys.stderr)
continue
raise
if dry_run:
print(f"DRY RUN: would update {changed} contacts. Re-run with --commit to apply.")
else:
print(f"DONE: updated {changed} contacts.")
if skipped_conflicts:
print(f"Skipped due to conflicts: {skipped_conflicts}", file=sys.stderr)
return 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment