|
#!/usr/bin/env node |
|
|
|
/**
 * Telegram Bot — AI CLI bridge (Claude + Codex).
 * Zero dependencies. Receives messages from Telegram, passes them to an AI CLI,
 * and streams the output back with live message editing.
 *
 * Supported agents:
 *   - claude (default) — Anthropic Claude Code CLI
 *   - codex            — OpenAI Codex CLI
 *
 * Set AGENT_CMD in .env to switch (e.g. AGENT_CMD=codex).
 */
|
|
|
import { spawn, execFileSync } from 'node:child_process';
import { randomUUID } from 'node:crypto';
import { readFileSync, writeFileSync, mkdirSync, mkdtempSync, existsSync, unlinkSync, rmSync, createWriteStream } from 'node:fs';
import { tmpdir, homedir } from 'node:os';
import { resolve, extname, join } from 'node:path';
import { createInterface } from 'node:readline';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

import { loadEnv } from './env.js';
import { toTelegramHTML, stripHTMLTags, htmlEscape, stripBackslashEscapes } from './html.js';

loadEnv();
|
|
|
// --- Config --- |
|
|
|
// Bot token, read from the environment. Validated in the main section below.
const { TELEGRAM_BOT_TOKEN: BOT_TOKEN } = process.env;

// Comma-separated Telegram user ids allowed to talk to the bot (blank entries dropped).
const ALLOWED_USERS = (process.env.ALLOWED_USERS || '')
  .split(',')
  .map((entry) => entry.trim())
  .filter((entry) => entry !== '');

// Directory the agent CLI runs in; falls back to the user's home directory.
const WORK_DIR = process.env.GCTRL_WORK_DIR || process.env.WORK_DIR || homedir();

// Agent executable and optional model override.
const AGENT_CMD = process.env.AGENT_CMD || 'claude';
const AGENT_MODEL = process.env.AGENT_MODEL || '';

const API_BASE = `https://api.telegram.org/bot${BOT_TOKEN}`;

const EDIT_INTERVAL = 500; // ms between message edits
const TYPING_INTERVAL = 4000; // ms between typing indicators
const SESSION_EXPIRY = 24 * 60 * 60 * 1000; // 24h in ms
const PROCESS_TIMEOUT = 5 * 60 * 1000; // 5 min — auto-kill hanging processes
const MAX_MESSAGE_LENGTH = 4000;

// Tools passed to Claude via --allowedTools, as one comma-separated string.
const ALLOWED_TOOLS = [
  'Read',
  'Write',
  'Edit',
  'Bash',
  'Glob',
  'Grep',
  'WebFetch',
  'WebSearch',
  'Skill',
  'Task',
].join(',');

// --- Agent detection ---

// Any command containing "codex" selects the Codex protocol; everything else is Claude.
const AGENT_TYPE = AGENT_CMD.includes('codex') ? 'codex' : 'claude';
|
|
|
// --- Telegram API helpers --- |
|
|
|
/**
 * Call a Telegram Bot API method by POSTing a JSON body.
 *
 * API-level failures (`ok` false) are logged and still returned to the caller;
 * only transport or JSON-parsing failures reject.
 *
 * @param {string} method - Bot API method name, e.g. `sendMessage`.
 * @param {object} body - Request parameters, serialised as JSON.
 * @returns {Promise<object>} The decoded Telegram response envelope.
 */
async function tg(method, body) {
  const request = {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
  };
  const response = await fetch(`${API_BASE}/${method}`, request);
  const payload = await response.json();
  if (payload.ok) return payload;

  log(`Telegram API error [${method}]: ${payload.description}`);
  return payload;
}
|
|
|
/**
 * Send a chat message, retrying as plain text when an HTML message is rejected.
 *
 * @param {number|string} chatId - Target chat.
 * @param {string} text - Message text.
 * @param {string} [parseMode='HTML'] - Telegram parse mode for the first attempt.
 * @returns {Promise<object>} Response of the last request that was made.
 */
async function sendMessage(chatId, text, parseMode = 'HTML') {
  const firstTry = await tg('sendMessage', { chat_id: chatId, text, parse_mode: parseMode });
  const htmlRejected = !firstTry.ok && parseMode === 'HTML';
  if (!htmlRejected) return firstTry;

  // HTML was refused: resend with the tags stripped and no parse mode.
  return tg('sendMessage', { chat_id: chatId, text: stripHTMLTags(text) });
}
|
|
|
/**
 * Edit an existing message, retrying as plain text when an HTML edit is rejected.
 *
 * @param {number|string} chatId - Chat containing the message.
 * @param {number} messageId - Message to edit.
 * @param {string} text - New message text.
 * @param {string} [parseMode] - Optional Telegram parse mode; omitted from the request when falsy.
 * @returns {Promise<object>} Response of the last request that was made.
 */
async function editMessageText(chatId, messageId, text, parseMode) {
  const target = { chat_id: chatId, message_id: messageId };
  const payload = parseMode ? { ...target, text, parse_mode: parseMode } : { ...target, text };

  const outcome = await tg('editMessageText', payload);
  if (outcome.ok || parseMode !== 'HTML') return outcome;

  // HTML was refused: retry with the tags stripped and no parse mode.
  return tg('editMessageText', { ...target, text: stripHTMLTags(text) });
}
|
|
|
/**
 * Show the "typing…" indicator in a chat.
 *
 * @param {number|string} chatId - Target chat.
 * @returns {Promise<object>} Telegram response envelope.
 */
async function sendTyping(chatId) {
  const action = { chat_id: chatId, action: 'typing' };
  return tg('sendChatAction', action);
}
|
|
|
/**
 * Request file metadata from Telegram's `getFile` method.
 *
 * @param {string} fileId - Telegram file identifier.
 * @returns {Promise<object>} Telegram response envelope.
 */
async function getFile(fileId) {
  const query = { file_id: fileId };
  return tg('getFile', query);
}
|
|
|
// --- Text helpers --- |
|
|
|
/**
 * Cap a message at MAX_MESSAGE_LENGTH UTF-16 code units, appending a
 * "(truncated)" marker when anything was cut.
 *
 * The cut never lands between the two halves of a surrogate pair: splitting
 * one (for example an emoji) would leave a lone surrogate in the outgoing
 * text, which is not valid Unicode.
 *
 * @param {string} text - Text to send.
 * @returns {string} The original text, or a shortened copy with the marker appended.
 */
function truncate(text) {
  if (text.length <= MAX_MESSAGE_LENGTH) return text;

  let end = MAX_MESSAGE_LENGTH;
  const lastKept = text.charCodeAt(end - 1);
  // A high surrogate as the last kept unit means its low half is being cut off.
  if (lastKept >= 0xd800 && lastKept <= 0xdbff) end -= 1;

  return `${text.slice(0, end)}\n\n... (truncated)`;
}
|
|
|
// Matches every line that starts with "Cost:", including its trailing newline if present.
const COST_LINE_RE = /^Cost:.*\n?/gm;

/**
 * Remove all "Cost: …" lines from agent output.
 *
 * @param {string} text - Agent output.
 * @returns {string} The text with those lines removed.
 */
function stripCostLine(text) {
  const withoutCost = text.replaceAll(COST_LINE_RE, '');
  return withoutCost;
}
|
|
|
// --- Logging --- |
|
|
|
/**
 * Write a line to stdout prefixed with the current ISO-8601 timestamp.
 *
 * @param {string} msg - Message to log.
 */
function log(msg) {
  const stamp = new Date().toISOString();
  console.log(`[${stamp}] ${msg}`);
}
|
|
|
// --- Session management (JSON file) --- |
|
|
|
// Sessions are kept in a JSON file under ~/.gctrl.
const SESSION_DIR = resolve(homedir(), '.gctrl');
const SESSION_FILE = resolve(homedir(), '.gctrl', 'telegram-sessions.json');
|
|
|
/**
 * Read the session store from SESSION_FILE.
 *
 * A missing, unreadable, or unparsable file yields an empty store. So does a
 * file whose JSON is valid but not a plain object (`null`, an array, a number):
 * callers index the result by user id and would otherwise throw on `null`.
 *
 * @returns {object} Session store keyed by user id.
 */
function loadSessions() {
  let parsed;
  try {
    parsed = JSON.parse(readFileSync(SESSION_FILE, 'utf8'));
  } catch {
    return {};
  }

  const isPlainObject = parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed);
  return isPlainObject ? parsed : {};
}
|
|
|
/**
 * Persist the session store to SESSION_FILE as pretty-printed JSON, creating
 * SESSION_DIR first if it does not exist. Failures are logged, never thrown.
 *
 * @param {object} sessions - Session store keyed by user id.
 */
function saveSessions(sessions) {
  try {
    const dirMissing = !existsSync(SESSION_DIR);
    if (dirMissing) mkdirSync(SESSION_DIR, { recursive: true });

    const serialized = JSON.stringify(sessions, null, 2);
    writeFileSync(SESSION_FILE, serialized);
  } catch (err) {
    log(`Failed to save sessions: ${err.message}`);
  }
}
|
|
|
/**
 * Look up the user's active session.
 *
 * An active session whose `lastActive` is more than SESSION_EXPIRY old is
 * marked inactive, the store is saved, and `null` is returned.
 *
 * @param {string} userId - Telegram user id.
 * @returns {object|null} The active session record, or null when there is none.
 */
function getSession(userId) {
  const store = loadSessions();
  const list = store[userId];
  if (!Array.isArray(list) || list.length === 0) return null;

  const current = list.find((entry) => entry.active);
  if (!current) return null;

  const idleMs = Date.now() - new Date(current.lastActive).getTime();
  if (idleMs > SESSION_EXPIRY) {
    current.active = false;
    saveSessions(store);
    return null;
  }
  return current;
}
|
|
|
/**
 * Mark every session in the list as inactive (mutates the records in place).
 *
 * @param {Iterable<object>} list - Session records.
 */
function deactivateAll(list) {
  for (const entry of list) {
    entry.active = false;
  }
}
|
|
|
/**
 * Record activity on the user's session and persist the store.
 *
 * If the user has an active session it is updated in place: `sessionId` and
 * `title` are only overwritten when truthy, and `lastActive` is refreshed.
 * Otherwise a new active session is inserted at the front of the list, which
 * is capped at 10 entries.
 *
 * @param {string} userId - Telegram user id.
 * @param {string} sessionId - Agent session/thread id.
 * @param {string} title - Short label for the session.
 */
function saveSession(userId, sessionId, title) {
  const store = loadSessions();
  store[userId] ||= [];
  const list = store[userId];

  const current = list.find((entry) => entry.active);
  if (current) {
    if (sessionId) current.sessionId = sessionId;
    if (title) current.title = title;
    current.lastActive = new Date().toISOString();
  } else {
    deactivateAll(list);
    const record = {
      sessionId: sessionId || '',
      title: title || '',
      lastActive: new Date().toISOString(),
      active: true,
    };
    list.unshift(record);
    if (list.length > 10) list.splice(10);
  }

  saveSessions(store);
}
|
|
|
/**
 * Deactivate all of the user's sessions so the next message starts a new one.
 * Nothing is saved when the user has no session list.
 *
 * @param {string} userId - Telegram user id.
 */
function clearSession(userId) {
  const store = loadSessions();
  const list = store[userId];
  if (!list) return;

  deactivateAll(list);
  saveSessions(store);
}
|
|
|
/**
 * Return the first `limit` entries of the user's session list.
 *
 * @param {string} userId - Telegram user id.
 * @param {number} [limit=5] - Maximum number of sessions to return.
 * @returns {object[]} A new array with at most `limit` session records.
 */
function getRecentSessions(userId, limit = 5) {
  const store = loadSessions();
  const list = store[userId] || [];
  return list.slice(0, limit);
}
|
|
|
/**
 * Make the session at `index` in the user's list the active one.
 *
 * Returns false without touching the store when the user has no session list,
 * or when `index` is not an integer inside the list's bounds. Negative and
 * fractional indexes are rejected explicitly; indexing the array with them
 * would yield `undefined` and throw on the property write.
 *
 * @param {string} userId - Telegram user id.
 * @param {number} index - Zero-based position in the user's session list.
 * @returns {boolean} True when the session was activated and the store saved.
 */
function activateSession(userId, index) {
  const sessions = loadSessions();
  const list = sessions[userId];
  if (!Array.isArray(list)) return false;
  if (!Number.isInteger(index) || index < 0 || index >= list.length) return false;

  deactivateAll(list);
  const chosen = list[index];
  chosen.active = true;
  chosen.lastActive = new Date().toISOString();
  saveSessions(sessions);
  return true;
}
|
|
|
// --- Active user tracking (one request at a time per user) --- |
|
|
|
/**
 * Users with a request in flight, mapped to the controller that aborts it.
 * @type {Map<string, AbortController>}
 */
const activeUsers = new Map();

/**
 * Claim the per-user slot for a new request.
 *
 * If the user already has a request in flight, a "still processing" notice is
 * sent and `null` is returned. Otherwise an AbortController is registered and
 * a watchdog timer is started that aborts it after PROCESS_TIMEOUT. The timer
 * handle is stored on the controller as `_timeout` so releaseUser can clear it.
 *
 * @param {number|string} chatId - Chat to notify when the user is busy.
 * @param {string} userId - Telegram user id.
 * @returns {AbortController|null} The new controller, or null when the user is busy.
 */
function acquireUser(chatId, userId) {
  if (activeUsers.has(userId)) {
    // Fire-and-forget, but a failed send must not surface as an unhandled
    // promise rejection (which terminates the process on current Node defaults).
    sendMessage(chatId, 'Still processing your previous message. Please wait...').catch((err) => {
      log(`Failed to send busy notice: ${err.message}`);
    });
    return null;
  }

  const controller = new AbortController();
  controller._timeout = setTimeout(() => {
    log(`Process timeout for user ${userId} — aborting after ${PROCESS_TIMEOUT / 1000}s`);
    controller.abort();
  }, PROCESS_TIMEOUT);
  activeUsers.set(userId, controller);
  return controller;
}
|
|
|
/**
 * Release the per-user slot: cancel the watchdog timer stored on the
 * controller (if any) and remove the user from the active map.
 *
 * @param {string} userId - Telegram user id.
 */
function releaseUser(userId) {
  const watchdog = activeUsers.get(userId)?._timeout;
  if (watchdog) clearTimeout(watchdog);
  activeUsers.delete(userId);
}
|
|
|
/**
 * Abort the user's in-flight request, if there is one.
 *
 * @param {string} userId - Telegram user id.
 * @returns {boolean} True when a request was found and aborted.
 */
function interruptUser(userId) {
  const controller = activeUsers.get(userId);
  if (controller) {
    controller.abort();
    return true;
  }
  return false;
}
|
|
|
// --- Tool display names --- |
|
|
|
// Progress labels shown while a tool runs, keyed by the tool name the agent reports.
// Names without an entry are shown as-is by the caller.
const TOOL_DISPLAY = {
  // File access
  Read: 'Reading',
  Write: 'Writing',
  Edit: 'Editing',
  // Shell
  Bash: 'Running command',
  // Search
  Glob: 'Searching files',
  Grep: 'Searching code',
  // Web
  WebFetch: 'Fetching page',
  WebSearch: 'Searching web',
};
|
|
|
// --- CLI streaming (agent-agnostic event model) --- |
|
// |
|
// Both parsers normalise CLI output into a common event stream: |
|
// { type: 'chunk', content } — incremental text |
|
// { type: 'snapshot', content } — full-text replacement |
|
// { type: 'tool_start', content } — tool/command name |
|
// { type: 'tool_done' } — tool finished |
|
// { type: 'session_id', content } — session id for resume |
|
// { type: 'done', content, sessionId } |
|
// { type: 'error', content } |
|
|
|
// ── Claude parser ────────────────────────────────────────────────────────── |
|
|
|
/**
 * Translate one line of Claude `stream-json` output into normalised events.
 *
 * Mapping:
 *   system (with session_id)              -> session_id
 *   assistant                             -> tool_start per tool_use block, then one
 *                                            snapshot of the concatenated text blocks
 *   user (containing a tool_result)       -> tool_done (at most one per line)
 *   stream_event content_block_delta      -> chunk (text_delta only)
 *   stream_event content_block_start      -> tool_start (tool_use only)
 *   result                                -> done
 *
 * Lines that are not JSON, or whose JSON value is not an object (`null`, a
 * number, a string), produce no events instead of throwing.
 *
 * @param {string} line - One raw line of CLI stdout.
 * @returns {object[]} Zero or more normalised events.
 */
function parseClaudeLine(line) {
  let raw;
  try {
    raw = JSON.parse(line);
  } catch {
    return [];
  }
  // JSON.parse accepts `null` and primitives; property access below needs an object.
  if (raw === null || typeof raw !== 'object') return [];

  const events = [];

  switch (raw.type) {
    case 'system':
      if (raw.session_id) {
        events.push({ type: 'session_id', content: raw.session_id });
      }
      break;

    case 'assistant': {
      const content = raw.message?.content;
      if (!Array.isArray(content)) break;
      let text = '';
      for (const block of content) {
        if (block?.type === 'text' && block.text) {
          text += block.text;
        } else if (block?.type === 'tool_use' && block.name) {
          events.push({ type: 'tool_start', content: block.name });
        }
      }
      if (text) events.push({ type: 'snapshot', content: text });
      break;
    }

    case 'user': {
      const content = raw.message?.content;
      if (!Array.isArray(content)) break;
      if (content.some((block) => block?.type === 'tool_result')) {
        events.push({ type: 'tool_done' });
      }
      break;
    }

    case 'stream_event': {
      const event = raw.event;
      if (!event) break;
      if (event.type === 'content_block_delta') {
        const delta = event.delta;
        if (delta?.type === 'text_delta' && delta.text) {
          events.push({ type: 'chunk', content: delta.text });
        }
      } else if (event.type === 'content_block_start') {
        const block = event.content_block;
        if (block?.type === 'tool_use' && block.name) {
          events.push({ type: 'tool_start', content: block.name });
        }
      }
      break;
    }

    case 'result':
      events.push({
        type: 'done',
        content: raw.result || '',
        sessionId: raw.session_id || '',
      });
      break;
  }

  return events;
}
|
|
|
// ── Codex parser ─────────────────────────────────────────────────────────── |
|
|
|
/**
 * Translate one line of Codex `--json` output into normalised events.
 *
 * Mapping:
 *   thread.started (with thread_id)  -> session_id
 *   item.started                     -> tool_start for command_execution, file_change,
 *                                       mcp_tool_call and web_search items
 *   item.completed                   -> snapshot for an agent_message with text,
 *                                       tool_done for every other item
 *   turn.completed                   -> done (empty content and session id)
 *   turn.failed / error              -> error
 *
 * Lines that are not JSON, or whose JSON value is not an object, produce no
 * events instead of throwing. The error text is always a string: it is taken
 * from `message`, then `error.message`, then a string-valued `error`, so an
 * object-valued `error` never ends up as the event content.
 *
 * @param {string} line - One raw line of CLI stdout.
 * @returns {object[]} Zero or more normalised events.
 */
function parseCodexLine(line) {
  let raw;
  try {
    raw = JSON.parse(line);
  } catch {
    return [];
  }
  // JSON.parse accepts `null` and primitives; property access below needs an object.
  if (raw === null || typeof raw !== 'object') return [];

  const events = [];

  switch (raw.type) {
    case 'thread.started':
      if (raw.thread_id) {
        events.push({ type: 'session_id', content: raw.thread_id });
      }
      break;

    case 'item.started': {
      const item = raw.item;
      if (!item) break;
      if (item.type === 'command_execution' && item.command) {
        events.push({ type: 'tool_start', content: 'Running command' });
      } else if (item.type === 'file_change') {
        events.push({ type: 'tool_start', content: 'Editing file' });
      } else if (item.type === 'mcp_tool_call') {
        events.push({ type: 'tool_start', content: item.tool_name || 'Tool call' });
      } else if (item.type === 'web_search') {
        events.push({ type: 'tool_start', content: 'Searching web' });
      }
      break;
    }

    case 'item.completed': {
      const item = raw.item;
      if (!item) break;
      if (item.type === 'agent_message' && item.text) {
        events.push({ type: 'snapshot', content: item.text });
      } else {
        // Tool/command finished
        events.push({ type: 'tool_done' });
      }
      break;
    }

    case 'turn.completed':
      // The reply text arrives earlier as an agent_message snapshot; this only marks the end.
      events.push({ type: 'done', content: '', sessionId: '' });
      break;

    case 'turn.failed':
    case 'error': {
      const candidates = [raw.message, raw.error?.message, raw.error];
      const detail = candidates.find((value) => typeof value === 'string' && value.length > 0);
      events.push({ type: 'error', content: detail ?? 'Codex error' });
      break;
    }
  }

  return events;
}
|
|
|
// ── Spawn & stream ───────────────────────────────────────────────────────── |
|
|
|
/**
 * Build the argument vector for the configured agent CLI.
 *
 * Codex, new conversation: `exec <prompt> --json --full-auto [-m MODEL]`
 * Codex, resume:           `exec resume <sessionId> --json --full-auto [-m MODEL] <prompt>`
 * Claude:                  `-p <prompt> --verbose --output-format stream-json
 *                           --include-partial-messages --allowedTools <list>
 *                           [--model MODEL] [--resume <sessionId>]`
 *
 * A Codex resume targets the stored thread id rather than `--last`. `--last`
 * picks whichever session ran most recently on the machine, which is the wrong
 * conversation after `/resume` switches sessions or when several users share
 * the bot.
 *
 * @param {string} prompt - User prompt passed to the agent.
 * @param {string} sessionId - Session/thread id to resume; empty for a new conversation.
 * @returns {string[]} Arguments to pass to AGENT_CMD.
 */
function buildAgentArgs(prompt, sessionId) {
  const modelArgs = (flag) => (AGENT_MODEL ? [flag, AGENT_MODEL] : []);

  if (AGENT_TYPE === 'codex') {
    if (sessionId) {
      return ['exec', 'resume', sessionId, '--json', '--full-auto', ...modelArgs('-m'), prompt];
    }
    return ['exec', prompt, '--json', '--full-auto', ...modelArgs('-m')];
  }

  const args = [
    '-p',
    prompt,
    '--verbose',
    '--output-format',
    'stream-json',
    '--include-partial-messages',
    '--allowedTools',
    ALLOWED_TOOLS,
    ...modelArgs('--model'),
  ];
  if (sessionId) args.push('--resume', sessionId);
  return args;
}
|
|
|
// Line parser for the configured agent: Codex JSON events, or Claude stream-json otherwise.
const parseLine = AGENT_TYPE !== 'codex' ? parseClaudeLine : parseCodexLine;
|
|
|
/**
 * Spawn the agent CLI and yield normalised events parsed from its stdout.
 *
 * stderr lines are logged. After stdout ends and the process has closed, one
 * `error` event is yielded if the process could not be started, or if it ended
 * unsuccessfully without having been aborted.
 *
 * Cleanup runs in a `finally` block, so it also happens when the consumer
 * stops iterating early or a parser throws: the abort listener is removed and
 * a still-running child is terminated. Termination sends SIGTERM, then SIGKILL
 * after 3 s; that timer is cancelled as soon as the process closes.
 *
 * @param {string} prompt - Prompt to pass to the agent.
 * @param {string} sessionId - Session id to resume, or empty.
 * @param {AbortSignal} [signal] - Aborting it terminates the child process.
 * @yields {object} Normalised events as produced by parseLine.
 */
async function* streamAgent(prompt, sessionId, signal) {
  const args = buildAgentArgs(prompt, sessionId);

  // Strip CLAUDECODE env to allow nested launches
  const env = { ...process.env };
  delete env.CLAUDECODE;

  const proc = spawn(AGENT_CMD, args, {
    cwd: WORK_DIR,
    env,
    stdio: ['ignore', 'pipe', 'pipe'],
  });

  // Catch spawn errors (e.g. ENOENT if the agent binary is missing) so they
  // don't crash the whole bot — surface them as a stream error instead.
  let spawnError = null;
  proc.on('error', (err) => {
    spawnError = err;
  });

  let killTimer = null;
  const terminate = () => {
    if (killTimer !== null) return; // already terminating
    proc.kill('SIGTERM');
    killTimer = setTimeout(() => proc.kill('SIGKILL'), 3000);
  };

  // Register for 'close' BEFORE reading stdout: if the listener were added
  // afterwards the event could already have fired and the await below would
  // hang forever. Closing also cancels any pending SIGKILL escalation.
  const closed = new Promise((done) => proc.once('close', done)).then(() => {
    clearTimeout(killTimer);
  });

  if (signal?.aborted) {
    // An already-aborted signal never fires 'abort' again; act on it now.
    terminate();
  } else {
    signal?.addEventListener('abort', terminate, { once: true });
  }

  const rl = createInterface({ input: proc.stdout, crlfDelay: Infinity });
  const stderrRl = createInterface({ input: proc.stderr, crlfDelay: Infinity });
  stderrRl.on('line', (line) => log(`[${AGENT_TYPE} stderr] ${line}`));

  try {
    for await (const line of rl) {
      if (!line) continue;
      yield* parseLine(line);
    }
    await closed;
  } finally {
    signal?.removeEventListener('abort', terminate);
    // Reached with a live child only when iteration ended early or threw.
    const stillRunning = proc.exitCode === null && proc.signalCode === null;
    if (stillRunning) terminate();
  }

  if (spawnError) {
    yield { type: 'error', content: `Failed to start ${AGENT_TYPE}: ${spawnError.message}` };
  } else if (proc.exitCode !== 0 && !signal?.aborted) {
    const outcome =
      proc.exitCode === null
        ? `was terminated by signal ${proc.signalCode}`
        : `exited with code ${proc.exitCode}`;
    yield { type: 'error', content: `${AGENT_TYPE} ${outcome}` };
  }
}
|
|
|
/** |
|
* Stream an agent response and live-edit a Telegram message. |
|
* Works with both Claude and Codex via the normalised event model. |
|
*/ |
|
/**
 * Stream an agent response and live-edit a Telegram message.
 * Works with both Claude and Codex via the normalised event model.
 *
 * Intermediate updates are throttled to one per EDIT_INTERVAL and sent as plain
 * text. Terminal updates (final answer, error, interruption) are sent as HTML.
 *
 * Behaviour worth knowing:
 *  - `prefix` is raw text (e.g. a voice transcript) and is HTML-escaped in
 *    every HTML-mode edit.
 *  - An aborted request ends the event stream without throwing, so the
 *    "(interrupted)" marker is rendered after the loop, not only in `catch`.
 *  - If the stream ends with neither `done` nor `error`, the accumulated text
 *    is rendered as the final message so throttled chunks are not lost.
 *  - The session id is saved in `finally`, so it is kept even when the last
 *    edit fails.
 *
 * @param {number|string} chatId - Chat containing the message to edit.
 * @param {number} messageId - Placeholder message that receives the output.
 * @param {string} userId - Telegram user id (selects session and abort signal).
 * @param {string} prompt - Prompt sent to the agent.
 * @param {string} [prefix=''] - Plain text shown before the agent output.
 * @returns {Promise<void>}
 */
async function streamAgentResponse(chatId, messageId, userId, prompt, prefix = '') {
  const session = getSession(userId);
  const prevSessionId = session?.sessionId || '';
  const signal = activeUsers.get(userId)?.signal;

  let accumulated = '';
  let activeTool = '';
  let lastEdit = 0;
  let lastTyping = Date.now();
  let sessionId = '';
  let finalized = false; // true once a done/error/interrupted message was rendered

  const htmlPrefix = prefix ? htmlEscape(prefix) : '';

  // Text of an intermediate update: output so far plus the running-tool label.
  function renderDisplay() {
    let raw = stripBackslashEscapes(accumulated);
    if (activeTool) {
      const label = TOOL_DISPLAY[activeTool] || activeTool;
      if (raw) raw += '\n\n';
      raw += `\u23f3 ${label}...`;
    }
    return raw;
  }

  // Throttled plain-text update; `force` bypasses the throttle.
  async function maybeEdit(force = false) {
    const now = Date.now();
    if (!force && now - lastEdit < EDIT_INTERVAL) return;
    const text = renderDisplay();
    if (!text) return;
    await editMessageText(chatId, messageId, truncate(prefix + text));
    lastEdit = now;
  }

  // Fire-and-forget typing indicator. The rejection is handled so a network
  // blip cannot become an unhandled promise rejection.
  function maybeTyping() {
    if (Date.now() - lastTyping <= TYPING_INTERVAL) return;
    lastTyping = Date.now();
    sendTyping(chatId).catch((err) => log(`sendTyping failed: ${err.message}`));
  }

  // Terminal HTML update.
  async function editFinal(html) {
    await editMessageText(chatId, messageId, truncate(htmlPrefix + html), 'HTML');
    finalized = true;
  }

  function showInterrupted() {
    const body = accumulated ? `${toTelegramHTML(accumulated)}\n\n` : '';
    return editFinal(`${body}<i>(interrupted)</i>`);
  }

  try {
    for await (const event of streamAgent(prompt, prevSessionId, signal)) {
      maybeTyping();

      switch (event.type) {
        case 'chunk':
          accumulated += event.content;
          await maybeEdit();
          break;

        case 'snapshot':
          if (event.content) {
            accumulated = event.content;
            await maybeEdit();
          }
          break;

        case 'tool_start':
          activeTool = event.content;
          await maybeEdit(true);
          break;

        case 'tool_done':
          activeTool = '';
          break;

        case 'session_id':
          sessionId = event.content;
          break;

        case 'done': {
          if (event.sessionId) sessionId = event.sessionId;
          activeTool = '';

          let finalText = stripCostLine(accumulated);
          if (!finalText && event.content) finalText = stripCostLine(event.content);
          if (!finalText) finalText = `No response from ${AGENT_TYPE}.`;

          await editFinal(toTelegramHTML(finalText));
          break;
        }

        case 'error':
          await editFinal(`Error: ${htmlEscape(String(event.content))}`);
          break;
      }
    }

    if (!finalized) {
      if (signal?.aborted) {
        await showInterrupted();
      } else {
        // Stream ended without a terminal event: show what we have.
        const leftover = stripCostLine(accumulated) || `No response from ${AGENT_TYPE}.`;
        await editFinal(toTelegramHTML(leftover));
      }
    }
  } catch (err) {
    if (signal?.aborted) {
      if (!finalized) await showInterrupted();
    } else {
      log(`Stream error: ${err.message}`);
      await editFinal(`Error: ${htmlEscape(err.message)}`);
    }
  } finally {
    // Save session (Codex: thread_id, Claude: session_id)
    if (!sessionId) sessionId = prevSessionId;
    if (sessionId) {
      const title = prompt.slice(0, 50);
      saveSession(userId, sessionId, title);
    }
  }
}
|
|
|
// --- Whisper voice transcription --- |
|
|
|
// Path or command name of the whisper binary; null when none was found.
let whisperPath = null;

/**
 * Locate a whisper binary and remember it in `whisperPath`.
 *
 * The known virtualenv location (~/.cc-tools-venv/bin/mlx_whisper) is checked
 * first; after that `mlx_whisper` and `whisper` are looked up with `which`.
 * A warning is logged when nothing is found.
 */
function initWhisper() {
  const remember = (found) => {
    whisperPath = found;
    log(`Whisper found: ${whisperPath}`);
  };

  // Check known venv path first, then search PATH
  const venvBinary = resolve(homedir(), '.cc-tools-venv', 'bin', 'mlx_whisper');
  if (existsSync(venvBinary)) {
    remember(venvBinary);
    return;
  }

  const isOnPath = (name) => {
    try {
      execFileSync('which', [name], { encoding: 'utf8', stdio: ['ignore', 'pipe', 'ignore'] });
      return true;
    } catch {
      // `which` exits non-zero (or is unavailable): treat as not found.
      return false;
    }
  };

  const command = ['mlx_whisper', 'whisper'].find(isOnPath);
  if (command) {
    remember(command);
    return;
  }

  log('Warning: No whisper binary found. Voice messages will not be transcribed.');
}
|
|
|
/**
 * Download a Telegram file into the OS temp directory.
 *
 * The temp file name contains a random UUID, is created exclusively
 * (`wx`: fails rather than following or overwriting an existing path) and is
 * readable only by the current user. A timestamp-only name can collide between
 * concurrent downloads and is predictable in a shared temp directory. A
 * partially written file is removed when the download fails.
 *
 * @param {string} fileId - Telegram file id.
 * @param {string} prefix - Label embedded in the temp file name (e.g. `voice`).
 * @returns {Promise<string>} Path of the downloaded file; the caller deletes it.
 * @throws {Error} When file info cannot be fetched or the download fails.
 */
async function downloadTelegramFile(fileId, prefix) {
  const info = await getFile(fileId);
  const filePath = info.ok ? info.result?.file_path : undefined;
  if (!filePath) throw new Error('Failed to get file info');

  const url = `https://api.telegram.org/file/bot${BOT_TOKEN}/${filePath}`;
  const ext = extname(filePath) || '.ogg';
  const tmpPath = join(tmpdir(), `telegram-${prefix}-${Date.now()}-${randomUUID()}${ext}`);

  const res = await fetch(url);
  if (!res.ok) throw new Error(`Download failed: ${res.status}`);
  if (!res.body) throw new Error('Download failed: empty response body');

  try {
    const fileStream = createWriteStream(tmpPath, { flags: 'wx', mode: 0o600 });
    await pipeline(Readable.fromWeb(res.body), fileStream);
  } catch (err) {
    // Remove our partial file. On EEXIST the path belongs to someone else, so leave it.
    if (err?.code !== 'EEXIST') {
      try {
        unlinkSync(tmpPath);
      } catch {
        // Best effort: the file may never have been created.
      }
    }
    throw err;
  }

  return tmpPath;
}
|
|
|
/**
 * Transcribe an audio file by running the whisper binary synchronously.
 *
 * The CLI is asked to write `transcript.txt` into a fresh temp directory,
 * which is always removed afterwards. The call blocks for up to 120 s.
 *
 * @param {string} audioPath - Path of the audio file.
 * @returns {string} Trimmed transcript, or '' when no transcript file was produced.
 * @throws {Error} When the whisper process fails or times out.
 */
function transcribe(audioPath) {
  const outDir = mkdtempSync(join(tmpdir(), 'telegram-whisper-'));
  try {
    const cliArgs = [
      audioPath,
      '--model', 'mlx-community/whisper-small-mlx',
      '--output-format', 'txt',
      '--output-dir', outDir,
      '--output-name', 'transcript',
      '--language', 'en',
    ];
    const runOptions = {
      encoding: 'utf8',
      timeout: 120000,
      // Append common install locations to the child's PATH.
      env: { ...process.env, PATH: `${process.env.PATH || ''}:/opt/homebrew/bin:/usr/local/bin` },
    };
    execFileSync(whisperPath, cliArgs, runOptions);

    const transcriptFile = join(outDir, 'transcript.txt');
    return existsSync(transcriptFile) ? readFileSync(transcriptFile, 'utf8').trim() : '';
  } finally {
    rmSync(outDir, { recursive: true, force: true });
  }
}
|
|
|
// --- File type helpers --- |
|
|
|
// Extensions (lower-case, with leading dot) treated as images.
const IMAGE_EXTS = new Set(['.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg']);

// Extensions (lower-case, with leading dot) treated as readable text.
const TEXT_EXTS = new Set([
  '.txt', '.md', '.csv', '.json', '.xml', '.yaml', '.yml', '.toml',
  '.html', '.css', '.js', '.ts', '.go', '.py', '.rs', '.java',
  '.sh', '.bash', '.zsh', '.log', '.ini', '.cfg', '.conf',
]);

/**
 * Build the agent prompt for an attached file.
 *
 * @param {string} path - Local path of the downloaded file.
 * @param {string} filename - Original file name (only mentioned for unknown types).
 * @param {string} ext - Lower-case extension including the dot.
 * @param {string} caption - What the user wants done with the file.
 * @returns {string} Prompt text.
 */
function buildFilePrompt(path, filename, ext, caption) {
  const isImage = IMAGE_EXTS.has(ext);
  const isReadable = !isImage && (TEXT_EXTS.has(ext) || ext === '.pdf');

  let intro;
  if (isImage) {
    intro = `I've attached an image at the path ${path}. Please look at it using the Read tool`;
  } else if (isReadable) {
    intro = `I've attached a file at the path ${path}. Please read it using the Read tool`;
  } else {
    intro = `I've attached a file at the path ${path} (filename: ${filename}). Try to read it`;
  }
  return `${intro} and then: ${caption}`;
}
|
|
|
// --- Message handlers --- |
|
|
|
/**
 * Handle a plain text message: forward it to the agent and stream the reply.
 *
 * A leading `!` interrupts the user's running request (waiting up to 5 s for
 * it to release its slot) and the remainder of the text, if any, is processed
 * as a new prompt.
 *
 * @param {object} msg - Telegram message with `text`, `from.id` and `chat.id`.
 * @returns {Promise<void>}
 */
async function handleChat(msg) {
  const userId = String(msg.from.id);
  const chatId = msg.chat.id;
  let text = msg.text;

  // Interrupt with ! prefix
  if (text.startsWith('!')) {
    text = text.slice(1).trim();
    if (interruptUser(userId)) {
      // Poll until the aborted request has released the user (max 50 x 100 ms).
      for (let i = 0; i < 50 && activeUsers.has(userId); i++) {
        await new Promise((r) => setTimeout(r, 100));
      }
    }
    if (!text) return;
  }

  if (!acquireUser(chatId, userId)) return;

  try {
    // Fire-and-forget. The rejection is handled so a failed typing indicator
    // cannot surface as an unhandled promise rejection.
    sendTyping(chatId).catch((err) => log(`sendTyping failed: ${err.message}`));

    const sent = await sendMessage(chatId, 'Thinking...');
    if (!sent.ok) return;

    await streamAgentResponse(chatId, sent.result.message_id, userId, text);
  } finally {
    releaseUser(userId);
  }
}
|
|
|
/**
 * Handle a voice or audio message: download it, transcribe it with whisper,
 * then send the transcript to the agent and stream the reply into the same
 * placeholder message. Each failure is reported by editing that message.
 *
 * @param {object} msg - Telegram message carrying `voice` or `audio`.
 * @returns {Promise<void>}
 */
async function handleVoice(msg) {
  const userId = String(msg.from.id);
  const chatId = msg.chat.id;

  if (!whisperPath) {
    await sendMessage(chatId, 'Voice transcription not available. Send text instead.\n\n<i>Tip: Install mlx-whisper to enable voice support.</i>');
    return;
  }

  if (!acquireUser(chatId, userId)) return;

  try {
    const placeholder = await sendMessage(chatId, '\ud83c\udfa4 Transcribing voice message...');
    if (!placeholder.ok) return;
    const { message_id: messageId } = placeholder.result;
    const report = (text) => editMessageText(chatId, messageId, text);

    const fileId = msg.voice?.file_id || msg.audio?.file_id;
    if (!fileId) {
      await report('Unsupported audio format.');
      return;
    }

    let audioPath;
    try {
      audioPath = await downloadTelegramFile(fileId, 'voice');
    } catch (err) {
      await report(`Failed to download voice: ${err.message}`);
      return;
    }

    let transcript;
    try {
      transcript = transcribe(audioPath);
    } catch (err) {
      await report(`Transcription failed: ${err.message}`);
      return;
    } finally {
      // The audio file is no longer needed whether or not transcription worked.
      try {
        unlinkSync(audioPath);
      } catch {}
    }

    if (!transcript) {
      await report('Could not transcribe voice message. Please try again.');
      return;
    }

    // Echo the transcript above the agent's answer.
    const prefix = `\ud83c\udfa4 "${transcript}"\n\n`;
    await report(prefix + 'Thinking...');
    await streamAgentResponse(chatId, messageId, userId, transcript, prefix);
  } finally {
    releaseUser(userId);
  }
}
|
|
|
/**
 * Handle a photo message: download the last entry of `msg.photo`, ask the
 * agent to look at it, and stream the reply. The caption, when present, is
 * used as the instruction. The downloaded file is deleted afterwards.
 *
 * @param {object} msg - Telegram message carrying `photo`.
 * @returns {Promise<void>}
 */
async function handlePhoto(msg) {
  const userId = String(msg.from.id);
  const chatId = msg.chat.id;

  if (!acquireUser(chatId, userId)) return;

  try {
    const placeholder = await sendMessage(chatId, '\ud83d\udcf7 Analyzing image...');
    if (!placeholder.ok) return;
    const { message_id: messageId } = placeholder.result;

    const photos = msg.photo;
    if (!photos || photos.length === 0) {
      await editMessageText(chatId, messageId, 'No photo found.');
      return;
    }

    const { file_id: fileId } = photos[photos.length - 1];
    let imgPath;
    try {
      imgPath = await downloadTelegramFile(fileId, 'photo');
    } catch (err) {
      await editMessageText(chatId, messageId, `Failed to download image: ${err.message}`);
      return;
    }

    const instruction = msg.caption || 'Describe what you see in this image.';
    const prompt = `I've attached an image at the path ${imgPath}. Please look at it using the Read tool and then: ${instruction}`;

    try {
      await streamAgentResponse(chatId, messageId, userId, prompt);
    } finally {
      // Best-effort removal of the downloaded image.
      try {
        unlinkSync(imgPath);
      } catch {}
    }
  } finally {
    releaseUser(userId);
  }
}
|
|
|
/**
 * Handle a document message: reject files over 10 MB, download the file, ask
 * the agent to read it, and stream the reply. The caption, when present, is
 * used as the instruction. The downloaded file is deleted afterwards.
 *
 * @param {object} msg - Telegram message carrying `document`.
 * @returns {Promise<void>}
 */
async function handleDocument(msg) {
  const userId = String(msg.from.id);
  const chatId = msg.chat.id;

  if (!acquireUser(chatId, userId)) return;

  try {
    const { document: doc } = msg;
    if (!doc) return;

    const maxBytes = 10 * 1024 * 1024;
    if (doc.file_size > maxBytes) {
      await sendMessage(chatId, 'File too large. Maximum size is 10MB.');
      return;
    }

    const placeholder = await sendMessage(chatId, '\ud83d\udcc4 Reading document...');
    if (!placeholder.ok) return;
    const { message_id: messageId } = placeholder.result;

    let docPath;
    try {
      docPath = await downloadTelegramFile(doc.file_id, 'doc');
    } catch (err) {
      await editMessageText(chatId, messageId, `Failed to download file: ${err.message}`);
      return;
    }

    const instruction = msg.caption || 'Analyze this file and summarize its contents.';
    const displayName = doc.file_name || 'file';
    const ext = extname(doc.file_name || '').toLowerCase();
    const prompt = buildFilePrompt(docPath, displayName, ext, instruction);

    try {
      await streamAgentResponse(chatId, messageId, userId, prompt);
    } finally {
      // Best-effort removal of the downloaded document.
      try {
        unlinkSync(docPath);
      } catch {}
    }
  } finally {
    releaseUser(userId);
  }
}
|
|
|
// --- Commands --- |
|
|
|
/**
 * Dispatch a slash command (`/help`, `/start`, `/new`, `/resume`).
 *
 * The command is the first whitespace-delimited token with any `@botname`
 * suffix removed. Arguments are whatever follows that whole token, so
 * `/resume@mybot 2` yields command `/resume` and args `2`. Slicing by the
 * length of the stripped command would leave `@mybot 2` as the args.
 *
 * @param {object} msg - Telegram message whose text starts with `/`.
 * @returns {Promise<void>}
 */
async function handleCommand(msg) {
  const text = msg.text || '';
  const [token] = text.split(/\s/); // e.g. "/resume@mybot"
  const cmd = token.split('@')[0]; // strip @botname
  const args = text.slice(token.length).trim();

  switch (cmd) {
    case '/help':
      await sendMessage(msg.chat.id,
        `<b>Telegram Bot</b> (${AGENT_TYPE})\n\n` +
        'Send any message to start a conversation.\n' +
        'Prefix with <code>!</code> to interrupt a running response.\n\n' +
        '<b>Commands:</b>\n' +
        '/new - Start new conversation\n' +
        '/resume (N) - List or switch to a previous session\n' +
        '/help - This message\n\n' +
        '<b>Media:</b>\n' +
        'Send photos, documents, or voice messages.\n\n' +
        '<i>(N) = optional</i>',
      );
      break;

    case '/start':
      await sendMessage(msg.chat.id, `Welcome! Send a message to chat with ${AGENT_TYPE}, or use /help for commands.`);
      break;

    case '/new':
      clearSession(String(msg.from.id));
      await sendMessage(msg.chat.id, 'Started new session. Previous sessions preserved \u2014 use /resume to switch back.');
      break;

    case '/resume':
      await handleResumeCommand(msg, args);
      break;

    default:
      await sendMessage(msg.chat.id, `Unknown command: ${cmd}\nUse /help for available commands.`);
  }
}
|
|
|
/**
 * Handle `/resume`: with no argument, list the five most recent sessions;
 * with a number, make that session (1-based, as listed) the active one.
 *
 * All replies go out in HTML parse mode, so the literal `<number>` placeholder
 * is written as `&lt;number&gt;`. An unescaped `<number>` is not a supported
 * tag: Telegram rejects the message and the plain-text fallback strips it,
 * along with the rest of the formatting.
 *
 * @param {object} msg - Telegram message (uses `from.id` and `chat.id`).
 * @param {string} args - Text following the command, already trimmed.
 * @returns {Promise<void>}
 */
async function handleResumeCommand(msg, args) {
  const userId = String(msg.from.id);
  const chatId = msg.chat.id;

  if (!args) {
    const sessions = getRecentSessions(userId, 5);
    if (sessions.length === 0) {
      await sendMessage(chatId, 'No previous sessions found.');
      return;
    }

    const lines = sessions.map((s, i) => {
      const title = s.title || '(untitled)';
      const active = s.active ? ' \u2705' : '';
      const dateStr = new Date(s.lastActive).toLocaleDateString('en-US', {
        month: 'short',
        day: 'numeric',
        hour: '2-digit',
        minute: '2-digit',
      });
      return `${i + 1}. ${htmlEscape(title)} \u2014 <i>${dateStr}</i>${active}\n`;
    });
    const text =
      '<b>Recent Sessions:</b>\n\n' +
      lines.join('') +
      '\nUse <code>/resume &lt;number&gt;</code> to switch back.';
    await sendMessage(chatId, text);
    return;
  }

  const n = Number.parseInt(args, 10);
  if (Number.isNaN(n) || n < 1) {
    await sendMessage(chatId, 'Usage: /resume &lt;number&gt; (from /resume list)');
    return;
  }

  const sessions = getRecentSessions(userId, 5);
  if (n > sessions.length) {
    await sendMessage(chatId, 'Invalid session number. Use /resume to see the list.');
    return;
  }

  if (activateSession(userId, n - 1)) {
    const title = sessions[n - 1].title || '(untitled)';
    await sendMessage(chatId, `Resumed session: ${htmlEscape(title)}`);
  } else {
    await sendMessage(chatId, 'Failed to switch session.');
  }
}
|
|
|
// --- Update handler --- |
|
|
|
/**
 * Route one Telegram update to the matching handler.
 *
 * Updates without a `message` are ignored. Messages from users not listed in
 * ALLOWED_USERS are logged and dropped. Errors thrown by a handler are logged
 * and swallowed.
 *
 * @param {object} update - Telegram update object.
 * @returns {Promise<void>}
 */
async function handleUpdate(update) {
  const { message: msg } = update;
  if (!msg) return;

  const userId = String(msg.from?.id);
  if (!ALLOWED_USERS.includes(userId)) {
    log(`Unauthorized message from user ${userId}`);
    return;
  }

  // First matching kind wins; text takes priority over attachments.
  const selectHandler = () => {
    if (msg.text) return msg.text.startsWith('/') ? handleCommand : handleChat;
    if (msg.voice || msg.audio) return handleVoice;
    if (msg.photo) return handlePhoto;
    if (msg.document) return handleDocument;
    return null;
  };

  try {
    const handler = selectHandler();
    if (handler) await handler(msg);
  } catch (err) {
    log(`Error handling update: ${err.message}`);
  }
}
|
|
|
// --- Polling loop --- |
|
|
|
/**
 * Fetch pending updates from Telegram's `getUpdates` method.
 *
 * @param {number} offset - Id of the first update to return.
 * @param {number} timeout - Long-poll timeout in seconds (0 returns immediately).
 * @returns {Promise<object>} Telegram response envelope.
 */
async function fetchUpdates(offset, timeout) {
  const query = { offset, timeout };
  return tg('getUpdates', query);
}
|
|
|
/**
 * Long-poll Telegram for updates forever.
 *
 * Updates that are already queued at startup are skipped, not processed.
 * Each later update is handed to handleUpdate without awaiting it, so a slow
 * request does not block polling. After a failed poll the loop waits 5 s
 * before retrying.
 *
 * @param {number} [offset=0] - Id of the first update to fetch.
 * @returns {Promise<never>} Never resolves.
 */
async function poll(offset = 0) {
  const pause = (ms) => new Promise((r) => setTimeout(r, ms));

  // Drain stale updates that queued while the bot was down.
  try {
    const drain = await fetchUpdates(offset, 0);
    if (drain.ok && drain.result.length > 0) {
      const skipped = drain.result.length;
      offset = drain.result[skipped - 1].update_id + 1;
      log(`Skipped ${skipped} stale update(s) from while bot was down`);
    }
  } catch (err) {
    log(`Drain error: ${err.message}`);
  }

  for (;;) {
    try {
      const data = await fetchUpdates(offset, 30);
      if (data.ok) {
        for (const update of data.result) {
          offset = update.update_id + 1;
          handleUpdate(update).catch((err) => log(`Update handler error: ${err.message}`));
        }
      } else {
        log(`getUpdates error: ${data.description}`);
        await pause(5000);
      }
    } catch (err) {
      log(`Poll error: ${err.message}`);
      await pause(5000);
    }
  }
}
|
|
|
// --- Service management --- |
|
|
|
/**
 * Connectivity check: call `getMe` and print the bot identity, or print the
 * failure and exit the process with status 1.
 *
 * @returns {Promise<void>}
 */
async function cmdTest() {
  const result = await tg('getMe', {});
  if (!result.ok) {
    console.error(`Connection failed: ${result.description}`);
    process.exit(1);
    return;
  }
  const { username, first_name: firstName } = result.result;
  console.log(`Connected: @${username} (${firstName})`);
}
|
|
|
// --- Main --- |
|
|
|
// Nothing works without a bot token.
if (!BOT_TOKEN) {
  console.error('TELEGRAM_BOT_TOKEN not set. Create a .env file first.');
  process.exit(1);
}

const cmd = process.argv[2];
const isConnectivityCheck = cmd === '--test' || cmd === 'test';

if (isConnectivityCheck) {
  // `test` / `--test`: verify the token against Telegram and stop.
  await cmdTest();
} else {
  if (ALLOWED_USERS.length === 0) {
    console.error('ALLOWED_USERS not set. Add your Telegram user ID to .env');
    process.exit(1);
  }

  const me = await tg('getMe', {});
  if (me.ok) {
    log(`Authorized as @${me.result.username}`);
  }
  log(`Allowed users: ${ALLOWED_USERS.join(', ')}`);
  log(`Working directory: ${WORK_DIR}`);
  log(`Agent: ${AGENT_CMD} (${AGENT_TYPE})`);

  // Graceful shutdown — abort all active processes so restarts start clean
  process.on('SIGTERM', () => {
    log('SIGTERM received — cleaning up active processes...');
    for (const [userId, controller] of activeUsers) {
      controller.abort();
      releaseUser(userId);
    }
    process.exit(0);
  });

  initWhisper();

  log('Listening for messages...');
  await poll();
}