Skip to content

Instantly share code, notes, and snippets.

@Nateliason
Created March 4, 2026 21:32
Show Gist options
  • Select an option

  • Save Nateliason/071d72f3874cc17208ed7fdb6c28a421 to your computer and use it in GitHub Desktop.

Select an option

Save Nateliason/071d72f3874cc17208ed7fdb6c28a421 to your computer and use it in GitHub Desktop.
xpost CLI - X/Twitter API v2 posting tool
#!/usr/bin/env python3
"""xpost — X/Twitter API v2 CLI for Felix. OAuth 1.0a HMAC-SHA1."""
import sys, os, json, time, uuid, hashlib, hmac, base64, urllib.parse, urllib.request
KEYS_FILE = os.path.expanduser("~/.config/x-api/keys.env")
BASE = "https://api.x.com/2"
def load_keys():
keys = {}
with open(KEYS_FILE) as f:
for line in f:
line = line.strip()
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
keys[k.strip()] = v.strip()
return keys
KEYS = load_keys()
API_KEY = KEYS["X_API_KEY"]
API_SECRET = KEYS["X_API_SECRET"]
ACCESS_TOKEN = KEYS["X_ACCESS_TOKEN"]
ACCESS_SECRET = KEYS["X_ACCESS_TOKEN_SECRET"]
USER_ID = KEYS.get("X_USER_ID", "")
def pct(s):
return urllib.parse.quote(str(s), safe="")
def oauth_header(method, url, query_params=None):
nonce = uuid.uuid4().hex
ts = str(int(time.time()))
oauth = {
"oauth_consumer_key": API_KEY,
"oauth_nonce": nonce,
"oauth_signature_method": "HMAC-SHA1",
"oauth_timestamp": ts,
"oauth_token": ACCESS_TOKEN,
"oauth_version": "1.0",
}
all_params = {**oauth, **(query_params or {})}
sorted_str = "&".join(f"{pct(k)}={pct(v)}" for k, v in sorted(all_params.items()))
base = f"{method}&{pct(url)}&{pct(sorted_str)}"
key = f"{pct(API_SECRET)}&{pct(ACCESS_SECRET)}"
sig = base64.b64encode(hmac.new(key.encode(), base.encode(), hashlib.sha1).digest()).decode()
oauth["oauth_signature"] = sig
return "OAuth " + ", ".join(f'{pct(k)}="{pct(v)}"' for k, v in sorted(oauth.items()))
def api_get(path, params=None):
url = f"{BASE}{path}"
auth = oauth_header("GET", url, params)
qs = urllib.parse.urlencode(params) if params else ""
full = f"{url}?{qs}" if qs else url
req = urllib.request.Request(full, headers={"Authorization": auth})
try:
with urllib.request.urlopen(req) as r:
return json.loads(r.read())
except urllib.error.HTTPError as e:
return json.loads(e.read())
def api_post(path, body):
url = f"{BASE}{path}"
auth = oauth_header("POST", url)
data = json.dumps(body).encode()
req = urllib.request.Request(url, data=data, method="POST",
headers={"Authorization": auth, "Content-Type": "application/json"})
try:
with urllib.request.urlopen(req) as r:
return json.loads(r.read())
except urllib.error.HTTPError as e:
return json.loads(e.read())
UPLOAD_BASE = "https://upload.twitter.com/1.1"
def media_upload(file_path):
"""Chunked media upload (v1.1) — supports images and video."""
import mimetypes
mime = mimetypes.guess_type(file_path)[0] or "application/octet-stream"
fsize = os.path.getsize(file_path)
media_category = "tweet_video" if mime.startswith("video") else "tweet_image"
# INIT
url = f"{UPLOAD_BASE}/media/upload.json"
init_params = {
"command": "INIT",
"total_bytes": str(fsize),
"media_type": mime,
"media_category": media_category,
}
auth = oauth_header("POST", url, init_params)
data = urllib.parse.urlencode(init_params).encode()
req = urllib.request.Request(url, data=data, method="POST",
headers={"Authorization": auth, "Content-Type": "application/x-www-form-urlencoded"})
try:
with urllib.request.urlopen(req) as r:
init_resp = json.loads(r.read())
except urllib.error.HTTPError as e:
print(f"INIT failed ({e.code}): {e.read().decode()}", file=sys.stderr)
sys.exit(1)
media_id = str(init_resp["media_id"])
# APPEND (chunked, 5MB per chunk)
CHUNK = 5 * 1024 * 1024
seg = 0
with open(file_path, "rb") as f:
while True:
chunk = f.read(CHUNK)
if not chunk:
break
boundary = uuid.uuid4().hex
body = b""
for k, v in [("command", "APPEND"), ("media_id", media_id), ("segment_index", str(seg))]:
body += f"--{boundary}\r\nContent-Disposition: form-data; name=\"{k}\"\r\n\r\n{v}\r\n".encode()
body += f"--{boundary}\r\nContent-Disposition: form-data; name=\"media\"; filename=\"chunk\"\r\nContent-Type: application/octet-stream\r\n\r\n".encode()
body += chunk
body += f"\r\n--{boundary}--\r\n".encode()
auth = oauth_header("POST", url)
req = urllib.request.Request(url, data=body, method="POST",
headers={"Authorization": auth, "Content-Type": f"multipart/form-data; boundary={boundary}"})
try:
with urllib.request.urlopen(req) as r:
r.read()
except urllib.error.HTTPError as e:
print(f"APPEND seg {seg} failed ({e.code}): {e.read().decode()}", file=sys.stderr)
sys.exit(1)
seg += 1
# FINALIZE
fin_params = {"command": "FINALIZE", "media_id": media_id}
auth = oauth_header("POST", url, fin_params)
data = urllib.parse.urlencode(fin_params).encode()
req = urllib.request.Request(url, data=data, method="POST",
headers={"Authorization": auth, "Content-Type": "application/x-www-form-urlencoded"})
with urllib.request.urlopen(req) as r:
fin_resp = json.loads(r.read())
# Wait for processing (video)
if "processing_info" in fin_resp:
while True:
state = fin_resp.get("processing_info", {}).get("state")
if state == "succeeded":
break
if state == "failed":
print(json.dumps(fin_resp), file=sys.stderr)
sys.exit(1)
wait = fin_resp.get("processing_info", {}).get("check_after_secs", 5)
time.sleep(wait)
check_params = {"command": "STATUS", "media_id": media_id}
auth = oauth_header("GET", url, check_params)
check_url = f"{url}?{urllib.parse.urlencode(check_params)}"
req = urllib.request.Request(check_url, method="GET", headers={"Authorization": auth})
with urllib.request.urlopen(req) as r:
fin_resp = json.loads(r.read())
return media_id
def api_delete(path):
url = f"{BASE}{path}"
auth = oauth_header("DELETE", url)
req = urllib.request.Request(url, method="DELETE", headers={"Authorization": auth})
try:
with urllib.request.urlopen(req) as r:
return json.loads(r.read())
except urllib.error.HTTPError as e:
return json.loads(e.read())
def fmt(data, mode):
if mode == "text":
if "data" in data:
items = data["data"] if isinstance(data["data"], list) else [data["data"]]
users = {u["id"]: u for u in data.get("includes", {}).get("users", [])}
for t in items:
aid = t.get("author_id", "")
u = users.get(aid, {}).get("username", aid)
print(f'[@{u}] ({t.get("id","")}) {t.get("created_at","")}')
print(f' {t.get("text","")}')
print()
elif "errors" in data:
for e in data["errors"]:
print(f'ERROR: {e.get("message", e)}')
else:
print(json.dumps(data, indent=2))
elif mode == "pretty":
print(json.dumps(data, indent=2))
else:
print(json.dumps(data))
# --- CLI ---
args = sys.argv[1:]
mode = "json"
filtered = []
for a in args:
if a == "--pretty": mode = "pretty"
elif a == "--text": mode = "text"
elif a == "--json": mode = "json"
else: filtered.append(a)
cmd = filtered[0] if filtered else "help"
rest = filtered[1:]
TWEET_FIELDS = "created_at,author_id,conversation_id,in_reply_to_user_id,public_metrics"
EXPANSIONS = "author_id"
USER_FIELDS = "username,name"
if cmd == "post":
text = None
media_path = None
i = 0
while i < len(rest):
if rest[i] == "--media" and i+1 < len(rest):
media_path = rest[i+1]; i += 2
elif text is None:
text = rest[i]; i += 1
else:
i += 1
if not text and not media_path:
sys.exit("Usage: xpost post \"text\" [--media path]")
body = {}
if text:
body["text"] = text
if media_path:
mid = media_upload(os.path.expanduser(media_path))
body["media"] = {"media_ids": [mid]}
fmt(api_post("/tweets", body), mode)
elif cmd == "reply":
if len(rest) < 2: sys.exit("Usage: xpost reply <tweet_id> \"text\"")
fmt(api_post("/tweets", {"text": rest[1], "reply": {"in_reply_to_tweet_id": rest[0]}}), mode)
elif cmd == "quote":
if len(rest) < 2: sys.exit("Usage: xpost quote <tweet_id> \"text\"")
fmt(api_post("/tweets", {"text": rest[1], "quote_tweet_id": rest[0]}), mode)
elif cmd == "mentions":
count = "20"
i = 0
pagination_token = None
while i < len(rest):
if rest[i] == "--count": count = rest[i+1]; i += 2
elif rest[i] == "--next": pagination_token = rest[i+1]; i += 2
else: i += 1
params = {"max_results": count, "tweet.fields": TWEET_FIELDS, "expansions": EXPANSIONS, "user.fields": USER_FIELDS}
if pagination_token:
params["pagination_token"] = pagination_token
fmt(api_get(f"/users/{USER_ID}/mentions", params), mode)
elif cmd == "timeline":
username = rest[0] if rest else sys.exit("Usage: xpost timeline <username> [--count N]")
count = "10"
for i in range(1, len(rest)):
if rest[i] == "--count" and i+1 < len(rest): count = rest[i+1]
udata = api_get(f"/users/by/username/{username}", {"user.fields": "id"})
uid = udata.get("data", {}).get("id")
if not uid: fmt(udata, mode); sys.exit(1)
fmt(api_get(f"/users/{uid}/tweets", {"max_results": count, "tweet.fields": TWEET_FIELDS, "expansions": EXPANSIONS, "user.fields": USER_FIELDS}), mode)
elif cmd == "search":
query = rest[0] if rest else sys.exit('Usage: xpost search "query" [--count N]')
count = "10"
for i in range(1, len(rest)):
if rest[i] == "--count" and i+1 < len(rest): count = rest[i+1]
fmt(api_get("/tweets/search/recent", {"query": query, "max_results": count, "tweet.fields": TWEET_FIELDS, "expansions": EXPANSIONS, "user.fields": USER_FIELDS}), mode)
elif cmd == "home":
count = "20"
for i in range(len(rest)):
if rest[i] == "--count" and i+1 < len(rest): count = rest[i+1]
fmt(api_get(f"/users/{USER_ID}/timelines/reverse_chronological", {"max_results": count, "tweet.fields": TWEET_FIELDS, "expansions": EXPANSIONS, "user.fields": USER_FIELDS}), mode)
elif cmd == "like":
tid = rest[0] if rest else sys.exit("Usage: xpost like <tweet_id>")
fmt(api_post(f"/users/{USER_ID}/likes", {"tweet_id": tid}), mode)
elif cmd == "retweet":
tid = rest[0] if rest else sys.exit("Usage: xpost retweet <tweet_id>")
fmt(api_post(f"/users/{USER_ID}/retweets", {"tweet_id": tid}), mode)
elif cmd == "delete":
tid = rest[0] if rest else sys.exit("Usage: xpost delete <tweet_id>")
fmt(api_delete(f"/tweets/{tid}"), mode)
elif cmd == "get":
tid = rest[0] if rest else sys.exit("Usage: xpost get <tweet_id>")
fmt(api_get(f"/tweets/{tid}", {"tweet.fields": TWEET_FIELDS, "expansions": EXPANSIONS, "user.fields": USER_FIELDS}), mode)
elif cmd == "me":
fmt(api_get("/users/me", {"user.fields": "id,username,name,public_metrics"}), mode)
else:
print("""xpost — X/Twitter API v2 CLI
Usage: xpost <command> [args] [--pretty|--text|--json]
Commands:
post "text" Post a tweet
reply <id> "text" Reply to a tweet
quote <id> "text" Quote tweet
mentions [--count N] Get recent mentions
timeline <user> [--count N] Get user timeline
search "query" [--count N] Search recent tweets
home [--count N] Home timeline (reverse chron)
like <id> Like a tweet
retweet <id> Retweet
delete <id> Delete a tweet
get <id> Get a single tweet
me Get authenticated user info""")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment