Skip to content

Instantly share code, notes, and snippets.

@mikehostetler
Created February 28, 2026 19:18
Show Gist options
  • Select an option

  • Save mikehostetler/8a93767412e6b5651c8c33b3164467bb to your computer and use it in GitHub Desktop.

Select an option

Save mikehostetler/8a93767412e6b5651c8c33b3164467bb to your computer and use it in GitHub Desktop.
Jido Telegram AI agent spike (polling ingress + agent loop)
#!/usr/bin/env bash
# Thin launcher: resolve the directory this script lives in, then replace the
# shell with the Elixir spike located alongside it.
set -euo pipefail
script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
exec elixir "${script_dir}/telegram_ai_agent_spike.exs"
#!/usr/bin/env elixir
# Script dependencies, declared inline via Mix.install/1:
# - three local Jido checkouts, resolved relative to this file's location
# - req_llm (LLM client) and dotenvy (.env loader) from Hex
Mix.install([
{:jido_ai, path: Path.expand("../../jido_ai", __DIR__)},
{:jido_chat, path: Path.expand("../jido_chat", __DIR__)},
{:jido_chat_telegram, path: Path.expand("../jido_chat_telegram", __DIR__)},
{:req_llm, "~> 1.6"},
{:dotenvy, "~> 1.1"}
])
defmodule TelegramAiSpike.Actions.LocalTime do
  @moduledoc false
  # Minimal tool action that exposes the current UTC time to the agent.
  use Jido.Action,
    name: "local_time",
    description: "Get current UTC time",
    schema: Zoi.object(%{format: Zoi.string() |> Zoi.optional() |> Zoi.default("iso8601")})

  @impl true
  def run(_params, _context) do
    # Second-precision ISO-8601 timestamp; the :format param is currently unused.
    timestamp =
      DateTime.utc_now()
      |> DateTime.truncate(:second)
      |> DateTime.to_iso8601()

    {:ok, %{time: timestamp}}
  end
end
defmodule TelegramAiSpike.Agent do
@moduledoc false
# Declarative AI agent: all behavior comes from the Jido.AI.Agent macro
# options below. The :fast model alias is bound at startup by
# TelegramAiSpike.configure_model_aliases/1, and the LocalTime action is
# exposed to the model as a callable tool.
use Jido.AI.Agent,
name: "telegram_spike_agent",
description: "Simple Telegram chat assistant spike",
model: :fast,
max_iterations: 6,
tools: [TelegramAiSpike.Actions.LocalTime],
system_prompt: """
You are a concise Telegram assistant for a local development spike.
Keep answers practical and under 6 sentences.
If asked for time, use the local_time tool.
"""
end
defmodule TelegramAiSpike.Runtime do
  @moduledoc false
  # Holds the live Jido.Chat struct and forwards Telegram ingress payloads
  # into the chat adapter, optionally filtering on a single allowed chat id.
  use GenServer
  require Logger
  alias Jido.Chat.Telegram.Adapter

  @type state :: %{
          chat: Jido.Chat.t(),
          token: String.t(),
          allowed_chat_id: String.t() | nil
        }

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
  end

  @impl true
  def init(opts) do
    state = %{
      chat: Keyword.fetch!(opts, :chat),
      token: Keyword.fetch!(opts, :token),
      allowed_chat_id: Keyword.get(opts, :allowed_chat_id)
    }

    {:ok, state}
  end

  @impl true
  def handle_call({:ingress_payload, raw_payload, mode}, _from, state)
      when is_map(raw_payload) and mode in [:payload, :webhook] do
    update = normalize_payload(raw_payload)

    if allowed_chat?(state.allowed_chat_id, update) do
      case Adapter.handle_webhook(state.chat, update, token: state.token) do
        {:ok, updated_chat, _incoming} ->
          {:reply, :ok, %{state | chat: updated_chat}}

        {:error, :unsupported_update_type} ->
          # Non-message updates are expected; drop them silently.
          {:reply, :ok, state}

        {:error, reason} ->
          Logger.warning("[TelegramAiSpike] ingress error: #{inspect(reason)}")
          {:reply, {:error, reason}, state}
      end
    else
      # Filtered chat: acknowledge without processing.
      {:reply, :ok, state}
    end
  end

  def handle_call({:ingress_payload, _payload, _mode}, _from, state) do
    {:reply, {:error, :invalid_payload}, state}
  end

  # No filter configured -> every chat is allowed.
  defp allowed_chat?(nil, _payload), do: true

  defp allowed_chat?(allowed_chat_id, payload) when is_binary(allowed_chat_id) do
    # Telegram update kinds carry the chat id at different top-level keys, and
    # the payload may use string or atom keys depending on its origin.
    id_paths = [
      ["message", "chat", "id"],
      [:message, :chat, :id],
      ["channel_post", "chat", "id"],
      [:channel_post, :chat, :id],
      ["edited_message", "chat", "id"],
      [:edited_message, :chat, :id],
      ["edited_channel_post", "chat", "id"],
      [:edited_channel_post, :chat, :id]
    ]

    found_id = Enum.find_value(id_paths, fn path -> get_in(payload, path) end)
    to_string(found_id || "") == allowed_chat_id
  end

  # Recursively converts structs to plain maps (struct clause must precede the
  # generic map clause) so the adapter sees uniform data.
  defp normalize_payload(%_{} = struct), do: struct |> Map.from_struct() |> normalize_payload()

  defp normalize_payload(map) when is_map(map) do
    Map.new(map, fn {key, value} -> {key, normalize_payload(value)} end)
  end

  defp normalize_payload(list) when is_list(list), do: Enum.map(list, &normalize_payload/1)
  defp normalize_payload(other), do: other
end
defmodule TelegramAiSpike.Sink do
  @moduledoc false
  # Bridge between the polling listener and the Runtime GenServer: the adapter
  # invokes emit/3 (via the configured sink_mfa) for every raw Telegram update.
  @spec emit(pid(), map(), keyword()) :: :ok | {:error, term()}
  def emit(runtime_pid, payload, opts \\ []) when is_pid(runtime_pid) and is_map(payload) do
    ingress_mode = Keyword.get(opts, :mode, :payload)
    # Generous 3-minute timeout: the call blocks while the agent loop runs.
    GenServer.call(runtime_pid, {:ingress_payload, payload, ingress_mode}, 180_000)
  end
end
defmodule TelegramAiSpike do
  @moduledoc false
  # Spike entry point: loads .env config, resolves an LLM model from the
  # available API keys, boots the Jido agent plus a Telegram polling listener,
  # then blocks forever so the listener keeps running.
  require Logger
  alias Jido.Chat.Telegram.Adapter

  # Optional .env file shared with the sibling jido_messaging project.
  @default_env_file Path.expand("../jido_messaging/.env", __DIR__)

  # Preference-ordered {api_key_env_var, model_spec} candidates; the first
  # entry whose key is present wins (see resolve_model_from_env_keys!/0).
  @model_candidates_by_key [
    {"OPENAI_API_KEY", "openai:gpt-4o-mini"},
    {"ANTHROPIC_API_KEY", "anthropic:claude-haiku-4-5"},
    {"CEREBRAS_API_KEY", "cerebras:gpt-oss-120b"},
    {"OPENROUTER_API_KEY", "openrouter:openai/gpt-4o-mini"},
    {"GOOGLE_API_KEY", "google:gemini-2.5-flash"}
  ]

  # Boots the whole spike. Raises when TELEGRAM_BOT_TOKEN or a usable model is
  # missing. Never returns: sleeps forever once everything is wired up.
  def run do
    Logger.configure(level: :warning)
    load_env!(@default_env_file)
    configure_provider_keys_from_env()

    token = env!("TELEGRAM_BOT_TOKEN")
    allowed_chat_id = env_optional("TELEGRAM_CHAT_ID")
    bot_user_id = fetch_bot_user_id(token)
    selected_model = resolve_model!()
    configure_model_aliases(selected_model)
    Application.put_env(:jido_chat_telegram, :telegram_bot_token, token)

    {:ok, _apps} = Application.ensure_all_started(:jido_ai)
    {:ok, _jido_pid} = Jido.start()
    {:ok, agent_pid} = Jido.start_agent(Jido.default_instance(), TelegramAiSpike.Agent)

    chat = build_chat(agent_pid, token, bot_user_id)

    {:ok, runtime_pid} =
      TelegramAiSpike.Runtime.start_link(
        chat: chat,
        token: token,
        allowed_chat_id: normalize_id(allowed_chat_id)
      )

    {:ok, listener_sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

    {:ok, specs} =
      Adapter.listener_child_specs("telegram-spike",
        ingress: %{
          mode: "polling",
          token: token,
          timeout_s: 20,
          poll_interval_ms: 500,
          max_backoff_ms: 5_000
        },
        sink_mfa: {TelegramAiSpike.Sink, :emit, [runtime_pid]}
      )

    Enum.each(specs, fn spec ->
      {:ok, _pid} = DynamicSupervisor.start_child(listener_sup, spec)
    end)

    maybe_send_startup_message(token, allowed_chat_id, selected_model)

    IO.puts("")
    IO.puts("Telegram AI spike is running.")
    IO.puts("Model: #{selected_model}")
    IO.puts("Allowed chat: #{allowed_chat_id || "<all>"}")
    IO.puts("Press Ctrl+C to stop.")
    Process.sleep(:infinity)
  end

  # Builds the chat struct and installs the handler fired for every non-empty
  # incoming message (~r/.+/u).
  defp build_chat(agent_pid, token, bot_user_id) do
    Jido.Chat.new(
      user_name: "agentjido",
      adapters: %{telegram: Adapter}
    )
    |> Jido.Chat.on_new_message(~r/.+/u, fn thread, incoming ->
      handle_incoming_message(thread, incoming, agent_pid, token, bot_user_id)
    end)
  end

  # Ignores blank and bot-authored messages (prevents echo loops); otherwise
  # asks the agent and posts an echo plus truncated LLM reply back to the thread.
  defp handle_incoming_message(thread, incoming, agent_pid, token, bot_user_id) do
    text = String.trim(incoming.text || "")

    cond do
      text == "" ->
        :ok

      bot_message?(incoming, bot_user_id) ->
        :ok

      true ->
        prompt = format_prompt(incoming, text)

        llm_text =
          case TelegramAiSpike.Agent.ask_sync(agent_pid, prompt, timeout: 90_000) do
            {:ok, reply} -> normalize_reply(reply)
            {:error, reason} -> "I hit an error: #{inspect(reason)}"
          end

        response_text =
          """
          echo: #{text}
          llm: #{truncate_text(llm_text, 900)}
          """
          |> String.trim()

        case Jido.Chat.Thread.post(thread, response_text, token: token) do
          {:ok, _sent} -> :ok
          {:error, reason} -> Logger.warning("[TelegramAiSpike] send failed: #{inspect(reason)}")
        end
    end
  end

  # True when the message was authored by the bot itself: either the sender id
  # matches the id fetched from getMe, or the author carries a truthy is_bot
  # flag. Wrapped with !! so the predicate always returns a boolean (the bare
  # `author && author.is_bot` expression could yield nil).
  defp bot_message?(incoming, nil) do
    !!(incoming.author && incoming.author.is_bot)
  end

  defp bot_message?(incoming, bot_user_id) when is_binary(bot_user_id) do
    to_string(incoming.external_user_id || "") == bot_user_id or
      !!(incoming.author && incoming.author.is_bot)
  end

  # Formats the prompt sent to the agent, labelling the Telegram user by the
  # best available identifier.
  defp format_prompt(incoming, text) do
    user_label =
      incoming.display_name || incoming.username || to_string(incoming.external_user_id || "unknown")

    """
    Telegram user: #{user_label}
    Message: #{text}
    Reply naturally and concisely.
    """
    |> String.trim()
  end

  # Trims the agent reply; substitutes a placeholder for empty output and
  # inspects non-binary replies so we always post printable text.
  defp normalize_reply(reply) when is_binary(reply) do
    trimmed = String.trim(reply)
    if trimmed == "", do: "I don't have a response yet.", else: trimmed
  end

  defp normalize_reply(reply), do: inspect(reply)

  # Grapheme-aware truncation with a trailing ellipsis; "..." counts toward
  # max_len, hence the max_len > 3 guard.
  defp truncate_text(text, max_len) when is_binary(text) and is_integer(max_len) and max_len > 3 do
    if String.length(text) <= max_len do
      text
    else
      String.slice(text, 0, max_len - 3) <> "..."
    end
  end

  # Best-effort "I'm online" message: skipped when no chat is configured, and
  # any send failure is deliberately ignored (`_ =`).
  defp maybe_send_startup_message(_token, nil, _model), do: :ok

  defp maybe_send_startup_message(token, chat_id, model) do
    _ =
      Adapter.send_message(chat_id, "Telegram AI spike online (model=#{model}). Send me a message.",
        token: token
      )

    :ok
  end

  # Picks the model spec: an explicit, non-blank JIDO_TELEGRAM_AGENT_MODEL
  # wins; otherwise fall back to API-key-based detection. Treating a
  # set-but-blank variable as unset (env_present?/1) avoids raising on a
  # `JIDO_TELEGRAM_AGENT_MODEL=` line in .env, which previously bypassed the
  # fallback and failed on the empty spec.
  defp resolve_model! do
    if env_present?("JIDO_TELEGRAM_AGENT_MODEL") do
      resolve_model_spec!(env!("JIDO_TELEGRAM_AGENT_MODEL"))
    else
      resolve_model_from_env_keys!()
    end
  end

  # Collects the specs whose API key env var is set, in declaration order, and
  # resolves the first usable one; raises when no key is configured at all.
  defp resolve_model_from_env_keys! do
    candidate_specs =
      @model_candidates_by_key
      |> Enum.filter(fn {env_key, _spec} -> env_present?(env_key) end)
      |> Enum.map(&elem(&1, 1))

    case candidate_specs do
      [] ->
        raise "No supported LLM API key found. Set OPENAI_API_KEY/ANTHROPIC_API_KEY/CEREBRAS_API_KEY/OPENROUTER_API_KEY/GOOGLE_API_KEY or JIDO_TELEGRAM_AGENT_MODEL."

      _ ->
        resolve_first_model_spec!(candidate_specs)
    end
  end

  # Returns the first candidate spec that ReqLLM can resolve; raises with all
  # per-candidate errors when none resolves.
  defp resolve_first_model_spec!(candidate_specs) do
    candidate_specs
    |> Enum.reduce_while([], fn spec, errors ->
      case ReqLLM.model(spec) do
        {:ok, model} ->
          # Halt with the canonical "provider:id" string instead of the error acc.
          {:halt, canonical_model_spec(model)}

        {:error, reason} ->
          {:cont, [{spec, reason} | errors]}
      end
    end)
    |> case do
      resolved when is_binary(resolved) ->
        resolved

      errors ->
        details =
          errors
          |> Enum.reverse()
          |> Enum.map_join(", ", fn {spec, reason} ->
            "#{spec}=#{inspect(reason)}"
          end)

        raise "Unable to resolve any model spec from env-backed candidates (#{details})"
    end
  end

  # Validates an explicitly configured model spec, raising with context on failure.
  defp resolve_model_spec!(model_spec) when is_binary(model_spec) do
    case ReqLLM.model(model_spec) do
      {:ok, model} ->
        canonical_model_spec(model)

      {:error, reason} ->
        raise "Invalid JIDO_TELEGRAM_AGENT_MODEL #{inspect(model_spec)}: #{inspect(reason)}"
    end
  end

  # Normalizes a resolved model into a "provider:model_id" string. The id field
  # name is probed via fallbacks — presumably it varies across req_llm
  # versions; TODO confirm against the pinned req_llm release.
  defp canonical_model_spec(model) do
    provider = Map.get(model, :provider)
    model_id = Map.get(model, :provider_model_id) || Map.get(model, :model) || Map.get(model, :id)

    if is_atom(provider) and is_binary(model_id) do
      "#{provider}:#{model_id}"
    else
      raise "Resolved model is missing provider/id fields: #{inspect(model)}"
    end
  end

  # Points every jido_ai model alias at the single selected model, preserving
  # any aliases configured elsewhere via Map.merge/2.
  defp configure_model_aliases(model) do
    aliases = %{
      fast: model,
      capable: model,
      thinking: model,
      reasoning: model,
      planning: model
    }

    existing = Application.get_env(:jido_ai, :model_aliases, %{})
    Application.put_env(:jido_ai, :model_aliases, Map.merge(existing, aliases))
  end

  # Asks the Telegram getMe endpoint for the bot's own user id (used to skip
  # self-authored messages); returns nil on any failure, in which case the
  # author.is_bot flag still guards against echo loops.
  defp fetch_bot_user_id(token) do
    case Req.get("https://api.telegram.org/bot#{token}/getMe", receive_timeout: 10_000) do
      {:ok, %Req.Response{status: 200, body: %{"ok" => true, "result" => %{"id" => id}}}} ->
        normalize_id(id)

      _ ->
        nil
    end
  end

  defp normalize_id(nil), do: nil
  defp normalize_id(value), do: to_string(value)

  # Loads KEY=VALUE pairs from the given .env file into the process
  # environment. A missing file is silently skipped — the file is optional.
  defp load_env!(path) do
    if File.exists?(path) do
      vars = Dotenvy.source!([path])
      Enum.each(vars, fn {key, value} -> System.put_env(key, value) end)
    end
  end

  # Mirrors provider API keys from the OS environment into req_llm app config.
  defp configure_provider_keys_from_env do
    maybe_set_req_llm_key(:openai_api_key, "OPENAI_API_KEY")
    maybe_set_req_llm_key(:anthropic_api_key, "ANTHROPIC_API_KEY")
    maybe_set_req_llm_key(:cerebras_api_key, "CEREBRAS_API_KEY")
    maybe_set_req_llm_key(:openrouter_api_key, "OPENROUTER_API_KEY")
    maybe_set_req_llm_key(:google_api_key, "GOOGLE_API_KEY")
  end

  defp maybe_set_req_llm_key(config_key, env_key) do
    if value = env_optional(env_key) do
      Application.put_env(:req_llm, config_key, value)
    end
  end

  # Fetches a required env var, raising with the var name when unset.
  defp env!(key) do
    case System.get_env(key) do
      nil -> raise "Missing required env var #{key}"
      value -> value
    end
  end

  defp env_optional(key) do
    System.get_env(key)
  end

  # True when the env var is set to a non-blank value.
  defp env_present?(key) do
    value = env_optional(key)
    is_binary(value) and String.trim(value) != ""
  end
end
# Allow tests/tooling to load this script without starting the spike by
# setting TELEGRAM_AI_SPIKE_NO_RUN=1.
case System.get_env("TELEGRAM_AI_SPIKE_NO_RUN") do
  "1" -> :ok
  _ -> TelegramAiSpike.run()
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment