Skip to content

Instantly share code, notes, and snippets.

@elyase
Last active January 11, 2026 15:35
Show Gist options
  • Select an option

  • Save elyase/81fa60d6762b4cdfac3bcec9abd3a568 to your computer and use it in GitHub Desktop.

Select an option

Save elyase/81fa60d6762b4cdfac3bcec9abd3a568 to your computer and use it in GitHub Desktop.
Telegram Database Reader (cleaned paths + discovery)
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.8"
# dependencies = [
# "pycryptodome>=3.20.0",
# "mmh3>=5.0.0",
# "rich>=13.7.0",
# "sqlcipher3-wheels",
# ]
# ///
"""
Telegram Database Reader - A comprehensive CLI tool for reading and analyzing
messages from local Telegram database files on macOS.
Features:
- 🔓 Live encrypted DB access via SQLCipher (no export step)
- 🔍 Smart message search across all conversations
- 📋 Dialog management (list, search, filter)
- 📊 Multiple output formats (table, JSON, CSV)
- 🎯 Professional CLI with subcommands and legacy support
Quick start:
uv run telegram-reader.py dialogs list
Usage:
./telegram-reader paths
./telegram-reader dialogs list --format json
./telegram-reader messages recent --dialog "GroupName" --limit 10
./telegram-reader messages search "keyword" --limit 20
Dependencies:
- Inline script deps install automatically with `uv run`
- If using pip directly: pip install sqlcipher3-wheels pycryptodome mmh3 rich
Discovery:
- Auto-discovery is macOS-only (Telegram Desktop)
- Use `telegram-reader paths` to list detected locations
Overrides:
- Plaintext DB: --db /path/to/plaintext.db or TELEGRAM_DB_PATH
- Encrypted DB: --encrypted-db /path/to/db_sqlite or TELEGRAM_ENCRYPTED_DB
- Tempkey: --tempkey /path/to/.tempkeyEncrypted or TELEGRAM_TEMPKEY_PATH
- Encrypted DB is read live by default (no export)
"""
import argparse
import binascii
import csv
import datetime
import enum
import io
import json
import os
import sqlite3
import struct
import sys
from contextlib import closing
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import mmh3
import sqlcipher3
from Crypto.Cipher import AES
from Crypto.Hash import SHA512
from rich import box
from rich.console import Console
from rich.table import Table
@dataclass
class Dialog:
    """A chat dialog (user, group, or channel) together with its activity stats."""

    id: int
    name: str
    username: Optional[str]
    type: str  # 'user', 'group', 'channel'
    message_count: int
    last_activity: Optional[datetime.datetime]

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dict, with ``last_activity`` as an ISO string.

        A missing ``last_activity`` is passed through unchanged.
        """
        last_seen = self.last_activity
        return {
            **asdict(self),
            "last_activity": last_seen.isoformat() if last_seen else last_seen,
        }
@dataclass
class Message:
    """One decoded message plus the resolved dialog and author names."""

    id: int
    dialog_id: int
    dialog_name: str
    author_id: Optional[int]
    author_name: str
    timestamp: datetime.datetime
    text: str
    direction: str  # 'sent', 'received'

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dict, with ``timestamp`` as an ISO string."""
        record = asdict(self)
        record.update(timestamp=self.timestamp.isoformat())
        return record
# Database access helpers
# Password handed to tempkey_parse() when decrypting the .tempkeyEncrypted file.
DEFAULT_PASSWORD = "no-matter-key"
# Names of the environment variables consulted when no explicit path is given.
ENV_ENCRYPTED_DB = "TELEGRAM_ENCRYPTED_DB"
ENV_TEMPKEY = "TELEGRAM_TEMPKEY_PATH"
ENV_PLAINTEXT_DB = "TELEGRAM_DB_PATH"
# Shared rich console used for status messages and table/JSON output.
console = Console()
@dataclass
class TelegramDbLocation:
    """One discovered encrypted database together with its tempkey file."""

    container: str
    tempkey: str
    encrypted_db: str
    account: str
    last_modified: datetime.datetime

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dict, with ``last_modified`` as an ISO string."""
        record = asdict(self)
        record.update(last_modified=self.last_modified.isoformat())
        return record
def murmur_for_decryption(d):
    """Hash ``d`` with ``mmh3.hash`` using the fixed seed 0xF7CA7FD2.

    Used to verify the key material recovered from the tempkey file.
    """
    telegram_seed = 0xF7CA7FD2
    return mmh3.hash(d, seed=telegram_seed)
def tempkey_kdf(password):
    """Derive the AES key and IV for the tempkey file from ``password``.

    The UTF-8 password is hashed with SHA-512; the first 32 bytes of the
    digest become the key and the last 16 bytes become the IV.

    Returns:
        A ``(key, iv)`` tuple of bytes.
    """
    hasher = SHA512.new()
    hasher.update(password.encode("utf-8"))
    material = hasher.digest()
    return material[:32], material[-16:]
def tempkey_parse(dataEnc, pwd):
    """Decrypt the tempkey blob and extract the database key and salt.

    The blob is AES-CBC decrypted with the key/IV derived from ``pwd`` and
    then read as: 32-byte key, 16-byte salt, little-endian int32 hash,
    remaining bytes as padding. Padding and hash problems only produce a
    warning on the console; parsing continues regardless.

    Returns:
        A ``(dbKey, dbSalt)`` tuple of bytes.
    """
    aes_key, aes_iv = tempkey_kdf(pwd)
    plain = AES.new(key=aes_key, iv=aes_iv, mode=AES.MODE_CBC).decrypt(dataEnc)

    db_key = plain[:32]
    db_salt = plain[32:48]
    (stored_hash,) = struct.unpack("<i", plain[48:52])
    padding = plain[52:]

    padding_ok = len(padding) == 12 and not any(padding)
    if not padding_ok:
        console.print("⚠️ Warning: dbPad not 12 zeros")

    expected_hash = murmur_for_decryption(db_key + db_salt)
    if stored_hash != expected_hash:
        console.print(
            f"⚠️ Warning: hash mismatch: {stored_hash} != {expected_hash} - continuing anyway"
        )
    return db_key, db_salt
def tempkey_pragma(dbKey, dbSalt):
    """Build the ``PRAGMA key`` statement for a raw hex key (key followed by salt)."""
    hex_key = (dbKey + dbSalt).hex()
    return "PRAGMA key=\"x'{}'\";".format(hex_key)
def _is_sqlite_database(path: Path) -> bool:
"""Best-effort check for a plaintext SQLite database header"""
try:
with open(path, "rb") as f:
header = f.read(16)
return header.startswith(b"SQLite format 3")
except Exception:
return False
def discover_telegram_db_locations() -> List[TelegramDbLocation]:
    """Scan ``~/Library/Group Containers`` for Telegram databases.

    A container qualifies when it has ``stable/.tempkeyEncrypted``; every
    ``stable/account-*/postbox/db/db_sqlite`` beneath it becomes one entry.

    Returns:
        Locations sorted by database modification time, newest first.
    """
    found: List[TelegramDbLocation] = []
    containers_root = Path.home() / "Library" / "Group Containers"
    if not containers_root.exists():
        return found

    for container in containers_root.iterdir():
        if not container.is_dir():
            continue
        stable_dir = container / "stable"
        tempkey_file = stable_dir / ".tempkeyEncrypted"
        if not tempkey_file.exists():
            continue
        for db_file in stable_dir.glob("account-*/postbox/db/db_sqlite"):
            # parents[2] is the "account-<id>" directory above postbox/db.
            account_dir_name = db_file.parents[2].name
            modified = datetime.datetime.fromtimestamp(db_file.stat().st_mtime)
            found.append(
                TelegramDbLocation(
                    container=str(container),
                    tempkey=str(tempkey_file),
                    encrypted_db=str(db_file),
                    account=account_dir_name.replace("account-", "", 1),
                    last_modified=modified,
                )
            )

    found.sort(key=lambda entry: entry.last_modified, reverse=True)
    return found
def _choose_latest_location(
    locations: List[TelegramDbLocation],
) -> Optional[TelegramDbLocation]:
    """Return the first entry of ``locations`` or None when it is empty.

    Callers pass the newest-first list from discover_telegram_db_locations().
    """
    return locations[0] if locations else None
def _resolve_encrypted_paths(
encrypted_db_path: Optional[str] = None, tempkey_path: Optional[str] = None
):
enc_path = Path(encrypted_db_path).expanduser() if encrypted_db_path else None
temp_path = Path(tempkey_path).expanduser() if tempkey_path else None
if enc_path is None:
env_enc = os.getenv(ENV_ENCRYPTED_DB)
if env_enc:
enc_path = Path(env_enc).expanduser()
if temp_path is None:
env_temp = os.getenv(ENV_TEMPKEY)
if env_temp:
temp_path = Path(env_temp).expanduser()
locations = []
if enc_path is None:
if temp_path is not None and temp_path.exists():
stable = temp_path.parent
candidates = list(stable.glob("account-*/postbox/db/db_sqlite"))
if candidates:
enc_path = max(candidates, key=lambda p: p.stat().st_mtime)
else:
locations = discover_telegram_db_locations()
chosen = _choose_latest_location(locations)
if chosen:
enc_path = Path(chosen.encrypted_db)
if temp_path is None:
temp_path = Path(chosen.tempkey)
if temp_path is None and enc_path is not None:
stable = enc_path.parents[3] if len(enc_path.parents) > 3 else None
if stable:
candidate_tempkey = stable / ".tempkeyEncrypted"
if candidate_tempkey.exists():
temp_path = candidate_tempkey
return enc_path, temp_path, locations
def _require_encrypted_paths(
    encrypted_db_path: Optional[str] = None, tempkey_path: Optional[str] = None
) -> Tuple[Path, Path]:
    """Resolve the encrypted database and tempkey paths or fail loudly.

    Raises:
        FileNotFoundError: when no database can be located, when the tempkey
            is missing, or when the resolved database file does not exist.
    """
    db_file, key_file, _discovered = _resolve_encrypted_paths(
        encrypted_db_path=encrypted_db_path, tempkey_path=tempkey_path
    )
    hint = "Run `telegram-reader paths` or set "

    if db_file is None:
        raise FileNotFoundError(
            "Could not locate an encrypted Telegram database. "
            + hint
            + f"{ENV_ENCRYPTED_DB} and {ENV_TEMPKEY}."
        )
    if key_file is None or not key_file.exists():
        raise FileNotFoundError(
            "Could not locate .tempkeyEncrypted. " + hint + f"{ENV_TEMPKEY}."
        )

    db_file, key_file = Path(db_file), Path(key_file)
    if not db_file.exists():
        raise FileNotFoundError(f"Encrypted database not found: {db_file}")
    return db_file, key_file
def _load_tempkey_pragma(tempkey_path: Path) -> str:
    """Read the tempkey file and return the matching ``PRAGMA key`` statement."""
    encrypted_blob = tempkey_path.read_bytes()
    key_and_salt = tempkey_parse(encrypted_blob, DEFAULT_PASSWORD)
    return tempkey_pragma(*key_and_salt)
def _connect_sqlcipher(encrypted_db_path: Path, tempkey_path: Path):
    """Open the encrypted database with SQLCipher and apply its key.

    The key pragma is derived from ``tempkey_path``. After keying, a
    ``PRAGMA user_version`` query is run so that a wrong key fails here
    rather than on the first real query.

    Returns:
        An open ``sqlcipher3`` connection.

    Raises:
        Whatever ``sqlcipher3`` raises while configuring or keying the
        database; the connection is closed before the error propagates.
    """
    pragma = _load_tempkey_pragma(tempkey_path)
    conn = sqlcipher3.connect(str(encrypted_db_path))
    try:
        conn.execute("PRAGMA busy_timeout=5000;")
        conn.execute("PRAGMA cipher_plaintext_header_size=32;")
        conn.execute("PRAGMA cipher_default_plaintext_header_size=32;")
        conn.execute(pragma)
        conn.execute("PRAGMA user_version;")
    except Exception:
        # Do not leak the handle when keying/verification fails.
        conn.close()
        raise
    return conn
class OutputFormatter:
    """Renders dialogs, messages and database locations as table, JSON or CSV.

    JSON goes through the shared rich console, CSV is written to
    ``sys.stdout``, and anything else is rendered as a rich table. Values
    that originate from the database or the file system are wrapped in
    ``rich.text.Text`` so they are shown literally instead of being parsed
    as console markup or emoji codes.
    """

    @staticmethod
    def _literal(value):
        """Wrap ``value`` so rich prints it verbatim (no markup/emoji parsing)."""
        from rich.text import Text  # local import keeps this block self-contained

        return Text(value)

    @staticmethod
    def _emit_structured(items, format_type: str) -> bool:
        """Write ``items`` as JSON or CSV.

        Returns:
            True when the format was handled here, False when the caller
            should render a table instead.
        """
        if format_type == "json":
            console.print_json(
                json.dumps([item.to_dict() for item in items], indent=2)
            )
            return True
        if format_type == "csv":
            # An empty list produces no output at all (not even a header).
            if items:
                rows = [item.to_dict() for item in items]
                writer = csv.DictWriter(sys.stdout, fieldnames=list(rows[0]))
                writer.writeheader()
                writer.writerows(rows)
            return True
        return False

    @staticmethod
    def print_dialogs(dialogs: List[Dialog], format_type: str = "table"):
        """Print dialogs in the specified format ("table", "json" or "csv")."""
        if OutputFormatter._emit_structured(dialogs, format_type):
            return
        if not dialogs:
            console.print("No dialogs found")
            return

        literal = OutputFormatter._literal
        type_emojis = {"user": "👤", "group": "👥", "channel": "📢"}
        table = Table(
            title=f"Dialogs ({len(dialogs)})",
            box=box.SIMPLE_HEAVY,
            show_lines=False,
        )
        table.add_column("Type", no_wrap=True)
        table.add_column("Name", style="bold")
        table.add_column("Username")
        table.add_column("Messages", justify="right")
        table.add_column("Last Activity", style="dim")
        for dialog in dialogs:
            username = f"@{dialog.username}".strip() if dialog.username else ""
            activity = (
                dialog.last_activity.strftime("%Y-%m-%d %H:%M")
                if dialog.last_activity
                else "Never"
            )
            table.add_row(
                f"{type_emojis.get(dialog.type, '💬')} {dialog.type}",
                literal(dialog.name),
                literal(username),
                f"{dialog.message_count}",
                activity,
            )
        console.print(table)

    @staticmethod
    def print_messages(messages: List[Message], format_type: str = "table"):
        """Print messages in the specified format ("table", "json" or "csv")."""
        if OutputFormatter._emit_structured(messages, format_type):
            return
        if not messages:
            console.print("No messages found")
            return

        literal = OutputFormatter._literal
        table = Table(
            title=f"Messages ({len(messages)})",
            box=box.SIMPLE_HEAVY,
            show_lines=True,
        )
        table.add_column("When", style="dim", no_wrap=True)
        table.add_column("Direction", no_wrap=True)
        table.add_column("Dialog")
        table.add_column("Author")
        table.add_column("Text", overflow="fold")
        for message in messages:
            direction_emoji = "📤" if message.direction == "sent" else "📩"
            table.add_row(
                message.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                f"{direction_emoji} {message.direction}",
                literal(message.dialog_name),
                literal(message.author_name),
                literal(message.text),
            )
        console.print(table)

    @staticmethod
    def print_paths(locations: List[TelegramDbLocation], format_type: str = "table"):
        """Print discovered database paths ("table", "json" or "csv")."""
        if OutputFormatter._emit_structured(locations, format_type):
            return
        if not locations:
            console.print("No Telegram database locations found")
            return

        literal = OutputFormatter._literal
        table = Table(
            title=f"Database Locations ({len(locations)})",
            box=box.SIMPLE_HEAVY,
            show_lines=True,
        )
        table.add_column("Account", no_wrap=True)
        table.add_column("Last Modified", style="dim")
        table.add_column("Encrypted DB", overflow="fold")
        table.add_column("Tempkey", overflow="fold")
        table.add_column("Container", overflow="fold")
        for location in locations:
            table.add_row(
                literal(location.account),
                location.last_modified.strftime("%Y-%m-%d %H:%M:%S"),
                literal(location.encrypted_db),
                literal(location.tempkey),
                literal(location.container),
            )
        console.print(table)
class ByteUtil:
    """Sequential reader of packed binary values from a file-like buffer.

    ``endian`` is the ``struct`` byte-order prefix applied to every read
    ("<" little-endian by default, ">" big-endian).
    """

    def __init__(self, buffer, endian="<"):
        self.endian = endian
        self.buf = buffer

    def read_fmt(self, fmt):
        """Read one value described by the ``struct`` format ``fmt``."""
        layout = self.endian + fmt
        raw = self.buf.read(struct.calcsize(layout))
        return struct.unpack(layout, raw)[0]

    # Fixed-width integers and floats.
    def read_int8(self):
        return self.read_fmt("b")

    def read_uint8(self):
        return self.read_fmt("B")

    def read_int32(self):
        return self.read_fmt("i")

    def read_uint32(self):
        return self.read_fmt("I")

    def read_int64(self):
        return self.read_fmt("q")

    def read_uint64(self):
        return self.read_fmt("Q")

    def read_double(self):
        return self.read_fmt("d")

    # Length-prefixed payloads.
    def read_bytes(self):
        """Read an int32 length followed by that many bytes."""
        length = self.read_int32()
        return self.buf.read(length)

    def read_str(self):
        """Read an int32-length-prefixed UTF-8 string."""
        return self.read_bytes().decode("utf-8")

    def read_short_bytes(self):
        """Read a uint8 length followed by that many bytes."""
        length = self.read_uint8()
        return self.buf.read(length)

    def read_short_str(self):
        """Read a uint8-length-prefixed UTF-8 string."""
        return self.read_short_bytes().decode("utf-8")
def murmur(d):
    """Hash ``d`` with ``mmh3.hash`` using the fixed seed 0xF7CA7FD2."""
    telegram_seed = 0xF7CA7FD2
    return mmh3.hash(d, seed=telegram_seed)
class PostboxDecoder:
    """Decoder for Postbox-encoded binary objects.

    The payload is a sequence of entries, each a short (uint8-length) string
    key followed by a one-byte value type tag and the value itself.
    """

    class ValueType(enum.Enum):
        Int32 = 0
        Int64 = 1
        Bool = 2
        Double = 3
        String = 4
        Object = 5
        Int32Array = 6
        Int64Array = 7
        ObjectArray = 8
        ObjectDictionary = 9
        Bytes = 10
        Nil = 11
        StringArray = 12
        BytesArray = 13

    def __init__(self, data):
        self.bio = ByteUtil(io.BytesIO(data), endian="<")
        self.size = len(data)

    def decodeRootObject(self):
        """Decode the object stored under the root key "_"."""
        return self.decodeObjectForKey("_")

    def decodeObjectForKey(self, key):
        """Return the decoded object stored under ``key``, or None."""
        _, decoded = self.get(self.ValueType.Object, key)
        return decoded if decoded else None

    def get(self, valueType, key):
        """Find ``key`` and return ``(type, value)``.

        With ``valueType`` None any type matches. An entry of type Nil under
        the requested key yields ``(Nil, None)``. ``(None, None)`` is
        returned when nothing matches.
        """
        for entry_key, entry_type, entry_value in self._iter_kv():
            if entry_key != key:
                continue
            if valueType is None or entry_type == valueType:
                return entry_type, entry_value
            if entry_type == self.ValueType.Nil:
                return entry_type, None
        return None, None

    def _iter_kv(self):
        """Yield ``(key, type, value)`` for every entry, from the start."""
        stream = self.bio.buf
        stream.seek(0, io.SEEK_SET)
        while stream.tell() < self.size:
            key = self.bio.read_short_str()
            value_type, value = self.readValue()
            yield key, value_type, value

    def _readObject(self, decode=None):
        """Read a nested object: int32 type hash, int32 length, payload.

        With ``decode`` true (the default when None) the payload is decoded
        into a dict of its entries plus "@type"; otherwise the raw bytes are
        returned as ``{"type": ..., "data": ...}``.
        """
        should_decode = True if decode is None else decode
        type_hash = self.bio.read_int32()
        payload_len = self.bio.read_int32()
        payload = self.bio.buf.read(payload_len)
        if not should_decode:
            return {"type": type_hash, "data": payload}
        fields = {}
        for name, _, field_value in type(self)(payload)._iter_kv():
            fields[name] = field_value
        fields["@type"] = type_hash
        return fields

    def readValue(self, decodeObjects=None):
        """Read one type tag and its value; return ``(ValueType, value)``."""
        kinds = self.ValueType
        bio = self.bio
        value_type = kinds(bio.read_uint8())

        def read_object():
            return self._readObject(decode=decodeObjects)

        scalar_readers = {
            kinds.Int32: bio.read_int32,
            kinds.Int64: bio.read_int64,
            kinds.Bool: lambda: bio.read_uint8() != 0,
            kinds.Double: bio.read_double,
            kinds.String: bio.read_str,
            kinds.Object: read_object,
            kinds.Bytes: bio.read_bytes,
            kinds.Nil: lambda: None,
        }
        # Collections are an int32 count followed by that many elements.
        element_readers = {
            kinds.Int32Array: bio.read_int32,
            kinds.Int64Array: bio.read_int64,
            kinds.ObjectArray: read_object,
            # Tuple elements evaluate left to right: key first, then value.
            kinds.ObjectDictionary: lambda: (read_object(), read_object()),
            kinds.StringArray: bio.read_str,
            kinds.BytesArray: bio.read_bytes,
        }

        if value_type in scalar_readers:
            value = scalar_readers[value_type]()
        elif value_type in element_readers:
            count = bio.read_int32()
            read_element = element_readers[value_type]
            value = [read_element() for _ in range(count)]
        else:
            raise Exception("unknown value type")
        return value_type, value
class MessageDataFlags(enum.IntFlag):
    """Bit flags marking which optional fields are present in a stored message."""

    GloballyUniqueId = 0x01
    GlobalTags = 0x02
    GroupingKey = 0x04
    GroupInfo = 0x08
    LocalTags = 0x10
    ThreadId = 0x20
class FwdInfoFlags(enum.IntFlag):
    """Bit flags marking which optional parts of a forward-info record are present."""

    SourceId = 0x02
    SourceMessage = 0x04
    Signature = 0x08
    PsaType = 0x10
    Flags = 0x20
class MessageFlags(enum.IntFlag):
    """Bit flags describing a stored message (bit 3 is not assigned here)."""

    Unsent = 1 << 0
    Failed = 1 << 1
    Incoming = 1 << 2
    TopIndexable = 1 << 4
    Sending = 1 << 5
    CanBeGroupedIntoFeed = 1 << 6
    WasScheduled = 1 << 7
    CountedAsIncoming = 1 << 8
class MessageTags(enum.IntFlag):
    """Bit flags tagging a stored message by content category."""

    PhotoOrVideo = 0x001
    File = 0x002
    Music = 0x004
    WebPage = 0x008
    VoiceOrInstantVideo = 0x010
    UnseenPersonalMessage = 0x020
    LiveLocation = 0x040
    Gif = 0x080
    Photo = 0x100
    Video = 0x200
    Pinned = 0x400
class MessageIndex:
    """Decoded form of a message key: peer id, namespace, timestamp and id.

    Note the constructor argument order ``(peerId, namespace, mid,
    timestamp)`` differs from the binary order, where the timestamp comes
    before the id.
    """

    # Single source of truth for the key layout, shared by from_bytes() and
    # as_bytes(): big-endian int64 peerId, int32 namespace, int32 timestamp,
    # int32 id.
    _KEY_STRUCT = struct.Struct(">qiii")

    def __init__(self, peerId, namespace, mid, timestamp):
        self.peerId = peerId
        self.namespace = namespace
        self.id = mid
        self.timestamp = timestamp

    @classmethod
    def from_bytes(cls, b):
        """Parse the first 20 bytes of ``b``; trailing bytes are ignored.

        Raises:
            struct.error: when ``b`` is shorter than 20 bytes.
        """
        peer_id, namespace, timestamp, mid = cls._KEY_STRUCT.unpack_from(b)
        return cls(peer_id, namespace, mid, timestamp)

    def as_bytes(self):
        """Serialize back to the 20-byte big-endian key."""
        return self._KEY_STRUCT.pack(
            self.peerId, self.namespace, self.timestamp, self.id
        )

    def __repr__(self):
        return f"ns:{self.namespace} pr:{self.peerId} id:{self.id} ts:{self.timestamp}"
def read_intermediate_fwd_info(buf):
    """Read the forward-info record of a message from ``buf``.

    The record starts with an int8 flag byte; a zero byte means "no forward
    info" and None is returned. Otherwise author id and date follow, then
    each optional field whose flag bit is set, in a fixed order.

    Returns:
        None, or a dict with keys author, date, srcId, srcMsgPeer, srcMsgNs,
        srcMsgId, signature, psaType and flags (absent fields are None).
    """
    info_flags = FwdInfoFlags(buf.read_int8())
    if info_flags == 0:
        return None

    info = {
        "author": buf.read_int64(),
        "date": buf.read_int32(),
        "srcId": None,
        "srcMsgPeer": None,
        "srcMsgNs": None,
        "srcMsgId": None,
        "signature": None,
        "psaType": None,
        "flags": None,
    }
    # The reads below must stay in this order: it is the on-disk order.
    if FwdInfoFlags.SourceId in info_flags:
        info["srcId"] = buf.read_int64()
    if FwdInfoFlags.SourceMessage in info_flags:
        info["srcMsgPeer"] = buf.read_int64()
        info["srcMsgNs"] = buf.read_int32()
        info["srcMsgId"] = buf.read_int32()
    if FwdInfoFlags.Signature in info_flags:
        info["signature"] = buf.read_str()
    if FwdInfoFlags.PsaType in info_flags:
        info["psaType"] = buf.read_str()
    if FwdInfoFlags.Flags in info_flags:
        info["flags"] = buf.read_int32()
    return info
def read_intermediate_message(v: bytes):
    """Parse a stored message value into a dict.

    Only values whose leading type byte is 0 are parsed; anything else
    returns None. Fields that are not part of the result are still read so
    the stream stays aligned.

    Returns:
        None, or a dict with keys flags, tags, authorId, fwd, text,
        referencedMediaIds, embeddedMedia and attributes.
    """
    reader = ByteUtil(io.BytesIO(v))
    if reader.read_int8() != 0:
        return None

    reader.read_uint32()  # stable id (not used)
    reader.read_uint32()  # stable version (not used)

    data_flags = MessageDataFlags(reader.read_uint8())
    # Optional header fields, in on-disk order; values are skipped over.
    optional_fields = (
        (MessageDataFlags.GloballyUniqueId, reader.read_int64),
        (MessageDataFlags.GlobalTags, reader.read_uint32),
        (MessageDataFlags.GroupingKey, reader.read_int64),
        (MessageDataFlags.GroupInfo, reader.read_uint32),
        (MessageDataFlags.LocalTags, reader.read_uint32),
        (MessageDataFlags.ThreadId, reader.read_int64),
    )
    for flag, skip_field in optional_fields:
        if flag in data_flags:
            skip_field()

    flags = MessageFlags(reader.read_uint32())
    tags = MessageTags(reader.read_uint32())
    fwd_info = read_intermediate_fwd_info(reader)

    # An int8 marker of 1 means an int64 author id follows.
    author_id = reader.read_int64() if reader.read_int8() == 1 else None
    text = reader.read_str()

    def read_object_list():
        """Read an int32 count followed by that many encoded objects."""
        count = reader.read_int32()
        return [
            PostboxDecoder(reader.read_bytes()).decodeRootObject()
            for _ in range(count)
        ]

    attributes = read_object_list()
    embedded_media = read_object_list()

    media_id_count = reader.read_int32()
    referenced_media_ids = [
        (reader.read_int32(), reader.read_int64()) for _ in range(media_id_count)
    ]

    return {
        "flags": flags,
        "tags": tags,
        "authorId": author_id,
        "fwd": fwd_info,
        "text": text,
        "referencedMediaIds": referenced_media_ids,
        "embeddedMedia": embedded_media,
        "attributes": attributes,
    }
class TelegramReader:
    """Reads peers (table ``t2``) and messages (table ``t7``) from the database."""

    def __init__(self):
        self.con = None
        self._peer_cache = {}

    def connect(
        self,
        db_path: Optional[str] = None,
        encrypted_db_path: Optional[str] = None,
        tempkey_path: Optional[str] = None,
    ):
        """Connect to the database (plaintext or live encrypted).

        ``db_path`` (or the TELEGRAM_DB_PATH environment variable) is opened
        with ``sqlite3`` when it has a plaintext SQLite header, otherwise
        with SQLCipher. Without a ``db_path`` the encrypted database and
        tempkey are resolved from the arguments, environment or discovery.

        Returns:
            True on success.

        Raises:
            FileNotFoundError: when a required file cannot be located.
            Any database error from the probe query; the freshly opened
            connection is closed before it propagates.
        """
        if db_path is None:
            db_path = os.getenv(ENV_PLAINTEXT_DB) or None

        using_sqlcipher = False
        if db_path is not None:
            plain_path = Path(db_path).expanduser()
            if not plain_path.exists():
                raise FileNotFoundError(f"Database not found: {plain_path}")
            if _is_sqlite_database(plain_path):
                con = sqlite3.connect(str(plain_path))
            else:
                console.print(
                    "ℹ️ Provided database does not look like plaintext SQLite. Opening with SQLCipher (live)."
                )
                enc_path, temp_path = _require_encrypted_paths(
                    encrypted_db_path=str(plain_path), tempkey_path=tempkey_path
                )
                con = _connect_sqlcipher(enc_path, temp_path)
                using_sqlcipher = True
        else:
            enc_path, temp_path = _require_encrypted_paths(
                encrypted_db_path=encrypted_db_path, tempkey_path=tempkey_path
            )
            con = _connect_sqlcipher(enc_path, temp_path)
            using_sqlcipher = True

        # Probe the message table; do not leak the handle if that fails.
        try:
            with closing(con.cursor()) as cursor:
                cursor.execute("SELECT COUNT(*) FROM t7")
                message_count = cursor.fetchone()[0]
        except Exception:
            con.close()
            raise

        self.con = con
        # Cached peers belong to whichever database was open before.
        self._peer_cache = {}
        mode = " (live encrypted)" if using_sqlcipher else ""
        console.print(f"✅ Connected to database{mode} ({message_count:,} messages)")
        return True

    def get_peer(self, peer_id):
        """Return the decoded peer record for ``peer_id`` (cached), or None.

        A "display_name" entry is added to the record: first/last name
        ("fn"/"ln") if present, else the title ("t"), else "User <id>".
        """
        if peer_id in self._peer_cache:
            return self._peer_cache[peer_id]

        with closing(self.con.cursor()) as cursor:
            cursor.execute(
                "SELECT value FROM t2 WHERE key = ? ORDER BY key LIMIT 1", (peer_id,)
            )
            row = cursor.fetchone()

        data = PostboxDecoder(row[0]).decodeRootObject() if row is not None else None
        if data:
            # Name parts may be present but Nil (None); treat those as empty
            # instead of rendering the literal string "None".
            first = data.get("fn") or ""
            last = data.get("ln") or ""
            if first or last:
                display_name = f"{first} {last}".strip()
            elif data.get("t"):
                display_name = data["t"]
            else:
                display_name = f"User {peer_id}"
            data["display_name"] = display_name

        self._peer_cache[peer_id] = data
        return data

    def _query_messages(self, limit, fetch_limit, search_term=None, dialog_id=None):
        """Fetch newest-first rows from ``t7`` and decode those that have text.

        Args:
            limit: stop after this many decoded messages.
            fetch_limit: SQL ``LIMIT``; larger than ``limit`` because rows
                that fail to parse or have no text are dropped.
            search_term: when given, rows are pre-filtered on the hex of the
                value and then matched case-insensitively on the text.
            dialog_id: when given, only keys starting with this peer id.

        Returns:
            A list of ``(MessageIndex, message_dict)`` tuples.
        """
        conditions = []
        params = []
        if dialog_id is not None:
            conditions.append("hex(substr(key, 1, 8)) = ?")
            # Signed pack: peer ids come back from SQLite as signed 64-bit
            # integers and MessageIndex reads the prefix as signed too.
            params.append(struct.pack(">q", dialog_id).hex().upper())
        if search_term:
            conditions.append("hex(value) LIKE ?")
            params.append(f"%{search_term.encode().hex().upper()}%")

        sql = "SELECT key, value FROM t7"
        if conditions:
            sql += " WHERE " + " AND ".join(conditions)
        sql += " ORDER BY key DESC LIMIT ?"
        params.append(fetch_limit)

        with closing(self.con.cursor()) as cursor:
            cursor.execute(sql, params)
            rows = cursor.fetchall()

        needle = search_term.lower() if search_term else None
        decoded = []
        for key, value in rows:
            try:
                idx = MessageIndex.from_bytes(key)
                msg = read_intermediate_message(value)
            except Exception:
                # Best effort: skip rows that do not parse as messages.
                continue
            if not msg or not msg["text"]:
                continue
            if needle is not None and needle not in msg["text"].lower():
                continue
            decoded.append((idx, msg))
            if len(decoded) >= limit:
                break
        return decoded

    def _to_message_objects(self, pairs) -> List[Message]:
        """Convert ``(idx, msg)`` pairs to Message objects, skipping failures."""
        messages = []
        for idx, msg in pairs:
            try:
                messages.append(self._create_message_object(idx, msg))
            except Exception:
                continue
        return messages

    def search_messages(self, search_term, limit=50):
        """Search all dialogs for messages containing ``search_term``.

        Returns up to ``limit`` ``(MessageIndex, message_dict)`` tuples,
        newest first; an empty term or non-positive limit yields [].
        """
        if not search_term or limit <= 0:
            return []
        return self._query_messages(limit, limit * 10, search_term=search_term)

    def get_recent_messages(self, limit=20):
        """Return the newest messages with text from all dialogs as tuples."""
        return self._query_messages(limit, limit * 5)

    def _create_message_object(self, idx: MessageIndex, msg: Dict) -> Message:
        """Create a Message from a parsed message, resolving peer names."""
        peer = self.get_peer(idx.peerId)
        fallback_dialog = f"Unknown {idx.peerId}"
        dialog_name = peer.get("display_name", fallback_dialog) if peer else fallback_dialog

        author_id = msg.get("authorId")
        if author_id:
            author_peer = self.get_peer(author_id)
            fallback_author = f"User {author_id}"
            author_name = (
                author_peer.get("display_name", fallback_author)
                if author_peer
                else fallback_author
            )
        else:
            author_name = "Unknown"

        direction = "received" if MessageFlags.Incoming in msg["flags"] else "sent"
        return Message(
            id=idx.id,
            dialog_id=idx.peerId,
            dialog_name=dialog_name,
            author_id=author_id,
            author_name=author_name,
            timestamp=datetime.datetime.fromtimestamp(idx.timestamp),
            text=msg["text"],
            direction=direction,
        )

    def search_messages_as_objects(
        self, search_term: str, limit: int = 50, dialog_name: Optional[str] = None
    ) -> List[Message]:
        """Search for messages and return them as Message objects.

        With ``dialog_name`` the search is limited to the dialog found by
        DialogManager.get_dialog_by_name(); an unknown dialog yields [].
        """
        if not search_term or limit <= 0:
            return []
        if dialog_name:
            dialog = DialogManager(self).get_dialog_by_name(dialog_name)
            if not dialog:
                return []
            found = self._query_messages(
                limit, limit * 5, search_term=search_term, dialog_id=dialog.id
            )
        else:
            found = self.search_messages(search_term, limit)
        return self._to_message_objects(found)

    def get_recent_messages_as_objects(
        self, limit: int = 20, dialog_name: Optional[str] = None
    ) -> List[Message]:
        """Return recent messages as Message objects, optionally for one dialog."""
        if dialog_name:
            dialog = DialogManager(self).get_dialog_by_name(dialog_name)
            if not dialog:
                return []
            recent = self._query_messages(limit, limit * 5, dialog_id=dialog.id)
        else:
            recent = self.get_recent_messages(limit)
        return self._to_message_objects(recent)

    def print_message(self, idx, msg):
        """Print a single message (legacy format)."""
        direction = "<-" if MessageFlags.Incoming in msg["flags"] else "->"
        ts = datetime.datetime.fromtimestamp(idx.timestamp).isoformat()

        peer = self.get_peer(idx.peerId)
        fallback_peer = f"Peer {idx.peerId}"
        peer_name = peer.get("display_name", fallback_peer) if peer else fallback_peer

        author_name = "Unknown"
        if msg.get("authorId"):
            author_peer = self.get_peer(msg["authorId"])
            if author_peer:
                author_name = author_peer.get("display_name", f"User {msg['authorId']}")

        # Names and message text come from the database: print them verbatim
        # rather than letting rich interpret "[...]" markup or ":emoji:" codes.
        console.print(f"=== {direction} {ts} in {peer_name}", markup=False, emoji=False)
        console.print(f"=== From: {author_name}", markup=False, emoji=False)
        if msg["text"]:
            console.print(msg["text"], markup=False, emoji=False)
        console.print()
class DialogManager:
    """Discovers dialogs from the message table and filters/searches them."""

    def __init__(self, reader: TelegramReader):
        self.reader = reader
        self._dialog_cache = {}

    def _load_peer_stats(self) -> Dict[int, Dict[str, Any]]:
        """Return per-peer message count and newest key, busiest peer first.

        The first 8 bytes of each ``t7`` key are the peer id. They are
        unpacked as a signed big-endian int64 so the ids match the integer
        keys of ``t2`` and the ids produced by MessageIndex.
        """
        with closing(self.reader.con.cursor()) as cursor:
            cursor.execute("""
                SELECT substr(key, 1, 8) as peer_prefix, COUNT(*) as msg_count, MAX(key) as last_key
                FROM t7
                GROUP BY substr(key, 1, 8)
                ORDER BY msg_count DESC
            """)
            rows = cursor.fetchall()

        stats: Dict[int, Dict[str, Any]] = {}
        for prefix, count, last_key in rows:
            if prefix is None or len(prefix) < 8:
                continue
            try:
                (peer_id,) = struct.unpack(">q", prefix)
            except (struct.error, TypeError, ValueError):
                continue
            stats[peer_id] = {"message_count": count, "last_key": last_key}
        return stats

    def _build_dialog(self, peer_id, message_count, last_key) -> Optional[Dialog]:
        """Build a Dialog for ``peer_id``; None when the peer is unknown."""
        peer_info = self.reader.get_peer(peer_id)
        if not peer_info:
            return None

        last_activity = None
        if last_key:
            try:
                idx = MessageIndex.from_bytes(last_key)
                last_activity = datetime.datetime.fromtimestamp(idx.timestamp)
            except (struct.error, TypeError, ValueError):
                last_activity = None

        return Dialog(
            id=peer_id,
            name=peer_info["display_name"],
            username=peer_info.get("un"),
            type=self._determine_dialog_type(peer_info),
            message_count=message_count,
            last_activity=last_activity,
        )

    def get_all_dialogs(self, include_empty: bool = False) -> List[Dialog]:
        """Get all dialogs with message counts.

        Dialogs with messages come first, ordered by message count
        (descending). Peers are looked up one at a time through the reader's
        cache, so the number of dialogs is not bounded by SQLite's
        bound-parameter limit. With ``include_empty`` every remaining peer
        in ``t2`` is appended with a count of 0.
        """
        peer_stats = self._load_peer_stats()

        dialogs = []
        for peer_id, stats in peer_stats.items():
            message_count = stats["message_count"]
            if not include_empty and message_count == 0:
                continue
            dialog = self._build_dialog(peer_id, message_count, stats["last_key"])
            if dialog is not None:
                dialogs.append(dialog)

        if include_empty:
            with closing(self.reader.con.cursor()) as cursor:
                cursor.execute("SELECT key FROM t2")
                all_peer_ids = [row[0] for row in cursor.fetchall()]
            for peer_id in all_peer_ids:
                if peer_id in peer_stats:
                    continue
                dialog = self._build_dialog(peer_id, 0, None)
                if dialog is not None:
                    dialogs.append(dialog)
        return dialogs

    def _determine_dialog_type(self, peer_info: Dict) -> str:
        """Classify a peer record as "user", "group", "channel" or "unknown".

        A record with a title ("t") is a channel when it also has a username
        ("un"), otherwise a group; a record with a first or last name is a
        user.
        """
        if peer_info.get("t"):
            return "channel" if peer_info.get("un") else "group"
        if peer_info.get("fn") or peer_info.get("ln"):
            return "user"
        return "unknown"

    def search_dialogs(self, query: str) -> List[Dialog]:
        """Return dialogs whose name or username contains ``query`` (case-insensitive)."""
        query_lower = query.lower()
        return [
            dialog
            for dialog in self.get_all_dialogs()
            if query_lower in dialog.name.lower()
            or (dialog.username and query_lower in dialog.username.lower())
        ]

    def filter_dialogs(
        self,
        dialogs: List[Dialog],
        dialog_type: Optional[str] = None,
        min_messages: int = 0,
    ) -> List[Dialog]:
        """Filter dialogs by type and by minimum message count."""
        filtered = dialogs
        if dialog_type:
            filtered = [d for d in filtered if d.type == dialog_type]
        if min_messages > 0:
            filtered = [d for d in filtered if d.message_count >= min_messages]
        return filtered

    def get_dialog_by_name(self, name: str) -> Optional[Dialog]:
        """Find a dialog by exact name/username, else the first partial match."""
        wanted = name.lower()
        candidates = self.search_dialogs(name)
        for dialog in candidates:
            if dialog.name.lower() == wanted:
                return dialog
            if dialog.username and dialog.username.lower() == wanted:
                return dialog
        return candidates[0] if candidates else None
def handle_dialogs_list(args, reader, dialog_manager):
    """Handle ``dialogs list``: print all dialogs, truncated to ``args.limit`` if set."""
    all_dialogs = dialog_manager.get_all_dialogs()
    shown = all_dialogs[: args.limit] if args.limit else all_dialogs
    OutputFormatter.print_dialogs(shown, args.format)
def handle_dialogs_search(args, reader, dialog_manager):
    """Handle ``dialogs search``: print the dialogs matching ``args.query``."""
    matches = dialog_manager.search_dialogs(args.query)
    OutputFormatter.print_dialogs(matches, args.format)
def handle_messages_recent(args, reader, dialog_manager):
    """Handle ``messages recent``: print the newest messages, optionally for one dialog."""
    recent = reader.get_recent_messages_as_objects(args.limit, args.dialog)
    OutputFormatter.print_messages(recent, args.format)
def handle_messages_search(args, reader, dialog_manager):
    """Handle ``messages search``: print the messages matching ``args.query``.

    The formatter is always invoked, so ``--format json`` emits ``[]`` when
    nothing matches (instead of no output at all) and the table format shows
    "No messages found". The "Found N messages" banner is table-only.
    """
    messages = reader.search_messages_as_objects(args.query, args.limit, args.dialog)
    if messages and args.format == "table":
        # The query is user input: print it verbatim, not as rich markup.
        console.print(
            f"\n🔍 Found {len(messages)} messages for '{args.query}':",
            markup=False,
            emoji=False,
        )
    OutputFormatter.print_messages(messages, args.format)
def handle_legacy_mode(args, reader):
    """Serve the legacy single-flag interface (``--search``/``--recent``/``--peer``).

    Only the first flag that is set is handled, in the order search, recent,
    peer.

    Args:
        args: Parsed CLI namespace; ``search``, ``recent``, ``peer`` and
            ``limit`` are read.
        reader: Object providing ``search_messages``, ``get_recent_messages``
            and ``print_message``.

    Returns:
        Always ``True``.
    """
    if args.search:
        console.print(f"\n🔍 Searching for: '{args.search}'")
        found = reader.search_messages(args.search, args.limit)
        if not found:
            console.print("No messages found")
            return True
        console.print(f"Found {len(found)} messages:")
        for position, message in found:
            reader.print_message(position, message)
        return True

    if args.recent:
        console.print("\n📱 Recent messages:")
        for position, message in reader.get_recent_messages(args.recent):
            reader.print_message(position, message)
        return True

    if args.peer:
        # Peer-specific listing is not implemented: only the header is shown.
        console.print(f"\n👤 Messages from peer {args.peer}:")

    return True
def handle_paths(args):
    """Print the Telegram database locations found by auto-discovery.

    Args:
        args: Parsed CLI namespace; only ``format`` is read.

    Returns:
        Always ``True``.
    """
    OutputFormatter.print_paths(discover_telegram_db_locations(), args.format)
    return True
def main():
    """Main CLI entry point.

    Builds the argument parser and dispatches to one of three modes:

    * ``paths`` -- lists discovered database locations; no DB is opened.
    * legacy flags (``--search`` / ``--recent`` / ``--peer``) -- connects a
      ``TelegramReader`` and delegates to ``handle_legacy_mode``.
    * ``dialogs`` / ``messages`` sub-commands -- connects a reader, wraps it
      in a ``DialogManager`` and calls the matching ``handle_*`` function.

    ``--format`` and ``--limit`` may be given either before or after the
    sub-command. argparse copies every value a sub-parser produces
    (defaults included) over the parent namespace, so the sub-command copies
    of these options use ``argparse.SUPPRESS`` as their default; otherwise a
    global ``--format json`` would be silently reset to ``table``. The
    effective ``--limit`` default is resolved per command after parsing.

    Returns:
        bool: ``False`` when neither a command nor a legacy flag was given
        (help is printed instead); otherwise the result of ``handle_paths``
        or ``handle_legacy_mode``, or ``True`` after a sub-command handler.
    """
    format_choices = ["table", "json", "csv"]
    # Limit applied when --limit is absent from the whole command line.
    # Anything not listed here (legacy mode included) uses ``fallback_limit``.
    default_limits = {
        ("dialogs", "list"): None,  # no cap: list every dialog
        ("messages", "recent"): 20,
        ("messages", "search"): 50,
    }
    fallback_limit = 20

    parser = argparse.ArgumentParser(
        description="Telegram Database Reader - Enhanced CLI for exploring Telegram data",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# List all dialogs
telegram-reader dialogs list
# Search for specific dialog
telegram-reader dialogs search "starcraft"
# Get recent messages from specific dialog
telegram-reader messages recent --dialog "StarCraft" --limit 10
# Search messages in all dialogs
telegram-reader messages search "brete" --limit 5
# Export to JSON
telegram-reader dialogs list --format json
# Show detected database paths
telegram-reader paths
# Use explicit paths
telegram-reader --encrypted-db "/path/to/db_sqlite" --tempkey "/path/to/.tempkeyEncrypted" dialogs list
# Live encrypted DB (default)
telegram-reader dialogs list
# Legacy mode (backwards compatibility)
telegram-reader --search "keyword" --limit 10
""",
    )

    # Global options
    parser.add_argument(
        "--format",
        choices=format_choices,
        default="table",
        help="Output format (default: table)",
    )
    parser.add_argument(
        "--db",
        help=(
            "Path to plaintext SQLite DB (overrides auto-discovery or set "
            f"{ENV_PLAINTEXT_DB})"
        ),
    )
    parser.add_argument(
        "--encrypted-db",
        help=(
            "Path to encrypted db_sqlite (overrides auto-discovery or set "
            f"{ENV_ENCRYPTED_DB})"
        ),
    )
    parser.add_argument(
        "--tempkey",
        help=f"Path to .tempkeyEncrypted (or set {ENV_TEMPKEY})",
    )

    # Legacy options for backwards compatibility
    parser.add_argument("--search", help="Legacy: Search for messages containing text")
    parser.add_argument("--recent", type=int, help="Legacy: Show recent messages")
    parser.add_argument(
        "--peer", type=int, help="Legacy: Show messages from specific peer ID"
    )
    # None means "not given"; the real default is filled in after parsing.
    parser.add_argument(
        "--limit", type=int, default=None, help="Limit number of results"
    )

    def add_format_option(target):
        # SUPPRESS: only write ``format`` into the namespace when the user
        # actually passes it after the sub-command.
        target.add_argument(
            "--format", choices=format_choices, default=argparse.SUPPRESS
        )

    def add_limit_option(target, help_text):
        # Same reasoning as add_format_option, for --limit.
        target.add_argument(
            "--limit", type=int, default=argparse.SUPPRESS, help=help_text
        )

    # Subcommands
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # Paths discovery
    paths_parser = subparsers.add_parser(
        "paths", help="List detected Telegram database locations"
    )
    add_format_option(paths_parser)

    # Dialogs subcommand
    dialogs_parser = subparsers.add_parser(
        "dialogs", help="Manage dialogs (chats, groups, channels)"
    )
    dialogs_subparsers = dialogs_parser.add_subparsers(dest="dialogs_action")

    # Dialogs list
    dialogs_list_parser = dialogs_subparsers.add_parser("list", help="List all dialogs")
    add_limit_option(dialogs_list_parser, "Limit number of dialogs")
    add_format_option(dialogs_list_parser)

    # Dialogs search
    dialogs_search_parser = dialogs_subparsers.add_parser(
        "search", help="Search dialogs by name"
    )
    dialogs_search_parser.add_argument("query", help="Search query for dialog name")
    add_format_option(dialogs_search_parser)

    # Messages subcommand
    messages_parser = subparsers.add_parser("messages", help="Work with messages")
    messages_subparsers = messages_parser.add_subparsers(dest="messages_action")

    # Messages recent
    messages_recent_parser = messages_subparsers.add_parser(
        "recent", help="Get recent messages"
    )
    messages_recent_parser.add_argument("--dialog", help="Dialog name to filter by")
    add_limit_option(messages_recent_parser, "Number of messages")
    add_format_option(messages_recent_parser)

    # Messages search
    messages_search_parser = messages_subparsers.add_parser(
        "search", help="Search messages"
    )
    messages_search_parser.add_argument("query", help="Search query for message text")
    messages_search_parser.add_argument("--dialog", help="Dialog name to filter by")
    add_limit_option(messages_search_parser, "Number of messages")
    add_format_option(messages_search_parser)

    args = parser.parse_args()

    # Resolve the per-command default when --limit was not given anywhere.
    if args.limit is None:
        sub_action = getattr(args, "dialogs_action", None) or getattr(
            args, "messages_action", None
        )
        args.limit = default_limits.get((args.command, sub_action), fallback_limit)

    if args.command == "paths":
        return handle_paths(args)

    def open_reader():
        # Connect using explicit paths when given; otherwise the reader's own
        # discovery/override logic applies.
        connected = TelegramReader()
        connected.connect(
            db_path=args.db,
            encrypted_db_path=args.encrypted_db,
            tempkey_path=args.tempkey,
        )
        return connected

    # Check for legacy mode
    if args.search or args.recent or args.peer:
        console.print("ℹ️ Using legacy mode. Consider using the new subcommand interface:")
        console.print(" dialogs list, dialogs search, messages recent, messages search")
        console.print()
        return handle_legacy_mode(args, open_reader())

    # Handle subcommands
    if not args.command:
        parser.print_help()
        return False

    # Initialize reader and dialog manager
    reader = open_reader()
    dialog_manager = DialogManager(reader)

    # Route to appropriate handler
    if args.command == "dialogs":
        if args.dialogs_action == "list":
            handle_dialogs_list(args, reader, dialog_manager)
        elif args.dialogs_action == "search":
            handle_dialogs_search(args, reader, dialog_manager)
        else:
            dialogs_parser.print_help()
    elif args.command == "messages":
        if args.messages_action == "recent":
            handle_messages_recent(args, reader, dialog_manager)
        elif args.messages_action == "search":
            handle_messages_search(args, reader, dialog_manager)
        else:
            messages_parser.print_help()
    return True
if __name__ == "__main__":
    # Exit status: 0 only when main() reports success; 1 for a falsy result,
    # Ctrl-C, or any other exception (whose message is shown to the user).
    exit_code = 1
    try:
        if main():
            exit_code = 0
    except KeyboardInterrupt:
        console.print("\n⚠️ Interrupted by user")
    except Exception as e:
        console.print(f"❌ Error: {e}")
    sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment