Last active
January 11, 2026 15:35
-
-
Save elyase/81fa60d6762b4cdfac3bcec9abd3a568 to your computer and use it in GitHub Desktop.
Telegram Database Reader (cleaned paths + discovery)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.8" | |
| # dependencies = [ | |
| # "pycryptodome>=3.20.0", | |
| # "mmh3>=5.0.0", | |
| # "rich>=13.7.0", | |
| # "sqlcipher3-wheels", | |
| # ] | |
| # /// | |
| """ | |
| Telegram Database Reader - A comprehensive CLI tool for reading and analyzing | |
| messages from local Telegram database files on macOS. | |
| Features: | |
| - 🔓 Live encrypted DB access via SQLCipher (no export step) | |
| - 🔍 Smart message search across all conversations | |
| - 📋 Dialog management (list, search, filter) | |
| - 📊 Multiple output formats (table, JSON, CSV) | |
| - 🎯 Professional CLI with subcommands and legacy support | |
| Quick start: | |
| uv run telegram-reader.py dialogs list | |
| Usage: | |
| ./telegram-reader paths | |
| ./telegram-reader dialogs list --format json | |
| ./telegram-reader messages recent --dialog "GroupName" --limit 10 | |
| ./telegram-reader messages search "keyword" --limit 20 | |
| Dependencies: | |
| - Inline script deps install automatically with `uv run` | |
| - If using pip directly: pip install sqlcipher3-wheels pycryptodome mmh3 rich | |
| Discovery: | |
| - Auto-discovery is macOS-only (Telegram Desktop) | |
| - Use `telegram-reader paths` to list detected locations | |
| Overrides: | |
| - Plaintext DB: --db /path/to/plaintext.db or TELEGRAM_DB_PATH | |
| - Encrypted DB: --encrypted-db /path/to/db_sqlite or TELEGRAM_ENCRYPTED_DB | |
| - Tempkey: --tempkey /path/to/.tempkeyEncrypted or TELEGRAM_TEMPKEY_PATH | |
| - Encrypted DB is read live by default (no export) | |
| """ | |
import argparse
import binascii
import csv
import datetime
import enum
import hashlib
import io
import json
import os
import sqlite3
import struct
import sys
from contextlib import closing
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import mmh3
import sqlcipher3
from Crypto.Cipher import AES
from Crypto.Hash import SHA512
from rich import box
from rich.console import Console
from rich.table import Table
@dataclass
class Dialog:
    """A chat dialog: a user, group, or channel conversation."""

    id: int
    name: str
    username: Optional[str]
    type: str  # one of 'user', 'group', 'channel'
    message_count: int
    last_activity: Optional[datetime.datetime]

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict (ISO-formats last_activity)."""
        payload = asdict(self)
        last = self.last_activity
        if last:
            payload["last_activity"] = last.isoformat()
        return payload
@dataclass
class Message:
    """One message, normalized for display and export."""

    id: int
    dialog_id: int
    dialog_name: str
    author_id: Optional[int]
    author_name: str
    timestamp: datetime.datetime
    text: str
    direction: str  # 'sent' or 'received'

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict (ISO-formats the timestamp)."""
        payload = asdict(self)
        payload["timestamp"] = self.timestamp.isoformat()
        return payload
# Database access helpers
# Default passphrase used to decrypt .tempkeyEncrypted (see tempkey_parse).
DEFAULT_PASSWORD = "no-matter-key"
# Environment-variable names that override path auto-discovery
# (see _resolve_encrypted_paths and TelegramReader.connect).
ENV_ENCRYPTED_DB = "TELEGRAM_ENCRYPTED_DB"
ENV_TEMPKEY = "TELEGRAM_TEMPKEY_PATH"
ENV_PLAINTEXT_DB = "TELEGRAM_DB_PATH"
# Shared rich console used for all CLI output.
console = Console()
@dataclass
class TelegramDbLocation:
    """A discovered Telegram Desktop database location on disk."""

    container: str
    tempkey: str
    encrypted_db: str
    account: str
    last_modified: datetime.datetime

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict (ISO-formats last_modified)."""
        payload = asdict(self)
        payload["last_modified"] = self.last_modified.isoformat()
        return payload
def murmur_for_decryption(d):
    """Return the 32-bit MurmurHash3 of *d* using Telegram's fixed seed."""
    telegram_seed = 0xF7CA7FD2
    return mmh3.hash(d, seed=telegram_seed)
def tempkey_kdf(password):
    """Derive the AES key and IV used to decrypt .tempkeyEncrypted.

    The key material is simply SHA-512(password): the first 32 bytes
    become the AES-256 key and the last 16 bytes the CBC IV.

    Args:
        password: Passphrase string (usually DEFAULT_PASSWORD).

    Returns:
        Tuple (key, iv) of bytes with lengths 32 and 16.
    """
    # hashlib.sha512 is stdlib and produces the same digest as
    # Crypto.Hash.SHA512, so the KDF step needs no third-party dependency.
    digest = hashlib.sha512(password.encode("utf-8")).digest()
    return digest[0:32], digest[-16:]
def tempkey_parse(dataEnc, pwd):
    """Decrypt the tempkey blob and return (dbKey, dbSalt).

    Layout of the decrypted plaintext: 32-byte key, 16-byte salt,
    4-byte little-endian murmur hash, 12 bytes of zero padding.
    Integrity problems are reported as warnings but never fatal.
    """
    aes_key, aes_iv = tempkey_kdf(pwd)
    plain = AES.new(key=aes_key, iv=aes_iv, mode=AES.MODE_CBC).decrypt(dataEnc)
    dbKey, dbSalt = plain[0:32], plain[32:48]
    (dbHash,) = struct.unpack("<i", plain[48:52])
    padding = plain[52:]
    if len(padding) != 12 or any(padding):
        console.print("⚠️ Warning: dbPad not 12 zeros")
    calcHash = murmur_for_decryption(dbKey + dbSalt)
    if dbHash != calcHash:
        console.print(
            f"⚠️ Warning: hash mismatch: {dbHash} != {calcHash} - continuing anyway"
        )
    return dbKey, dbSalt
def tempkey_pragma(dbKey, dbSalt):
    """Build the SQLCipher raw-key pragma from key + salt bytes."""
    hex_key = (dbKey + dbSalt).hex()
    return f"PRAGMA key=\"x'{hex_key}'\";"
| def _is_sqlite_database(path: Path) -> bool: | |
| """Best-effort check for a plaintext SQLite database header""" | |
| try: | |
| with open(path, "rb") as f: | |
| header = f.read(16) | |
| return header.startswith(b"SQLite format 3") | |
| except Exception: | |
| return False | |
def discover_telegram_db_locations() -> List[TelegramDbLocation]:
    """Scan macOS Group Containers for Telegram Desktop databases.

    Looks for <container>/stable/.tempkeyEncrypted plus any
    account-*/postbox/db/db_sqlite underneath it. Returns locations
    sorted newest-first by the encrypted DB's modification time.
    """
    found: List[TelegramDbLocation] = []
    group_containers = Path.home() / "Library" / "Group Containers"
    if group_containers.exists():
        for container in group_containers.iterdir():
            if not container.is_dir():
                continue
            stable = container / "stable"
            tempkey = stable / ".tempkeyEncrypted"
            if not tempkey.exists():
                continue
            for db_path in stable.glob("account-*/postbox/db/db_sqlite"):
                # .../account-<id>/postbox/db/db_sqlite -> "account-<id>"
                account = db_path.parents[2].name.replace("account-", "", 1)
                mtime = datetime.datetime.fromtimestamp(db_path.stat().st_mtime)
                found.append(
                    TelegramDbLocation(
                        container=str(container),
                        tempkey=str(tempkey),
                        encrypted_db=str(db_path),
                        account=account,
                        last_modified=mtime,
                    )
                )
    found.sort(key=lambda loc: loc.last_modified, reverse=True)
    return found
def _choose_latest_location(
    locations: List[TelegramDbLocation],
) -> Optional[TelegramDbLocation]:
    """Return the first (newest) discovered location, or None if empty."""
    return locations[0] if locations else None
def _resolve_encrypted_paths(
    encrypted_db_path: Optional[str] = None, tempkey_path: Optional[str] = None
):
    """Resolve the encrypted DB and tempkey paths.

    Resolution order for each path: explicit argument, then environment
    variable (TELEGRAM_ENCRYPTED_DB / TELEGRAM_TEMPKEY_PATH), then
    filesystem discovery. Returns (enc_path, temp_path, locations);
    either path may be None when nothing was found, and `locations` is
    only populated when full auto-discovery actually ran.
    """
    enc_path = Path(encrypted_db_path).expanduser() if encrypted_db_path else None
    temp_path = Path(tempkey_path).expanduser() if tempkey_path else None
    # Fall back to environment overrides when no explicit argument given.
    if enc_path is None:
        env_enc = os.getenv(ENV_ENCRYPTED_DB)
        if env_enc:
            enc_path = Path(env_enc).expanduser()
    if temp_path is None:
        env_temp = os.getenv(ENV_TEMPKEY)
        if env_temp:
            temp_path = Path(env_temp).expanduser()
    locations = []
    if enc_path is None:
        if temp_path is not None and temp_path.exists():
            # Only the tempkey was given: look for sibling account DBs under
            # the same "stable" directory and take the most recently modified.
            stable = temp_path.parent
            candidates = list(stable.glob("account-*/postbox/db/db_sqlite"))
            if candidates:
                enc_path = max(candidates, key=lambda p: p.stat().st_mtime)
        else:
            # Nothing to go on: run full macOS auto-discovery.
            locations = discover_telegram_db_locations()
            chosen = _choose_latest_location(locations)
            if chosen:
                enc_path = Path(chosen.encrypted_db)
                if temp_path is None:
                    temp_path = Path(chosen.tempkey)
    if temp_path is None and enc_path is not None:
        # Derive the tempkey location from the DB layout:
        # <stable>/account-*/postbox/db/db_sqlite -> <stable>/.tempkeyEncrypted
        stable = enc_path.parents[3] if len(enc_path.parents) > 3 else None
        if stable:
            candidate_tempkey = stable / ".tempkeyEncrypted"
            if candidate_tempkey.exists():
                temp_path = candidate_tempkey
    return enc_path, temp_path, locations
def _require_encrypted_paths(
    encrypted_db_path: Optional[str] = None, tempkey_path: Optional[str] = None
) -> Tuple[Path, Path]:
    """Resolve encrypted DB + tempkey paths, raising when either is missing.

    Raises FileNotFoundError with guidance pointing at the `paths`
    subcommand and the relevant environment variables.
    """
    enc_path, temp_path, _ = _resolve_encrypted_paths(
        encrypted_db_path=encrypted_db_path, tempkey_path=tempkey_path
    )
    if enc_path is None:
        raise FileNotFoundError(
            "Could not locate an encrypted Telegram database. "
            "Run `telegram-reader paths` or set "
            f"{ENV_ENCRYPTED_DB} and {ENV_TEMPKEY}."
        )
    if temp_path is None or not temp_path.exists():
        raise FileNotFoundError(
            "Could not locate .tempkeyEncrypted. "
            "Run `telegram-reader paths` or set "
            f"{ENV_TEMPKEY}."
        )
    enc_path, temp_path = Path(enc_path), Path(temp_path)
    if not enc_path.exists():
        raise FileNotFoundError(f"Encrypted database not found: {enc_path}")
    return enc_path, temp_path
def _load_tempkey_pragma(tempkey_path: Path) -> str:
    """Read and decrypt the tempkey file, returning the SQLCipher pragma."""
    raw = tempkey_path.read_bytes()
    key, salt = tempkey_parse(raw, DEFAULT_PASSWORD)
    return tempkey_pragma(key, salt)
def _connect_sqlcipher(encrypted_db_path: Path, tempkey_path: Path):
    """Open the encrypted Telegram database live via SQLCipher.

    Derives the key pragma from the tempkey file, configures the
    connection, then issues the key. Returns an open sqlcipher3
    connection; the caller is responsible for closing it.
    """
    pragma = _load_tempkey_pragma(tempkey_path)
    conn = sqlcipher3.connect(str(encrypted_db_path))
    # Wait up to 5s on locks -- Telegram may hold the DB while running.
    conn.execute("PRAGMA busy_timeout=5000;")
    # NOTE(review): header-size pragmas suggest the first 32 bytes of the
    # file are stored unencrypted; SQLCipher is told to skip them. Both the
    # per-connection and default forms are set -- confirm against SQLCipher docs.
    conn.execute("PRAGMA cipher_plaintext_header_size=32;")
    conn.execute("PRAGMA cipher_default_plaintext_header_size=32;")
    conn.execute(pragma)
    # Touching user_version forces key validation to fail fast here
    # rather than on the first real query.
    conn.execute("PRAGMA user_version;")
    return conn
class OutputFormatter:
    """Handles different output formats

    Table output is rendered through the shared rich console; CSV is
    written directly to sys.stdout so it can be piped or redirected;
    JSON is pretty-printed via console.print_json. Any format_type
    other than "json"/"csv" falls back to the table view.
    """

    @staticmethod
    def print_dialogs(dialogs: List[Dialog], format_type: str = "table"):
        """Print dialogs in specified format"""
        if format_type == "json":
            console.print_json(json.dumps([d.to_dict() for d in dialogs], indent=2))
        elif format_type == "csv":
            # Empty list -> no output at all (not even a header row).
            if dialogs:
                writer = csv.DictWriter(
                    sys.stdout, fieldnames=dialogs[0].to_dict().keys()
                )
                writer.writeheader()
                for dialog in dialogs:
                    writer.writerow(dialog.to_dict())
        else:  # table format
            if not dialogs:
                console.print("No dialogs found")
                return
            table = Table(
                title=f"Dialogs ({len(dialogs)})",
                box=box.SIMPLE_HEAVY,
                show_lines=False,
            )
            table.add_column("Type", no_wrap=True)
            table.add_column("Name", style="bold")
            table.add_column("Username")
            table.add_column("Messages", justify="right")
            table.add_column("Last Activity", style="dim")
            for dialog in dialogs:
                username_part = f" @{dialog.username}" if dialog.username else ""
                # Emoji per dialog type; unknown types fall back to 💬.
                type_emoji = {"user": "👤", "group": "👥", "channel": "📢"}.get(
                    dialog.type, "💬"
                )
                activity_str = (
                    dialog.last_activity.strftime("%Y-%m-%d %H:%M")
                    if dialog.last_activity
                    else "Never"
                )
                table.add_row(
                    f"{type_emoji} {dialog.type}",
                    dialog.name,
                    username_part.strip(),
                    f"{dialog.message_count}",
                    activity_str,
                )
            console.print(table)

    @staticmethod
    def print_messages(messages: List[Message], format_type: str = "table"):
        """Print messages in specified format"""
        if format_type == "json":
            console.print_json(json.dumps([m.to_dict() for m in messages], indent=2))
        elif format_type == "csv":
            # Empty list -> no output at all (not even a header row).
            if messages:
                writer = csv.DictWriter(
                    sys.stdout, fieldnames=messages[0].to_dict().keys()
                )
                writer.writeheader()
                for message in messages:
                    writer.writerow(message.to_dict())
        else:  # table format
            if not messages:
                console.print("No messages found")
                return
            table = Table(
                title=f"Messages ({len(messages)})",
                box=box.SIMPLE_HEAVY,
                show_lines=True,
            )
            table.add_column("When", style="dim", no_wrap=True)
            table.add_column("Direction", no_wrap=True)
            table.add_column("Dialog")
            table.add_column("Author")
            table.add_column("Text", overflow="fold")
            for message in messages:
                direction_emoji = "📤" if message.direction == "sent" else "📩"
                table.add_row(
                    message.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                    f"{direction_emoji} {message.direction}",
                    message.dialog_name,
                    message.author_name,
                    message.text,
                )
            console.print(table)

    @staticmethod
    def print_paths(locations: List[TelegramDbLocation], format_type: str = "table"):
        """Print discovered database paths"""
        if format_type == "json":
            console.print_json(json.dumps([l.to_dict() for l in locations], indent=2))
        elif format_type == "csv":
            # Empty list -> no output at all (not even a header row).
            if locations:
                writer = csv.DictWriter(
                    sys.stdout, fieldnames=locations[0].to_dict().keys()
                )
                writer.writeheader()
                for location in locations:
                    writer.writerow(location.to_dict())
        else:
            # Table format (default).
            if not locations:
                console.print("No Telegram database locations found")
                return
            table = Table(
                title=f"Database Locations ({len(locations)})",
                box=box.SIMPLE_HEAVY,
                show_lines=True,
            )
            table.add_column("Account", no_wrap=True)
            table.add_column("Last Modified", style="dim")
            table.add_column("Encrypted DB", overflow="fold")
            table.add_column("Tempkey", overflow="fold")
            table.add_column("Container", overflow="fold")
            for location in locations:
                table.add_row(
                    location.account,
                    location.last_modified.strftime("%Y-%m-%d %H:%M:%S"),
                    location.encrypted_db,
                    location.tempkey,
                    location.container,
                )
            console.print(table)
class ByteUtil:
    """Binary data reading utility (exact copy from notebook)

    Thin struct-based reader over a file-like object. Decodes fixed-width
    integers/doubles and length-prefixed byte strings, honoring the
    configured endianness ("<" little-endian, ">" big-endian).
    """

    def __init__(self, buffer, endian="<"):
        self.endian = endian
        self.buf = buffer

    def read_fmt(self, fmt):
        """Read and unpack a single value described by struct code *fmt*."""
        full_fmt = self.endian + fmt
        raw = self.buf.read(struct.calcsize(full_fmt))
        (value,) = struct.unpack(full_fmt, raw)
        return value

    def read_int8(self):
        return self.read_fmt("b")

    def read_uint8(self):
        return self.read_fmt("B")

    def read_int32(self):
        return self.read_fmt("i")

    def read_uint32(self):
        return self.read_fmt("I")

    def read_int64(self):
        return self.read_fmt("q")

    def read_uint64(self):
        return self.read_fmt("Q")

    def read_bytes(self):
        """Read an int32 length prefix, then that many raw bytes."""
        length = self.read_int32()
        return self.buf.read(length)

    def read_str(self):
        """Read an int32-length-prefixed UTF-8 string."""
        return self.read_bytes().decode("utf-8")

    def read_short_bytes(self):
        """Read a uint8 length prefix, then that many raw bytes."""
        length = self.read_uint8()
        return self.buf.read(length)

    def read_short_str(self):
        """Read a uint8-length-prefixed UTF-8 string."""
        return self.read_short_bytes().decode("utf-8")

    def read_double(self):
        return self.read_fmt("d")
def murmur(d):
    """MurmurHash3 of *d* using Telegram's fixed seed."""
    telegram_seed = 0xF7CA7FD2
    return mmh3.hash(d, seed=telegram_seed)
class PostboxDecoder:
    """Postbox decoder for parsing Telegram's binary format

    An encoded object is a flat sequence of entries, each laid out as:
    <uint8-length-prefixed key><uint8 type tag><tag-specific payload>.
    Lookups rescan the buffer from the start on every call (see get),
    so this decoder is simple rather than fast.
    """

    class ValueType(enum.Enum):
        # Wire-format type tags -- values must match Telegram's serializer.
        Int32 = 0
        Int64 = 1
        Bool = 2
        Double = 3
        String = 4
        Object = 5
        Int32Array = 6
        Int64Array = 7
        ObjectArray = 8
        ObjectDictionary = 9
        Bytes = 10
        Nil = 11
        StringArray = 12
        BytesArray = 13

    def __init__(self, data):
        # data: raw encoded bytes of one serialized object.
        self.bio = ByteUtil(io.BytesIO(data), endian="<")
        self.size = len(data)

    def decodeRootObject(self):
        # The root object is stored under the conventional key "_".
        return self.decodeObjectForKey("_")

    def decodeObjectForKey(self, key):
        # Returns the decoded object dict, or None when missing or falsy.
        t, v = self.get(self.ValueType.Object, key)
        if v:
            return v

    def get(self, valueType, key):
        """Linear scan for *key*; returns (type, value) or (None, None).

        Pass valueType=None to accept any type. A Nil entry matches any
        requested type and yields value None.
        """
        for k, t, v in self._iter_kv():
            if k != key:
                pass
            elif valueType is None:
                return t, v
            elif t == valueType:
                return t, v
            elif t == self.ValueType.Nil:
                return t, None
        return None, None

    def _iter_kv(self):
        """Yield (key, type, value) for every entry, restarting at offset 0."""
        self.bio.buf.seek(0, io.SEEK_SET)
        while True:
            pos = self.bio.buf.tell()
            if pos >= self.size:
                break
            key = self.bio.read_short_str()
            valueType, value = self.readValue()
            yield key, valueType, value

    def _readObject(self, decode=None):
        """Read a nested object: <int32 typeHash><int32 length><payload>.

        With decode=False the raw payload bytes are returned under "data";
        otherwise (default True) the payload is recursively decoded into a
        dict of its fields, with the type hash stored under "@type".
        """
        if decode is None:
            decode = True
        typeHash = self.bio.read_int32()
        dataLen = self.bio.read_int32()
        data = self.bio.buf.read(dataLen)
        if not decode:
            value = {"type": typeHash, "data": data}
        else:
            decoder = self.__class__(data)
            value = {k: v for k, t, v in decoder._iter_kv()}
            value["@type"] = typeHash
        return value

    def readValue(self, decodeObjects=None):
        """Read one tagged value from the stream; returns (ValueType, value).

        decodeObjects is forwarded to _readObject for Object-bearing tags.
        Raises on unknown tags so corrupt input fails loudly.
        """
        valueType = self.ValueType(self.bio.read_uint8())
        value = None
        if valueType == self.ValueType.Int32:
            value = self.bio.read_int32()
        elif valueType == self.ValueType.Int64:
            value = self.bio.read_int64()
        elif valueType == self.ValueType.Bool:
            value = self.bio.read_uint8() != 0
        elif valueType == self.ValueType.Double:
            value = self.bio.read_double()
        elif valueType == self.ValueType.String:
            value = self.bio.read_str()
        elif valueType == self.ValueType.Object:
            value = self._readObject(decode=decodeObjects)
        elif valueType == self.ValueType.Int32Array:
            # All array types: int32 element count, then the elements.
            alen = self.bio.read_int32()
            value = [None] * alen
            for i in range(alen):
                value[i] = self.bio.read_int32()
        elif valueType == self.ValueType.Int64Array:
            alen = self.bio.read_int32()
            value = [None] * alen
            for i in range(alen):
                value[i] = self.bio.read_int64()
        elif valueType == self.ValueType.ObjectArray:
            alen = self.bio.read_int32()
            value = [None] * alen
            for i in range(alen):
                value[i] = self._readObject(decode=decodeObjects)
        elif valueType == self.ValueType.ObjectDictionary:
            # Decoded as a list of (key-object, value-object) pairs, not a
            # dict, because decoded objects are unhashable dicts.
            dlen = self.bio.read_int32()
            value = [None] * dlen
            for i in range(dlen):
                dkey = self._readObject(decode=decodeObjects)
                dval = self._readObject(decode=decodeObjects)
                value[i] = (dkey, dval)
        elif valueType == self.ValueType.Bytes:
            value = self.bio.read_bytes()
        elif valueType == self.ValueType.Nil:
            pass  # Nil is None
        elif valueType == self.ValueType.StringArray:
            alen = self.bio.read_int32()
            value = [None] * alen
            for i in range(alen):
                value[i] = self.bio.read_str()
        elif valueType == self.ValueType.BytesArray:
            alen = self.bio.read_int32()
            value = [None] * alen
            for i in range(alen):
                value[i] = self.bio.read_bytes()
        else:
            raise Exception("unknown value type")
        return valueType, value
class MessageDataFlags(enum.IntFlag):
    """Presence bits for optional fields in a serialized message header.

    read_intermediate_message tests each bit to decide whether the
    corresponding field follows in the byte stream.
    """

    GloballyUniqueId = 1 << 0
    GlobalTags = 1 << 1
    GroupingKey = 1 << 2
    GroupInfo = 1 << 3
    LocalTags = 1 << 4
    ThreadId = 1 << 5
class FwdInfoFlags(enum.IntFlag):
    """Presence bits for optional forward-info fields.

    NOTE(review): values start at 1 << 1 -- bit 0 appears unused in this
    format; read_intermediate_fwd_info relies on these exact values.
    """

    SourceId = 1 << 1
    SourceMessage = 1 << 2
    Signature = 1 << 3
    PsaType = 1 << 4
    Flags = 1 << 5
class MessageFlags(enum.IntFlag):
    """Message state bits from the serialized record.

    Values are wire-format constants (note 8, bit 3, is not defined here).
    Incoming is used elsewhere to derive the sent/received direction.
    """

    Unsent = 1
    Failed = 2
    Incoming = 4
    TopIndexable = 16
    Sending = 32
    CanBeGroupedIntoFeed = 64
    WasScheduled = 128
    CountedAsIncoming = 256
class MessageTags(enum.IntFlag):
    """Content-category bits attached to a message (media kinds etc.)."""

    PhotoOrVideo = 1 << 0
    File = 1 << 1
    Music = 1 << 2
    WebPage = 1 << 3
    VoiceOrInstantVideo = 1 << 4
    UnseenPersonalMessage = 1 << 5
    LiveLocation = 1 << 6
    Gif = 1 << 7
    Photo = 1 << 8
    Video = 1 << 9
    Pinned = 1 << 10
class MessageIndex:
    """Composite key (peer, namespace, id, timestamp) of one t7 row.

    The on-disk key is big-endian: int64 peerId, int32 namespace,
    int32 timestamp, int32 message id.
    """

    def __init__(self, peerId, namespace, mid, timestamp):
        self.peerId = peerId
        self.namespace = namespace
        self.id = mid
        self.timestamp = timestamp

    @classmethod
    def from_bytes(cls, b):
        """Decode a raw big-endian t7 key blob into a MessageIndex."""
        reader = ByteUtil(io.BytesIO(b), endian=">")
        peer = reader.read_int64()
        namespace = reader.read_int32()
        ts = reader.read_int32()
        mid = reader.read_int32()
        return cls(peer, namespace, mid, ts)

    def as_bytes(self):
        """Re-encode using the on-disk layout (peer, ns, ts, id)."""
        return struct.pack(
            ">qiii", self.peerId, self.namespace, self.timestamp, self.id
        )

    def __repr__(self):
        return f"ns:{self.namespace} pr:{self.peerId} id:{self.id} ts:{self.timestamp}"
def read_intermediate_fwd_info(buf):
    """Parse an optional forward-info record from *buf* (a ByteUtil).

    Returns None when the leading flags byte is zero (no forward info);
    otherwise a dict of the decoded fields, with absent fields set to
    None. Read order is dictated by the serialization format.
    """
    info_flags = FwdInfoFlags(buf.read_int8())
    if info_flags == 0:
        return None
    author = buf.read_int64()
    date = buf.read_int32()
    source_id = buf.read_int64() if FwdInfoFlags.SourceId in info_flags else None
    src_peer = src_ns = src_id = None
    if FwdInfoFlags.SourceMessage in info_flags:
        src_peer = buf.read_int64()
        src_ns = buf.read_int32()
        src_id = buf.read_int32()
    signature = buf.read_str() if FwdInfoFlags.Signature in info_flags else None
    psa_type = buf.read_str() if FwdInfoFlags.PsaType in info_flags else None
    extra_flags = buf.read_int32() if FwdInfoFlags.Flags in info_flags else None
    return {
        "author": author,
        "date": date,
        "srcId": source_id,
        "srcMsgPeer": src_peer,
        "srcMsgNs": src_ns,
        "srcMsgId": src_id,
        "signature": signature,
        "psaType": psa_type,
        "flags": extra_flags,
    }
def read_intermediate_message(v: bytes):
    """Message parser from the notebook (exact implementation)

    Decodes one t7 row value into a dict of message fields. Returns None
    for records whose leading type byte is not 0 (unsupported layout).
    Every read below advances a shared stream -- the order is fixed by
    the serialization format and must not change.
    """
    buf = ByteUtil(io.BytesIO(v))
    typ = buf.read_int8()
    if typ != 0:
        return None
    # Stable id/version are consumed to advance the stream but not returned.
    stableId = buf.read_uint32()
    stableVer = buf.read_uint32()
    # dataFlags announces which optional header fields follow.
    dataFlags = MessageDataFlags(buf.read_uint8())
    globallyUniqueId = None
    if MessageDataFlags.GloballyUniqueId in dataFlags:
        globallyUniqueId = buf.read_int64()
    globalTags = None
    if MessageDataFlags.GlobalTags in dataFlags:
        globalTags = buf.read_uint32()
    groupingKey = None
    if MessageDataFlags.GroupingKey in dataFlags:
        groupingKey = buf.read_int64()
    groupInfoStableId = None
    if MessageDataFlags.GroupInfo in dataFlags:
        groupInfoStableId = buf.read_uint32()
    localTagsVal = None
    if MessageDataFlags.LocalTags in dataFlags:
        localTagsVal = buf.read_uint32()
    threadId = None
    if MessageDataFlags.ThreadId in dataFlags:
        threadId = buf.read_int64()
    flags = MessageFlags(buf.read_uint32())
    tags = MessageTags(buf.read_uint32())
    # Optional forward-info record (None when its flags byte is zero).
    fwd_info = read_intermediate_fwd_info(buf)
    authorId = None
    hasAuthorId = buf.read_int8()
    if hasAuthorId == 1:
        authorId = buf.read_int64()
    text = buf.read_str()
    # Attributes and embedded media are nested Postbox objects, each
    # stored as an int32-length-prefixed blob.
    attributesCount = buf.read_int32()
    attributes = [None] * attributesCount
    for i in range(attributesCount):
        attributes[i] = PostboxDecoder(buf.read_bytes()).decodeRootObject()
    embeddedMediaCount = buf.read_int32()
    embeddedMedia = [None] * embeddedMediaCount
    for i in range(embeddedMediaCount):
        embeddedMedia[i] = PostboxDecoder(buf.read_bytes()).decodeRootObject()
    referencedMediaIds = []
    referencedMediaIdsCount = buf.read_int32()
    for _ in range(referencedMediaIdsCount):
        idNamespace = buf.read_int32()
        idId = buf.read_int64()
        referencedMediaIds.append((idNamespace, idId))
    return {
        "flags": flags,
        "tags": tags,
        "authorId": authorId,
        "fwd": fwd_info,
        "text": text,
        "referencedMediaIds": referencedMediaIds,
        "embeddedMedia": embeddedMedia,
        "attributes": attributes,
    }
| class TelegramReader: | |
| """Main Telegram database reader class""" | |
| def __init__(self): | |
| self.con = None | |
| self._peer_cache = {} | |
| def connect( | |
| self, | |
| db_path: Optional[str] = None, | |
| encrypted_db_path: Optional[str] = None, | |
| tempkey_path: Optional[str] = None, | |
| ): | |
| """Connect to the database (plaintext or live encrypted)""" | |
| if db_path is None: | |
| env_plain = os.getenv(ENV_PLAINTEXT_DB) | |
| if env_plain: | |
| db_path = env_plain | |
| using_sqlcipher = False | |
| if db_path is not None: | |
| db_path = Path(db_path).expanduser() | |
| if not db_path.exists(): | |
| raise FileNotFoundError(f"Database not found: {db_path}") | |
| if _is_sqlite_database(db_path): | |
| self.con = sqlite3.connect(str(db_path)) | |
| else: | |
| console.print( | |
| "ℹ️ Provided database does not look like plaintext SQLite. Opening with SQLCipher (live)." | |
| ) | |
| enc_path, temp_path = _require_encrypted_paths( | |
| encrypted_db_path=str(db_path), tempkey_path=tempkey_path | |
| ) | |
| self.con = _connect_sqlcipher(enc_path, temp_path) | |
| using_sqlcipher = True | |
| else: | |
| enc_path, temp_path = _require_encrypted_paths( | |
| encrypted_db_path=encrypted_db_path, tempkey_path=tempkey_path | |
| ) | |
| self.con = _connect_sqlcipher(enc_path, temp_path) | |
| using_sqlcipher = True | |
| # Get database info | |
| with closing(self.con.cursor()) as cursor: | |
| cursor.execute("SELECT COUNT(*) FROM t7") | |
| message_count = cursor.fetchone()[0] | |
| mode = " (live encrypted)" if using_sqlcipher else "" | |
| console.print(f"✅ Connected to database{mode} ({message_count:,} messages)") | |
| return True | |
| def get_peer(self, peer_id): | |
| """Get peer information by ID with caching""" | |
| if peer_id in self._peer_cache: | |
| return self._peer_cache[peer_id] | |
| with closing(self.con.cursor()) as cursor: | |
| cursor.execute( | |
| "SELECT value FROM t2 WHERE key = ? ORDER BY key LIMIT 1", (peer_id,) | |
| ) | |
| v = cursor.fetchone() | |
| if v is None: | |
| self._peer_cache[peer_id] = None | |
| return None | |
| data = PostboxDecoder(v[0]).decodeRootObject() | |
| # Extract display name | |
| if data: | |
| if data.get("fn") or data.get("ln"): | |
| display_name = f"{data.get('fn', '')} {data.get('ln', '')}".strip() | |
| elif data.get("t"): | |
| display_name = data.get("t", "") | |
| else: | |
| display_name = f"User {peer_id}" | |
| data["display_name"] = display_name | |
| self._peer_cache[peer_id] = data | |
| return data | |
| def search_messages(self, search_term, limit=50): | |
| """Search for messages containing specific text""" | |
| if not search_term or limit <= 0: | |
| return [] | |
| found = [] | |
| hex_pattern = f"%{search_term.encode().hex().upper()}%" | |
| with closing(self.con.cursor()) as cursor: | |
| cursor.execute( | |
| """ | |
| SELECT key, value FROM t7 | |
| WHERE hex(value) LIKE ? | |
| ORDER BY key DESC | |
| LIMIT ? | |
| """, | |
| (hex_pattern, limit * 10), | |
| ) | |
| search_lower = search_term.lower() | |
| for key, value in cursor.fetchall(): | |
| try: | |
| idx = MessageIndex.from_bytes(key) | |
| msg = read_intermediate_message(value) | |
| except Exception: | |
| continue | |
| if msg and msg["text"] and search_lower in msg["text"].lower(): | |
| found.append((idx, msg)) | |
| if len(found) >= limit: | |
| break | |
| return found | |
| def get_recent_messages(self, limit=20): | |
| """Get recent messages from all conversations""" | |
| with closing(self.con.cursor()) as cursor: | |
| cursor.execute( | |
| "SELECT key, value FROM t7 ORDER BY key DESC LIMIT ?", (limit * 5,) | |
| ) | |
| messages = [] | |
| for key, value in cursor.fetchall(): | |
| try: | |
| idx = MessageIndex.from_bytes(key) | |
| msg = read_intermediate_message(value) | |
| except Exception: | |
| continue | |
| if msg and msg["text"]: | |
| messages.append((idx, msg)) | |
| if len(messages) >= limit: | |
| break | |
| return messages | |
| def _create_message_object(self, idx: MessageIndex, msg: Dict) -> Message: | |
| """Create a Message object from parsed data""" | |
| # Get dialog info | |
| peer = self.get_peer(idx.peerId) | |
| dialog_name = ( | |
| peer.get("display_name", f"Unknown {idx.peerId}") | |
| if peer | |
| else f"Unknown {idx.peerId}" | |
| ) | |
| # Get author info | |
| author_id = msg.get("authorId") | |
| if author_id: | |
| author_peer = self.get_peer(author_id) | |
| if author_peer: | |
| author_name = author_peer.get("display_name", f"User {author_id}") | |
| else: | |
| author_name = f"User {author_id}" | |
| else: | |
| author_name = "Unknown" | |
| # Determine direction | |
| direction = "received" if MessageFlags.Incoming in msg["flags"] else "sent" | |
| return Message( | |
| id=idx.id, | |
| dialog_id=idx.peerId, | |
| dialog_name=dialog_name, | |
| author_id=author_id, | |
| author_name=author_name, | |
| timestamp=datetime.datetime.fromtimestamp(idx.timestamp), | |
| text=msg["text"], | |
| direction=direction, | |
| ) | |
| def search_messages_as_objects( | |
| self, search_term: str, limit: int = 50, dialog_name: Optional[str] = None | |
| ) -> List[Message]: | |
| """Search for messages and return as Message objects""" | |
| if not search_term or limit <= 0: | |
| return [] | |
| if dialog_name: | |
| dialog_manager = DialogManager(self) | |
| dialog = dialog_manager.get_dialog_by_name(dialog_name) | |
| if not dialog: | |
| return [] | |
| found_raw = [] | |
| hex_pattern = f"%{search_term.encode().hex().upper()}%" | |
| dialog_hex = struct.pack(">Q", dialog.id).hex().upper() | |
| search_lower = search_term.lower() | |
| with closing(self.con.cursor()) as cursor: | |
| cursor.execute( | |
| """ | |
| SELECT key, value FROM t7 | |
| WHERE hex(substr(key, 1, 8)) = ? AND hex(value) LIKE ? | |
| ORDER BY key DESC | |
| LIMIT ? | |
| """, | |
| (dialog_hex, hex_pattern, limit * 5), | |
| ) | |
| for key, value in cursor.fetchall(): | |
| try: | |
| idx = MessageIndex.from_bytes(key) | |
| msg = read_intermediate_message(value) | |
| except Exception: | |
| continue | |
| if msg and msg["text"] and search_lower in msg["text"].lower(): | |
| found_raw.append((idx, msg)) | |
| if len(found_raw) >= limit: | |
| break | |
| else: | |
| found_raw = self.search_messages(search_term, limit) | |
| # Convert to Message objects | |
| messages = [] | |
| for idx, msg in found_raw: | |
| try: | |
| message_obj = self._create_message_object(idx, msg) | |
| except Exception: | |
| continue | |
| messages.append(message_obj) | |
| return messages | |
def get_recent_messages_as_objects(
    self, limit: int = 20, dialog_name: Optional[str] = None
) -> List[Message]:
    """Return recent messages as Message objects.

    Args:
        limit: Maximum number of messages to return.
        dialog_name: Optional dialog name to filter by; resolved via
            DialogManager.get_dialog_by_name. An unknown name yields [].

    Returns:
        List of Message objects (newest first), possibly empty.
    """
    if not dialog_name:
        # No dialog filter: reuse the generic recent-message query.
        messages = []
        for idx, msg in self.get_recent_messages(limit):
            try:
                messages.append(self._create_message_object(idx, msg))
            except Exception:
                # Skip rows that cannot be materialized (corrupt records).
                continue
        return messages

    dialog_manager = DialogManager(self)
    dialog = dialog_manager.get_dialog_by_name(dialog_name)
    if not dialog:
        return []
    # t7 keys start with the big-endian peer id; match on its hex prefix.
    dialog_hex = struct.pack(">Q", dialog.id).hex().upper()
    messages = []
    with closing(self.con.cursor()) as cursor:
        # Bind dialog_hex as a parameter (was interpolated via f-string,
        # inconsistent with the parameterized search query). Over-fetch 5x
        # because rows without text are filtered out below.
        cursor.execute(
            """
            SELECT key, value FROM t7
            WHERE hex(substr(key, 1, 8)) = ?
            ORDER BY key DESC
            LIMIT ?
            """,
            (dialog_hex, limit * 5),
        )
        for key, value in cursor.fetchall():
            try:
                idx = MessageIndex.from_bytes(key)
                msg = read_intermediate_message(value)
            except Exception:
                continue
            if msg and msg["text"]:
                try:
                    message_obj = self._create_message_object(idx, msg)
                except Exception:
                    # Keep parity with the unfiltered path: skip bad rows.
                    continue
                messages.append(message_obj)
                if len(messages) >= limit:
                    break
    return messages
def print_message(self, idx, msg):
    """Render a single message to the console in the legacy '===' format."""
    incoming = MessageFlags.Incoming in msg["flags"]
    direction = "<-" if incoming else "->"
    ts = datetime.datetime.fromtimestamp(idx.timestamp).isoformat()
    fallback_peer = f"Peer {idx.peerId}"
    peer = self.get_peer(idx.peerId)
    if peer:
        peer_name = peer.get("display_name", fallback_peer)
    else:
        peer_name = fallback_peer
    author_name = "Unknown"
    author_id = msg.get("authorId")
    if author_id:
        author_peer = self.get_peer(author_id)
        if author_peer:
            author_name = author_peer.get("display_name", f"User {author_id}")
    console.print(f"=== {direction} {ts} in {peer_name}")
    console.print(f"=== From: {author_name}")
    if msg["text"]:
        console.print(msg["text"])
    console.print()
class DialogManager:
    """Discovers, searches and filters dialogs (chats, groups, channels).

    Wraps a connected TelegramReader and derives per-dialog metadata from
    the message table (t7) and the peer table (t2).
    """

    def __init__(self, reader: "TelegramReader"):
        self.reader = reader
        # Reserved for memoizing dialog lookups; kept for API stability.
        self._dialog_cache = {}

    def get_all_dialogs(self, include_empty: bool = False) -> List["Dialog"]:
        """Return all dialogs with message counts.

        Args:
            include_empty: Also include peers with no messages at all.

        Returns:
            Dialogs in descending message-count order (the order produced
            by the underlying GROUP BY query).
        """
        dialogs = []
        with closing(self.reader.con.cursor()) as cursor:
            # t7 keys are <8-byte big-endian peer id><message index>;
            # group on the peer prefix to count messages per dialog.
            cursor.execute("""
                SELECT substr(key, 1, 8) as peer_prefix, COUNT(*) as msg_count, MAX(key) as last_key
                FROM t7
                GROUP BY substr(key, 1, 8)
                ORDER BY msg_count DESC
            """)
            peer_stats = {}
            for prefix, count, last_key in cursor.fetchall():
                try:
                    if len(prefix) >= 8:
                        peer_id = struct.unpack(">Q", prefix)[0]
                        peer_stats[peer_id] = {
                            "message_count": count,
                            "last_key": last_key,
                        }
                except (struct.error, ValueError):
                    # Malformed key prefix: skip this pseudo-dialog.
                    continue
            if peer_stats:
                # Restrict to peers that exist in the peer table; only the
                # key is needed here (the value column was never used).
                placeholders = ",".join("?" * len(peer_stats))
                cursor.execute(
                    f"SELECT key FROM t2 WHERE key IN ({placeholders})",
                    list(peer_stats.keys()),
                )
                for (peer_id,) in cursor.fetchall():
                    peer_info = self.reader.get_peer(peer_id)
                    if not peer_info:
                        continue
                    stats = peer_stats.get(
                        peer_id, {"message_count": 0, "last_key": None}
                    )
                    message_count = stats["message_count"]
                    if not include_empty and message_count == 0:
                        continue
                    # Derive last activity from the newest t7 key, if parseable.
                    last_activity = None
                    if stats["last_key"]:
                        try:
                            idx = MessageIndex.from_bytes(stats["last_key"])
                            last_activity = datetime.datetime.fromtimestamp(
                                idx.timestamp
                            )
                        except (struct.error, TypeError, ValueError):
                            last_activity = None
                    dialogs.append(
                        Dialog(
                            id=peer_id,
                            name=peer_info["display_name"],
                            username=peer_info.get("un"),
                            type=self._determine_dialog_type(peer_info),
                            message_count=message_count,
                            last_activity=last_activity,
                        )
                    )
            if include_empty:
                # Second pass: peers present in t2 but without rows in t7.
                cursor.execute("SELECT key FROM t2")
                for (peer_id,) in cursor.fetchall():
                    if peer_id in peer_stats:
                        continue
                    peer_info = self.reader.get_peer(peer_id)
                    if not peer_info:
                        continue
                    dialogs.append(
                        Dialog(
                            id=peer_id,
                            name=peer_info["display_name"],
                            username=peer_info.get("un"),
                            type=self._determine_dialog_type(peer_info),
                            message_count=0,
                            last_activity=None,
                        )
                    )
        return dialogs

    def _determine_dialog_type(self, peer_info: Dict) -> str:
        """Classify a raw peer record as 'channel', 'group', 'user' or 'unknown'.

        Heuristic over the stored peer fields: 't' marks chats (a username
        'un' implies a public channel); 'fn'/'ln' mark individual users.
        """
        if peer_info.get("t"):
            return "channel" if peer_info.get("un") else "group"
        if peer_info.get("fn") or peer_info.get("ln"):
            return "user"
        return "unknown"

    def search_dialogs(self, query: str) -> List["Dialog"]:
        """Case-insensitively match dialogs by display name or username."""
        query_lower = query.lower()
        return [
            dialog
            for dialog in self.get_all_dialogs()
            if query_lower in dialog.name.lower()
            or (dialog.username and query_lower in dialog.username.lower())
        ]

    def filter_dialogs(
        self,
        dialogs: List["Dialog"],
        dialog_type: Optional[str] = None,
        min_messages: int = 0,
    ) -> List["Dialog"]:
        """Filter dialogs by type and/or minimum message count."""
        filtered = dialogs
        if dialog_type:
            filtered = [d for d in filtered if d.type == dialog_type]
        if min_messages > 0:
            filtered = [d for d in filtered if d.message_count >= min_messages]
        return filtered

    def get_dialog_by_name(self, name: str) -> Optional["Dialog"]:
        """Resolve a dialog by exact name/username match, else first partial hit."""
        matches = self.search_dialogs(name)
        name_lower = name.lower()
        for dialog in matches:
            if dialog.name.lower() == name_lower:
                return dialog
            if dialog.username and dialog.username.lower() == name_lower:
                return dialog
        return matches[0] if matches else None
def handle_dialogs_list(args, reader, dialog_manager):
    """Handle the `dialogs list` subcommand.

    Lists every dialog, truncated to --limit entries when given.
    """
    dialogs = dialog_manager.get_all_dialogs()
    # Compare against None, not truthiness: `--limit 0` should show an
    # empty listing rather than silently disable the limit.
    if args.limit is not None:
        dialogs = dialogs[: args.limit]
    OutputFormatter.print_dialogs(dialogs, args.format)
def handle_dialogs_search(args, reader, dialog_manager):
    """Handle the `dialogs search` subcommand: print dialogs matching the query."""
    matches = dialog_manager.search_dialogs(args.query)
    OutputFormatter.print_dialogs(matches, args.format)
def handle_messages_recent(args, reader, dialog_manager):
    """Handle the `messages recent` subcommand: print the latest messages."""
    recent = reader.get_recent_messages_as_objects(args.limit, args.dialog)
    OutputFormatter.print_messages(recent, args.format)
def handle_messages_search(args, reader, dialog_manager):
    """Handle the `messages search` subcommand.

    Prints matching messages; in table format also emits a summary line
    (or a 'No messages found' notice when nothing matched).
    """
    messages = reader.search_messages_as_objects(args.query, args.limit, args.dialog)
    if not messages:
        # Non-table formats stay machine-readable: no notice is emitted.
        if args.format == "table":
            console.print("No messages found")
        return
    if args.format == "table":
        console.print(f"\n🔍 Found {len(messages)} messages for '{args.query}':")
    OutputFormatter.print_messages(messages, args.format)
def handle_legacy_mode(args, reader):
    """Handle the legacy flat-flag interface (--search / --recent / --peer).

    Kept for backwards compatibility with the pre-subcommand CLI.

    Returns:
        True, matching the historical success behavior.
    """
    if args.search:
        console.print(f"\n🔍 Searching for: '{args.search}'")
        matches = reader.search_messages(args.search, args.limit)
        if matches:
            console.print(f"Found {len(matches)} messages:")
            for idx, msg in matches:
                reader.print_message(idx, msg)
        else:
            console.print("No messages found")
    elif args.recent is not None:
        # `is not None` so `--recent 0` is honored instead of ignored.
        console.print("\n📱 Recent messages:")
        for idx, msg in reader.get_recent_messages(args.recent):
            reader.print_message(idx, msg)
    elif args.peer is not None:
        console.print(f"\n👤 Messages from peer {args.peer}:")
        # Peer-specific listing was never implemented (the original branch
        # was a bare `pass`); say so instead of silently printing nothing.
        console.print("⚠️ Peer filtering is not implemented yet")
    return True
def handle_paths(args):
    """Handle the `paths` subcommand: show detected Telegram DB locations."""
    detected = discover_telegram_db_locations()
    OutputFormatter.print_paths(detected, args.format)
    return True
def _build_parser():
    """Build the argparse CLI: global flags, legacy flags, and subcommands.

    Returns:
        Tuple (parser, dialogs_parser, messages_parser); the sub-parsers
        are returned so main() can print their help on incomplete commands.
    """
    parser = argparse.ArgumentParser(
        description="Telegram Database Reader - Enhanced CLI for exploring Telegram data",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # List all dialogs
  telegram-reader dialogs list

  # Search for specific dialog
  telegram-reader dialogs search "starcraft"

  # Get recent messages from specific dialog
  telegram-reader messages recent --dialog "StarCraft" --limit 10

  # Search messages in all dialogs
  telegram-reader messages search "brete" --limit 5

  # Export to JSON
  telegram-reader dialogs list --format json

  # Show detected database paths
  telegram-reader paths

  # Use explicit paths
  telegram-reader --encrypted-db "/path/to/db_sqlite" --tempkey "/path/to/.tempkeyEncrypted" dialogs list

  # Live encrypted DB (default)
  telegram-reader dialogs list

  # Legacy mode (backwards compatibility)
  telegram-reader --search "keyword" --limit 10
""",
    )
    # Global options
    parser.add_argument(
        "--format",
        choices=["table", "json", "csv"],
        default="table",
        help="Output format (default: table)",
    )
    parser.add_argument(
        "--db",
        help=(
            "Path to plaintext SQLite DB (overrides auto-discovery or set "
            f"{ENV_PLAINTEXT_DB})"
        ),
    )
    parser.add_argument(
        "--encrypted-db",
        help=(
            "Path to encrypted db_sqlite (overrides auto-discovery or set "
            f"{ENV_ENCRYPTED_DB})"
        ),
    )
    parser.add_argument(
        "--tempkey",
        help=f"Path to .tempkeyEncrypted (or set {ENV_TEMPKEY})",
    )
    # Legacy options for backwards compatibility with the flat-flag CLI.
    parser.add_argument("--search", help="Legacy: Search for messages containing text")
    parser.add_argument("--recent", type=int, help="Legacy: Show recent messages")
    parser.add_argument(
        "--peer", type=int, help="Legacy: Show messages from specific peer ID"
    )
    parser.add_argument("--limit", type=int, default=20, help="Limit number of results")
    # Subcommands
    subparsers = parser.add_subparsers(dest="command", help="Available commands")
    # paths: database discovery
    paths_parser = subparsers.add_parser(
        "paths", help="List detected Telegram database locations"
    )
    paths_parser.add_argument(
        "--format", choices=["table", "json", "csv"], default="table"
    )
    # dialogs: list/search
    dialogs_parser = subparsers.add_parser(
        "dialogs", help="Manage dialogs (chats, groups, channels)"
    )
    dialogs_subparsers = dialogs_parser.add_subparsers(dest="dialogs_action")
    dialogs_list_parser = dialogs_subparsers.add_parser("list", help="List all dialogs")
    dialogs_list_parser.add_argument(
        "--limit", type=int, help="Limit number of dialogs"
    )
    dialogs_list_parser.add_argument(
        "--format", choices=["table", "json", "csv"], default="table"
    )
    dialogs_search_parser = dialogs_subparsers.add_parser(
        "search", help="Search dialogs by name"
    )
    dialogs_search_parser.add_argument("query", help="Search query for dialog name")
    dialogs_search_parser.add_argument(
        "--format", choices=["table", "json", "csv"], default="table"
    )
    # messages: recent/search
    messages_parser = subparsers.add_parser("messages", help="Work with messages")
    messages_subparsers = messages_parser.add_subparsers(dest="messages_action")
    messages_recent_parser = messages_subparsers.add_parser(
        "recent", help="Get recent messages"
    )
    messages_recent_parser.add_argument("--dialog", help="Dialog name to filter by")
    messages_recent_parser.add_argument(
        "--limit", type=int, default=20, help="Number of messages"
    )
    messages_recent_parser.add_argument(
        "--format", choices=["table", "json", "csv"], default="table"
    )
    messages_search_parser = messages_subparsers.add_parser(
        "search", help="Search messages"
    )
    messages_search_parser.add_argument("query", help="Search query for message text")
    messages_search_parser.add_argument("--dialog", help="Dialog name to filter by")
    messages_search_parser.add_argument(
        "--limit", type=int, default=50, help="Number of messages"
    )
    messages_search_parser.add_argument(
        "--format", choices=["table", "json", "csv"], default="table"
    )
    return parser, dialogs_parser, messages_parser


def _connect_reader(args):
    """Create a TelegramReader connected per the CLI path overrides."""
    reader = TelegramReader()
    reader.connect(
        db_path=args.db,
        encrypted_db_path=args.encrypted_db,
        tempkey_path=args.tempkey,
    )
    return reader


def main():
    """Main CLI entry point.

    Returns:
        True on success, False when no command was given (help printed).
    """
    parser, dialogs_parser, messages_parser = _build_parser()
    args = parser.parse_args()
    # `paths` needs no database connection at all.
    if args.command == "paths":
        return handle_paths(args)
    # Legacy flat-flag mode. Compare against None so `--recent 0` and
    # `--peer 0` still select legacy mode instead of falling through
    # to the help text.
    if args.search is not None or args.recent is not None or args.peer is not None:
        console.print("ℹ️ Using legacy mode. Consider using the new subcommand interface:")
        console.print(" dialogs list, dialogs search, messages recent, messages search")
        console.print()
        return handle_legacy_mode(args, _connect_reader(args))
    if not args.command:
        parser.print_help()
        return False
    reader = _connect_reader(args)
    dialog_manager = DialogManager(reader)
    # Route to the appropriate handler; print sub-help for bare commands.
    if args.command == "dialogs":
        if args.dialogs_action == "list":
            handle_dialogs_list(args, reader, dialog_manager)
        elif args.dialogs_action == "search":
            handle_dialogs_search(args, reader, dialog_manager)
        else:
            dialogs_parser.print_help()
    elif args.command == "messages":
        if args.messages_action == "recent":
            handle_messages_recent(args, reader, dialog_manager)
        elif args.messages_action == "search":
            handle_messages_search(args, reader, dialog_manager)
        else:
            messages_parser.print_help()
    return True
if __name__ == "__main__":
    # Run the CLI with a single exit point: 0 on success, 1 on failure,
    # interruption, or any unhandled error.
    exit_code = 1
    try:
        exit_code = 0 if main() else 1
    except KeyboardInterrupt:
        console.print("\n⚠️ Interrupted by user")
    except Exception as e:
        console.print(f"❌ Error: {e}")
    sys.exit(exit_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment