@rsp2k
Last active December 10, 2025 07:08
FlexTel AI Service for RunPod
#!/bin/bash
set -e
echo "=========================================="
echo "FlexTel AI Service - Bootstrap Starting"
echo "=========================================="
# Install system deps
apt-get update && apt-get install -y ffmpeg curl git
# Install Python deps
pip install --upgrade pip
pip install fastapi uvicorn httpx pydantic openai-whisper soundfile edge-tts pydub
# Install and start Ollama
echo "Installing Ollama..."
curl -fsSL https://ollama.com/install.sh | sh
echo "Starting Ollama server..."
ollama serve &
sleep 10
# Pull models
echo "Pulling LLM model..."
ollama pull llama3.1:8b-instruct-q4_K_M || echo "LLM pull failed, continuing..."
echo "Pulling embeddings model..."
ollama pull nomic-embed-text || echo "Embeddings pull failed, continuing..."
# Download app
echo "Downloading app..."
mkdir -p /app
curl -sSL "https://gist.githubusercontent.com/rsp2k/647a58280f8c478ab23fae6d571c1780/raw/main.py" -o /app/main.py
curl -sSL "https://gist.githubusercontent.com/rsp2k/647a58280f8c478ab23fae6d571c1780/raw/requirements.txt" -o /app/requirements.txt
pip install -r /app/requirements.txt || true
# Start service
echo "=========================================="
echo "Starting FlexTel AI Service on port 8000"
echo "=========================================="
cd /app
exec python main.py
"""
FlexTel PTT AI Service - RunPod GPU Deployment
Unified AI service providing:
- STT: Whisper large-v3
- LLM: Ollama (primary) with Claude/OpenAI fallback
- TTS: StyleTTS 2 (primary, GPU) or edge-tts (fallback, cloud)
- Embeddings: nomic-embed via Ollama
Designed for RTX 4090 (~15GB VRAM total)
"""
import os
import io
import tempfile
import asyncio
import re
import subprocess
from typing import Optional, List
from contextlib import asynccontextmanager
from fastapi import FastAPI, UploadFile, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, model_validator
import httpx
import soundfile as sf
# =============================================================================
# Configuration
# =============================================================================
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
DEFAULT_LLM_PROVIDER = os.environ.get("LLM_DEFAULT_PROVIDER", "ollama")
DEFAULT_LLM_MODEL = os.environ.get("LLM_DEFAULT_MODEL", "llama3.1:8b-instruct-q4_K_M")
WHISPER_MODEL = os.environ.get("WHISPER_MODEL", "large-v3")
SYSTEM_PROMPT = os.environ.get("SYSTEM_PROMPT", """You are a participant in a push-to-talk radio communication channel.
Keep responses concise and conversational - like radio chatter.
Only respond when directly addressed.
Acknowledge with brief confirmations when appropriate.""")
WAKE_WORDS = ["hey assistant", "assistant", "hey ai", "ai"]
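# Configuration sketch: any of the variables above can be overridden via the pod's
# environment before launch (the example values are placeholders, not defaults):
#   LLM_DEFAULT_PROVIDER=claude ANTHROPIC_API_KEY=<your key> WHISPER_MODEL=medium python main.py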
# =============================================================================
# Pydantic Models
# =============================================================================
class ChatMessage(BaseModel):
role: str # 'user', 'assistant', 'system'
content: str
class ChatRequest(BaseModel):
messages: List[ChatMessage]
provider: Optional[str] = None # 'ollama', 'claude', 'openai'
model: Optional[str] = None
system_prompt: Optional[str] = None
max_tokens: int = 300
temperature: float = 0.7
class ChatResponse(BaseModel):
text: str
provider: str
model: str
tokens_used: Optional[int] = None
class EmbedRequest(BaseModel):
texts: Optional[List[str]] = None # List of texts to embed
text: Optional[str] = None # Single text convenience field
model: str = "nomic-embed-text"
@model_validator(mode="after")
def check_texts_provided(self):
"""Ensure at least one text input is provided."""
if not self.texts and not self.text:
raise ValueError("Either 'texts' (list) or 'text' (string) must be provided")
# Convert single text to list for uniform processing
if self.text and not self.texts:
self.texts = [self.text]
return self
class EmbedResponse(BaseModel):
embeddings: List[List[float]]
model: str
class PTTRequest(BaseModel):
context: Optional[str] = None
provider: Optional[str] = None
model: Optional[str] = None
system_prompt: Optional[str] = None
class TranscriptionResponse(BaseModel):
text: str
language: str
segments: Optional[list] = None
class PTTResponse(BaseModel):
transcription: str
is_addressed: bool
response_text: Optional[str] = None
response_audio_url: Optional[str] = None
class HealthResponse(BaseModel):
status: str
whisper_loaded: bool
ollama_available: bool
tts_loaded: bool
gpu_available: bool
vram_used_gb: Optional[float] = None
# =============================================================================
# Global Model Holders
# =============================================================================
whisper_model = None
tts_model = None
http_client: Optional[httpx.AsyncClient] = None
# =============================================================================
# Startup / Shutdown
# =============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Load models on startup, cleanup on shutdown."""
global whisper_model, tts_model, http_client
print("=" * 60)
print("FlexTel PTT AI Service - Starting Up")
print("=" * 60)
# HTTP client for Ollama and external APIs
http_client = httpx.AsyncClient(timeout=60.0)
# Load Whisper
print(f"Loading Whisper model ({WHISPER_MODEL})...")
try:
import whisper
whisper_model = whisper.load_model(WHISPER_MODEL, device="cuda")
print(f" Whisper {WHISPER_MODEL} loaded on CUDA")
except Exception as e:
print(f" Warning: Whisper failed to load: {e}")
# Load TTS (StyleTTS 2 primary, edge-tts fallback)
print("Loading TTS model...")
try:
from styletts2 import tts as styletts2_module
tts_model = {"type": "styletts2", "model": styletts2_module.StyleTTS2()}
print(" StyleTTS 2 loaded (GPU-accelerated, high quality)")
except Exception as e:
print(f" StyleTTS 2 not available: {e}")
print(" Using edge-tts as fallback (cloud-based, always works)")
tts_model = {"type": "edge-tts", "model": None} # edge-tts is async, no model to load
# Check Ollama
print("Checking Ollama availability...")
try:
resp = await http_client.get(f"{OLLAMA_URL}/api/tags")
if resp.status_code == 200:
models = resp.json().get("models", [])
print(f" Ollama available with {len(models)} models")
for m in models[:5]:
print(f" - {m.get('name')}")
else:
print(f" Warning: Ollama returned status {resp.status_code}")
except Exception as e:
print(f" Warning: Ollama not available: {e}")
print("=" * 60)
print("Startup complete!")
print("=" * 60)
yield
# Cleanup
print("Shutting down...")
await http_client.aclose()
app = FastAPI(
title="FlexTel PTT AI Service",
description="Unified AI service for push-to-talk communications",
version="1.0.0",
lifespan=lifespan,
)
# =============================================================================
# Health Check
# =============================================================================
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Check service health and component status."""
import torch
# Check Ollama
ollama_ok = False
try:
resp = await http_client.get(f"{OLLAMA_URL}/api/tags", timeout=5.0)
ollama_ok = resp.status_code == 200
except Exception:
pass
# Check GPU
gpu_available = torch.cuda.is_available()
vram_used = None
if gpu_available:
vram_used = round(torch.cuda.memory_allocated() / 1024**3, 2)
tts_available = tts_model is not None and (
tts_model.get("type") == "edge-tts" or tts_model.get("model") is not None
)
return HealthResponse(
status="healthy" if whisper_model and ollama_ok else "degraded",
whisper_loaded=whisper_model is not None,
ollama_available=ollama_ok,
tts_loaded=tts_available,
gpu_available=gpu_available,
vram_used_gb=vram_used,
)
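# Health probe sketch (assumes the service is reachable on localhost:8000; the numbers
# in the sample response are illustrative only):
#   curl http://localhost:8000/health
#   -> {"status": "healthy", "whisper_loaded": true, "ollama_available": true,
#       "tts_loaded": true, "gpu_available": true, "vram_used_gb": 3.2}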
# =============================================================================
# Speech-to-Text (Whisper)
# =============================================================================
@app.post("/transcribe", response_model=TranscriptionResponse)
async def transcribe_audio(file: UploadFile):
"""Transcribe uploaded audio file using Whisper."""
if not whisper_model:
raise HTTPException(503, "Whisper model not loaded")
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
content = await file.read()
tmp.write(content)
tmp_path = tmp.name
try:
result = whisper_model.transcribe(tmp_path)
return TranscriptionResponse(
text=result["text"].strip(),
language=result["language"],
segments=result.get("segments"),
)
finally:
os.unlink(tmp_path)
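# Transcription sketch ("clip.wav" is a placeholder; any ffmpeg-decodable format works
# because Whisper reads the temp file through ffmpeg):
#   curl -F "file=@clip.wav" http://localhost:8000/transcribe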
# =============================================================================
# LLM Chat (Ollama primary, Claude/OpenAI fallback)
# =============================================================================
@app.post("/chat", response_model=ChatResponse)
async def chat_completion(request: ChatRequest):
"""
Generate chat completion using configured LLM provider.
Provider priority:
1. Explicitly requested provider
2. Default provider (LLM_DEFAULT_PROVIDER env var)
3. Ollama (always available on RunPod)
"""
provider = request.provider or DEFAULT_LLM_PROVIDER
model = request.model or DEFAULT_LLM_MODEL
system_prompt = request.system_prompt or SYSTEM_PROMPT
if provider == "ollama":
return await _chat_ollama(request, model, system_prompt)
elif provider == "claude":
if not ANTHROPIC_API_KEY:
raise HTTPException(400, "Claude API key not configured")
return await _chat_claude(request, model, system_prompt)
elif provider == "openai":
if not OPENAI_API_KEY:
raise HTTPException(400, "OpenAI API key not configured")
return await _chat_openai(request, model, system_prompt)
else:
raise HTTPException(400, f"Unknown provider: {provider}")
async def _chat_ollama(request: ChatRequest, model: str, system_prompt: str) -> ChatResponse:
"""Chat via local Ollama instance."""
messages = []
# Add system prompt
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
# Add conversation messages
for msg in request.messages:
messages.append({"role": msg.role, "content": msg.content})
try:
resp = await http_client.post(
f"{OLLAMA_URL}/api/chat",
json={
"model": model,
"messages": messages,
"stream": False,
"options": {
"temperature": request.temperature,
"num_predict": request.max_tokens,
},
},
)
resp.raise_for_status()
data = resp.json()
return ChatResponse(
text=data["message"]["content"],
provider="ollama",
model=model,
tokens_used=data.get("eval_count"),
)
except httpx.HTTPError as e:
raise HTTPException(503, f"Ollama error: {e}")
async def _chat_claude(request: ChatRequest, model: str, system_prompt: str) -> ChatResponse:
"""Chat via Anthropic Claude API."""
    # Swap in a Claude default if an Ollama-style model name was passed through
if model.startswith("llama") or model.startswith("mistral"):
model = "claude-sonnet-4-20250514"
messages = [{"role": msg.role, "content": msg.content} for msg in request.messages]
try:
resp = await http_client.post(
"https://api.anthropic.com/v1/messages",
headers={
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
},
json={
"model": model,
"max_tokens": request.max_tokens,
"system": system_prompt,
"messages": messages,
},
)
resp.raise_for_status()
data = resp.json()
return ChatResponse(
text=data["content"][0]["text"],
provider="claude",
model=model,
tokens_used=data.get("usage", {}).get("output_tokens"),
)
except httpx.HTTPError as e:
raise HTTPException(503, f"Claude API error: {e}")
async def _chat_openai(request: ChatRequest, model: str, system_prompt: str) -> ChatResponse:
"""Chat via OpenAI API."""
    # Swap in an OpenAI default if an Ollama-style model name was passed through
if model.startswith("llama") or model.startswith("mistral"):
model = "gpt-4-turbo-preview"
messages = [{"role": "system", "content": system_prompt}]
messages.extend([{"role": msg.role, "content": msg.content} for msg in request.messages])
try:
resp = await http_client.post(
"https://api.openai.com/v1/chat/completions",
headers={
"Authorization": f"Bearer {OPENAI_API_KEY}",
"Content-Type": "application/json",
},
json={
"model": model,
"max_tokens": request.max_tokens,
"temperature": request.temperature,
"messages": messages,
},
)
resp.raise_for_status()
data = resp.json()
return ChatResponse(
text=data["choices"][0]["message"]["content"],
provider="openai",
model=model,
tokens_used=data.get("usage", {}).get("completion_tokens"),
)
except httpx.HTTPError as e:
raise HTTPException(503, f"OpenAI API error: {e}")
# =============================================================================
# Embeddings (Ollama nomic-embed)
# =============================================================================
@app.post("/embed", response_model=EmbedResponse)
async def generate_embeddings(request: EmbedRequest):
"""Generate embeddings for texts using Ollama."""
embeddings = []
for text in request.texts:
try:
resp = await http_client.post(
f"{OLLAMA_URL}/api/embeddings",
json={
"model": request.model,
"prompt": text,
},
)
resp.raise_for_status()
data = resp.json()
embeddings.append(data["embedding"])
except httpx.HTTPError as e:
raise HTTPException(503, f"Ollama embeddings error: {e}")
return EmbedResponse(embeddings=embeddings, model=request.model)
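# Embeddings sketch (either "texts" or the single "text" field is accepted, see the
# EmbedRequest validator above):
#   curl -X POST http://localhost:8000/embed \
#        -H "Content-Type: application/json" \
#        -d '{"texts": ["unit five en route", "copy that"]}'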
# =============================================================================
# Text-to-Speech (StyleTTS 2 / edge-tts)
# =============================================================================
# Default voice for edge-tts (Microsoft Neural voices)
EDGE_TTS_VOICE = os.environ.get("EDGE_TTS_VOICE", "en-US-GuyNeural") # Natural male voice
@app.post("/synthesize")
async def synthesize_speech(text: str, voice_id: Optional[str] = None):
"""Generate speech from text using StyleTTS 2 (primary) or edge-tts (fallback)."""
audio_bytes = await _generate_speech(text, voice_id)
return StreamingResponse(
io.BytesIO(audio_bytes),
media_type="audio/wav",
headers={"Content-Disposition": "attachment; filename=speech.wav"},
)
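# Synthesis sketch ("text" and "voice_id" arrive as query parameters, since they are
# plain scalars on the route):
#   curl -X POST "http://localhost:8000/synthesize?text=Copy%20that" -o reply.wav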
async def _generate_speech(text: str, voice_id: Optional[str] = None) -> bytes:
"""Internal TTS generation function supporting multiple engines."""
if not tts_model:
raise HTTPException(503, "TTS not configured")
tts_type = tts_model.get("type")
try:
if tts_type == "styletts2":
# StyleTTS 2 - GPU-accelerated, high quality
model = tts_model["model"]
audio_array = model.inference(text)
# Convert to WAV bytes (StyleTTS 2 outputs at 24kHz)
buffer = io.BytesIO()
sf.write(buffer, audio_array, samplerate=24000, format='WAV')
buffer.seek(0)
return buffer.read()
elif tts_type == "edge-tts":
# edge-tts - Microsoft Edge cloud TTS (async native)
import edge_tts
voice = voice_id or EDGE_TTS_VOICE
communicate = edge_tts.Communicate(text, voice)
            # edge-tts streams MP3; collect the chunks, then convert to WAV with pydub
            # so every engine returns the same audio format
audio_buffer = io.BytesIO()
async for chunk in communicate.stream():
if chunk["type"] == "audio":
audio_buffer.write(chunk["data"])
audio_buffer.seek(0)
# Convert MP3 to WAV using pydub
from pydub import AudioSegment
audio = AudioSegment.from_mp3(audio_buffer)
wav_buffer = io.BytesIO()
audio.export(wav_buffer, format="wav")
wav_buffer.seek(0)
return wav_buffer.read()
else:
raise HTTPException(503, f"Unknown TTS type: {tts_type}")
except Exception as e:
raise HTTPException(503, f"TTS generation failed: {e}")
@app.get("/voices")
async def list_voices():
"""List available TTS voices."""
if not tts_model:
return {"voices": [], "engine": None}
tts_type = tts_model.get("type")
if tts_type == "edge-tts":
import edge_tts
voices = await edge_tts.list_voices()
# Return a selection of English voices
english_voices = [v for v in voices if v["Locale"].startswith("en-")]
return {
"engine": "edge-tts",
"default": EDGE_TTS_VOICE,
"voices": [
{"id": v["ShortName"], "name": v.get("FriendlyName", v.get("Name", v["ShortName"])), "gender": v["Gender"]}
for v in english_voices[:20] # Limit to 20
],
}
elif tts_type == "styletts2":
return {
"engine": "styletts2",
"default": "default",
"voices": [{"id": "default", "name": "StyleTTS 2 Default", "gender": "neutral"}],
}
return {"voices": [], "engine": tts_type}
# =============================================================================
# Full PTT Pipeline (STT → LLM → TTS)
# =============================================================================
# Temporary audio storage for responses
_audio_cache = {}
@app.post("/process_ptt", response_model=PTTResponse)
async def process_ptt_message(
file: UploadFile,
context: Optional[str] = None,
provider: Optional[str] = None,
model: Optional[str] = None,
system_prompt: Optional[str] = None,
background_tasks: BackgroundTasks = None,
):
"""
Full PTT pipeline:
1. Transcribe audio (Whisper)
2. Check if AI is addressed (wake word detection)
3. Generate LLM response if addressed
4. Synthesize response audio (TTS)
"""
# Step 1: Transcribe
transcription = await transcribe_audio(file)
text = transcription.text.lower().strip()
    # Step 2: Check if addressed (word-boundary match so short wake words like "ai"
    # don't fire inside other words)
    is_addressed = any(re.search(rf"\b{re.escape(wake)}\b", text) for wake in WAKE_WORDS)
response = PTTResponse(
transcription=transcription.text,
is_addressed=is_addressed,
)
if not is_addressed:
return response
# Step 3: Build context and generate LLM response
messages = []
if context:
messages.append(ChatMessage(role="user", content=f"Recent context: {context}"))
messages.append(ChatMessage(role="user", content=transcription.text))
chat_request = ChatRequest(
messages=messages,
provider=provider,
model=model,
system_prompt=system_prompt,
max_tokens=300,
)
chat_response = await chat_completion(chat_request)
response.response_text = chat_response.text
# Step 4: Generate TTS audio
if tts_model and tts_model.get("type"):
try:
audio_bytes = await _generate_speech(chat_response.text)
import time
audio_id = f"response_{int(time.time() * 1000)}"
_audio_cache[audio_id] = audio_bytes
response.response_audio_url = f"/audio/{audio_id}"
# Clean up old audio after 5 minutes
if background_tasks:
background_tasks.add_task(_cleanup_audio, audio_id, 300)
except Exception as e:
print(f"TTS generation failed: {e}")
return response
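# End-to-end PTT sketch (the recording goes up as a multipart file; the optional
# fields ride along as query parameters; "transmission.wav" is a placeholder):
#   curl -F "file=@transmission.wav" \
#        "http://localhost:8000/process_ptt?provider=ollama&context=Dispatch%20asked%20for%20an%20ETA"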
@app.get("/audio/{audio_id}")
async def get_audio(audio_id: str):
"""Retrieve generated audio file."""
if audio_id not in _audio_cache:
raise HTTPException(404, "Audio not found")
return StreamingResponse(
io.BytesIO(_audio_cache[audio_id]),
media_type="audio/wav",
)
async def _cleanup_audio(audio_id: str, delay: int):
"""Remove cached audio after delay."""
await asyncio.sleep(delay)
_audio_cache.pop(audio_id, None)
# =============================================================================
# Model Management
# =============================================================================
@app.get("/models")
async def list_models():
"""List available Ollama models."""
try:
resp = await http_client.get(f"{OLLAMA_URL}/api/tags")
resp.raise_for_status()
return resp.json()
except httpx.HTTPError as e:
raise HTTPException(503, f"Ollama error: {e}")
@app.post("/models/pull")
async def pull_model(model: str):
"""Pull a model from Ollama registry."""
try:
resp = await http_client.post(
f"{OLLAMA_URL}/api/pull",
json={"name": model},
timeout=600.0, # 10 minute timeout for large models
)
resp.raise_for_status()
return {"status": "success", "model": model}
except httpx.HTTPError as e:
raise HTTPException(503, f"Failed to pull model: {e}")
# =============================================================================
# Entry Point
# =============================================================================
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
# FlexTel PTT AI Service - RunPod GPU Dependencies
# Designed for CUDA 12.x / RTX 4090
# Core Framework
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
python-multipart>=0.0.6
httpx>=0.26.0
pydantic>=2.5.0
# PyTorch (CUDA 12.1)
torch>=2.1.0
torchaudio>=2.1.0
# Speech-to-Text
openai-whisper>=20231117
# Or use faster-whisper for lower VRAM:
# faster-whisper>=0.10.0
# Text-to-Speech options:
# StyleTTS 2 - High quality, GPU-accelerated, human-level synthesis
styletts2==0.1.6
# edge-tts - Fast fallback, uses Microsoft Edge cloud API
edge-tts>=7.2.0
# Audio Processing
soundfile>=0.12.1
numpy>=1.24.0
pydub>=0.25.1
# Optional: External LLM APIs (fallback)
anthropic>=0.18.0
openai>=1.10.0
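# Install sketch (assumes a CUDA 12.1 environment; RunPod PyTorch images usually ship
# torch preinstalled, in which case the explicit index URL below is unnecessary):
#   pip install -r requirements.txt
#   pip install torch torchaudio --index-url https://download.pytorch.org/whl/cu121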