Created
March 4, 2026 21:32
-
-
Save Nateliason/071d72f3874cc17208ed7fdb6c28a421 to your computer and use it in GitHub Desktop.
xpost CLI - X/Twitter API v2 posting tool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """xpost — X/Twitter API v2 CLI for Felix. OAuth 1.0a HMAC-SHA1.""" | |
| import sys, os, json, time, uuid, hashlib, hmac, base64, urllib.parse, urllib.request | |
# Location of the key=value credentials file and the API v2 root endpoint.
KEYS_FILE = os.path.expanduser("~/.config/x-api/keys.env")
BASE = "https://api.x.com/2"

def load_keys():
    """Parse KEYS_FILE into a dict.

    The file holds one KEY=VALUE pair per line; lines starting with '#'
    and lines without '=' are ignored. Keys and values are stripped of
    surrounding whitespace. Only the first '=' splits, so values may
    themselves contain '='.
    """
    creds = {}
    with open(KEYS_FILE) as fh:
        for raw in fh:
            entry = raw.strip()
            if entry.startswith("#") or "=" not in entry:
                continue
            name, _, value = entry.partition("=")
            creds[name.strip()] = value.strip()
    return creds
# Credentials are loaded once at import time. Fail fast with a readable
# message when a required key is absent, instead of a bare KeyError
# traceback (the original behavior).
KEYS = load_keys()
try:
    API_KEY = KEYS["X_API_KEY"]
    API_SECRET = KEYS["X_API_SECRET"]
    ACCESS_TOKEN = KEYS["X_ACCESS_TOKEN"]
    ACCESS_SECRET = KEYS["X_ACCESS_TOKEN_SECRET"]
except KeyError as missing:
    sys.exit(f"Missing required key {missing} in {KEYS_FILE}")
# User id is optional; some commands interpolate it into the URL path.
USER_ID = KEYS.get("X_USER_ID", "")
def pct(s):
    """Percent-encode *s* (any value; coerced to str) with no safe chars.

    This is the RFC 3986 encoding OAuth 1.0a requires for every key and
    value that enters the signature base string or the Authorization header.
    """
    text = s if isinstance(s, str) else str(s)
    return urllib.parse.quote(text, safe="")
def oauth_header(method, url, query_params=None):
    """Build an OAuth 1.0a Authorization header value for one request.

    Signs *method* + *url* + (oauth params merged with *query_params*)
    with HMAC-SHA1 using the consumer and access-token secrets, and
    returns the full "OAuth ..." header string. query_params must contain
    exactly the query/body parameters the request will carry, or the
    server-side signature check fails.
    """
    oauth = {
        "oauth_consumer_key": API_KEY,
        "oauth_nonce": uuid.uuid4().hex,
        "oauth_signature_method": "HMAC-SHA1",
        "oauth_timestamp": str(int(time.time())),
        "oauth_token": ACCESS_TOKEN,
        "oauth_version": "1.0",
    }
    # Signature base string: sorted, percent-encoded k=v pairs of every
    # parameter, then METHOD&url&params with each piece encoded again.
    combined = dict(oauth)
    combined.update(query_params or {})
    pairs = [f"{pct(k)}={pct(v)}" for k, v in sorted(combined.items())]
    base_string = "&".join([method, pct(url), pct("&".join(pairs))])
    signing_key = f"{pct(API_SECRET)}&{pct(ACCESS_SECRET)}"
    digest = hmac.new(signing_key.encode(), base_string.encode(), hashlib.sha1).digest()
    oauth["oauth_signature"] = base64.b64encode(digest).decode()
    header_items = (f'{pct(k)}="{pct(v)}"' for k, v in sorted(oauth.items()))
    return "OAuth " + ", ".join(header_items)
def api_get(path, params=None):
    """Signed GET to BASE+path; return the parsed JSON response.

    HTTP error responses are parsed and returned as well (the v2 API
    reports errors as JSON). Non-JSON error bodies — e.g. rate-limit or
    proxy HTML pages — previously raised JSONDecodeError; they are now
    wrapped in an {"errors": [...]} payload so callers always get a dict.
    """
    url = f"{BASE}{path}"
    auth = oauth_header("GET", url, params)
    qs = urllib.parse.urlencode(params) if params else ""
    full = f"{url}?{qs}" if qs else url
    req = urllib.request.Request(full, headers={"Authorization": auth})
    try:
        with urllib.request.urlopen(req) as r:
            return json.loads(r.read())
    except urllib.error.HTTPError as e:
        raw = e.read()
        try:
            return json.loads(raw)
        except ValueError:
            # Body was not JSON; surface it in the same error shape.
            return {"errors": [{"message": raw.decode("utf-8", "replace"), "status": e.code}]}
def api_post(path, body):
    """Signed POST of a JSON *body* to BASE+path; return parsed JSON.

    The JSON body is not part of the OAuth 1.0a signature (only
    form-encoded bodies are), so oauth_header is called without params.
    HTTP error responses are parsed and returned; non-JSON error bodies
    (rate-limit / proxy pages) previously raised JSONDecodeError and are
    now wrapped in an {"errors": [...]} payload instead.
    """
    url = f"{BASE}{path}"
    auth = oauth_header("POST", url)
    data = json.dumps(body).encode()
    req = urllib.request.Request(url, data=data, method="POST",
        headers={"Authorization": auth, "Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req) as r:
            return json.loads(r.read())
    except urllib.error.HTTPError as e:
        raw = e.read()
        try:
            return json.loads(raw)
        except ValueError:
            # Body was not JSON; surface it in the same error shape.
            return {"errors": [{"message": raw.decode("utf-8", "replace"), "status": e.code}]}
UPLOAD_BASE = "https://upload.twitter.com/1.1"

def media_upload(file_path):
    """Chunked media upload (v1.1) — supports images and video.

    Runs the INIT / APPEND / FINALIZE flow against the v1.1 upload
    endpoint, then polls STATUS while the server processes video
    asynchronously. Returns the media_id string to attach to a tweet.
    Exits the process with a message on any upload failure.

    Fix: FINALIZE and STATUS now get the same HTTPError handling as INIT
    and APPEND — previously an HTTP error there crashed with a raw
    traceback.
    """
    import mimetypes
    mime = mimetypes.guess_type(file_path)[0] or "application/octet-stream"
    fsize = os.path.getsize(file_path)
    media_category = "tweet_video" if mime.startswith("video") else "tweet_image"
    # INIT: declare size/type/category so the server allocates a media_id.
    url = f"{UPLOAD_BASE}/media/upload.json"
    init_params = {
        "command": "INIT",
        "total_bytes": str(fsize),
        "media_type": mime,
        "media_category": media_category,
    }
    auth = oauth_header("POST", url, init_params)
    data = urllib.parse.urlencode(init_params).encode()
    req = urllib.request.Request(url, data=data, method="POST",
        headers={"Authorization": auth, "Content-Type": "application/x-www-form-urlencoded"})
    try:
        with urllib.request.urlopen(req) as r:
            init_resp = json.loads(r.read())
    except urllib.error.HTTPError as e:
        print(f"INIT failed ({e.code}): {e.read().decode()}", file=sys.stderr)
        sys.exit(1)
    media_id = str(init_resp["media_id"])
    # APPEND (chunked, 5MB per chunk). Multipart bodies are excluded from
    # the OAuth signature, so oauth_header gets no params here.
    CHUNK = 5 * 1024 * 1024
    seg = 0
    with open(file_path, "rb") as f:
        while True:
            chunk = f.read(CHUNK)
            if not chunk:
                break
            boundary = uuid.uuid4().hex
            body = b""
            for k, v in [("command", "APPEND"), ("media_id", media_id), ("segment_index", str(seg))]:
                body += f"--{boundary}\r\nContent-Disposition: form-data; name=\"{k}\"\r\n\r\n{v}\r\n".encode()
            body += f"--{boundary}\r\nContent-Disposition: form-data; name=\"media\"; filename=\"chunk\"\r\nContent-Type: application/octet-stream\r\n\r\n".encode()
            body += chunk
            body += f"\r\n--{boundary}--\r\n".encode()
            auth = oauth_header("POST", url)
            req = urllib.request.Request(url, data=body, method="POST",
                headers={"Authorization": auth, "Content-Type": f"multipart/form-data; boundary={boundary}"})
            try:
                with urllib.request.urlopen(req) as r:
                    r.read()
            except urllib.error.HTTPError as e:
                print(f"APPEND seg {seg} failed ({e.code}): {e.read().decode()}", file=sys.stderr)
                sys.exit(1)
            seg += 1
    # FINALIZE: tell the server all segments are uploaded.
    fin_params = {"command": "FINALIZE", "media_id": media_id}
    auth = oauth_header("POST", url, fin_params)
    data = urllib.parse.urlencode(fin_params).encode()
    req = urllib.request.Request(url, data=data, method="POST",
        headers={"Authorization": auth, "Content-Type": "application/x-www-form-urlencoded"})
    try:
        with urllib.request.urlopen(req) as r:
            fin_resp = json.loads(r.read())
    except urllib.error.HTTPError as e:
        print(f"FINALIZE failed ({e.code}): {e.read().decode()}", file=sys.stderr)
        sys.exit(1)
    # Video is processed asynchronously: poll STATUS until it succeeds or
    # fails, sleeping for the server-suggested interval each round.
    if "processing_info" in fin_resp:
        while True:
            state = fin_resp.get("processing_info", {}).get("state")
            if state == "succeeded":
                break
            if state == "failed":
                print(json.dumps(fin_resp), file=sys.stderr)
                sys.exit(1)
            wait = fin_resp.get("processing_info", {}).get("check_after_secs", 5)
            time.sleep(wait)
            check_params = {"command": "STATUS", "media_id": media_id}
            auth = oauth_header("GET", url, check_params)
            check_url = f"{url}?{urllib.parse.urlencode(check_params)}"
            req = urllib.request.Request(check_url, method="GET", headers={"Authorization": auth})
            try:
                with urllib.request.urlopen(req) as r:
                    fin_resp = json.loads(r.read())
            except urllib.error.HTTPError as e:
                print(f"STATUS failed ({e.code}): {e.read().decode()}", file=sys.stderr)
                sys.exit(1)
    return media_id
def api_delete(path):
    """Signed DELETE to BASE+path; return the parsed JSON response.

    HTTP error responses are parsed and returned too, matching the
    behavior of api_get/api_post.
    """
    url = f"{BASE}{path}"
    headers = {"Authorization": oauth_header("DELETE", url)}
    req = urllib.request.Request(url, method="DELETE", headers=headers)
    try:
        resp = urllib.request.urlopen(req)
    except urllib.error.HTTPError as e:
        return json.loads(e.read())
    with resp:
        return json.loads(resp.read())
| def fmt(data, mode): | |
| if mode == "text": | |
| if "data" in data: | |
| items = data["data"] if isinstance(data["data"], list) else [data["data"]] | |
| users = {u["id"]: u for u in data.get("includes", {}).get("users", [])} | |
| for t in items: | |
| aid = t.get("author_id", "") | |
| u = users.get(aid, {}).get("username", aid) | |
| print(f'[@{u}] ({t.get("id","")}) {t.get("created_at","")}') | |
| print(f' {t.get("text","")}') | |
| print() | |
| elif "errors" in data: | |
| for e in data["errors"]: | |
| print(f'ERROR: {e.get("message", e)}') | |
| else: | |
| print(json.dumps(data, indent=2)) | |
| elif mode == "pretty": | |
| print(json.dumps(data, indent=2)) | |
| else: | |
| print(json.dumps(data)) | |
# --- CLI ---
# Pull the output-mode flags out of argv; everything else is positional.
args = sys.argv[1:]
mode = "json"
filtered = []
for arg in args:
    if arg in ("--pretty", "--text", "--json"):
        mode = arg[2:]
    else:
        filtered.append(arg)
cmd = filtered[0] if filtered else "help"
rest = filtered[1:]
# Field sets requested on every read endpoint.
TWEET_FIELDS = "created_at,author_id,conversation_id,in_reply_to_user_id,public_metrics"
EXPANSIONS = "author_id"
USER_FIELDS = "username,name"
# --- Command dispatch ---
if cmd == "post":
    # First bare argument is the tweet text; --media <path> attaches a file.
    text = None
    media_path = None
    i = 0
    while i < len(rest):
        if rest[i] == "--media" and i+1 < len(rest):
            media_path = rest[i+1]; i += 2
        elif text is None:
            text = rest[i]; i += 1
        else:
            i += 1
    if not text and not media_path:
        sys.exit("Usage: xpost post \"text\" [--media path]")
    body = {}
    if text:
        body["text"] = text
    if media_path:
        mid = media_upload(os.path.expanduser(media_path))
        body["media"] = {"media_ids": [mid]}
    fmt(api_post("/tweets", body), mode)
elif cmd == "reply":
    if len(rest) < 2: sys.exit("Usage: xpost reply <tweet_id> \"text\"")
    fmt(api_post("/tweets", {"text": rest[1], "reply": {"in_reply_to_tweet_id": rest[0]}}), mode)
elif cmd == "quote":
    if len(rest) < 2: sys.exit("Usage: xpost quote <tweet_id> \"text\"")
    fmt(api_post("/tweets", {"text": rest[1], "quote_tweet_id": rest[0]}), mode)
elif cmd == "mentions":
    count = "20"
    i = 0
    pagination_token = None
    while i < len(rest):
        # Bug fix: bounds-check i+1 so a trailing "--count"/"--next" with
        # no value is ignored instead of raising IndexError.
        if rest[i] == "--count" and i+1 < len(rest): count = rest[i+1]; i += 2
        elif rest[i] == "--next" and i+1 < len(rest): pagination_token = rest[i+1]; i += 2
        else: i += 1
    params = {"max_results": count, "tweet.fields": TWEET_FIELDS, "expansions": EXPANSIONS, "user.fields": USER_FIELDS}
    if pagination_token:
        params["pagination_token"] = pagination_token
    fmt(api_get(f"/users/{USER_ID}/mentions", params), mode)
elif cmd == "timeline":
    # sys.exit raises, so username is only bound when rest is non-empty.
    username = rest[0] if rest else sys.exit("Usage: xpost timeline <username> [--count N]")
    count = "10"
    for i in range(1, len(rest)):
        if rest[i] == "--count" and i+1 < len(rest): count = rest[i+1]
    # Resolve the handle to a user id first; bail out showing the API error.
    udata = api_get(f"/users/by/username/{username}", {"user.fields": "id"})
    uid = udata.get("data", {}).get("id")
    if not uid: fmt(udata, mode); sys.exit(1)
    fmt(api_get(f"/users/{uid}/tweets", {"max_results": count, "tweet.fields": TWEET_FIELDS, "expansions": EXPANSIONS, "user.fields": USER_FIELDS}), mode)
elif cmd == "search":
    query = rest[0] if rest else sys.exit('Usage: xpost search "query" [--count N]')
    count = "10"
    for i in range(1, len(rest)):
        if rest[i] == "--count" and i+1 < len(rest): count = rest[i+1]
    fmt(api_get("/tweets/search/recent", {"query": query, "max_results": count, "tweet.fields": TWEET_FIELDS, "expansions": EXPANSIONS, "user.fields": USER_FIELDS}), mode)
elif cmd == "home":
    count = "20"
    for i in range(len(rest)):
        if rest[i] == "--count" and i+1 < len(rest): count = rest[i+1]
    fmt(api_get(f"/users/{USER_ID}/timelines/reverse_chronological", {"max_results": count, "tweet.fields": TWEET_FIELDS, "expansions": EXPANSIONS, "user.fields": USER_FIELDS}), mode)
elif cmd == "like":
    tid = rest[0] if rest else sys.exit("Usage: xpost like <tweet_id>")
    fmt(api_post(f"/users/{USER_ID}/likes", {"tweet_id": tid}), mode)
elif cmd == "retweet":
    tid = rest[0] if rest else sys.exit("Usage: xpost retweet <tweet_id>")
    fmt(api_post(f"/users/{USER_ID}/retweets", {"tweet_id": tid}), mode)
elif cmd == "delete":
    tid = rest[0] if rest else sys.exit("Usage: xpost delete <tweet_id>")
    fmt(api_delete(f"/tweets/{tid}"), mode)
elif cmd == "get":
    tid = rest[0] if rest else sys.exit("Usage: xpost get <tweet_id>")
    fmt(api_get(f"/tweets/{tid}", {"tweet.fields": TWEET_FIELDS, "expansions": EXPANSIONS, "user.fields": USER_FIELDS}), mode)
elif cmd == "me":
    fmt(api_get("/users/me", {"user.fields": "id,username,name,public_metrics"}), mode)
else:
    # Unknown command (or "help"): print usage.
    print("""xpost — X/Twitter API v2 CLI
Usage: xpost <command> [args] [--pretty|--text|--json]
Commands:
post "text" Post a tweet
reply <id> "text" Reply to a tweet
quote <id> "text" Quote tweet
mentions [--count N] Get recent mentions
timeline <user> [--count N] Get user timeline
search "query" [--count N] Search recent tweets
home [--count N] Home timeline (reverse chron)
like <id> Like a tweet
retweet <id> Retweet
delete <id> Delete a tweet
get <id> Get a single tweet
me Get authenticated user info""")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment