Simple CLI for PersonaPlex - talk and listen via terminal
#!/usr/bin/env python3
"""
Simple CLI for PersonaPlex - talk and listen via terminal.
"""
import argparse
import os
import sys
import time
import threading
import queue
from pathlib import Path

import numpy as np
import sounddevice as sd
import torch
from huggingface_hub import hf_hub_download

# Add the vendored moshi package to the import path
sys.path.insert(0, str(Path(__file__).parent / "personaplex" / "moshi"))

from moshi.models import loaders, LMGen
import sentencepiece


def wrap_with_system_tags(text: str) -> str:
    """Add system tags as the model expects."""
    cleaned = text.strip()
    if cleaned.startswith("<system>") and cleaned.endswith("<system>"):
        return cleaned
    return f"<system> {cleaned} <system>"

def main():
    parser = argparse.ArgumentParser(description="PersonaPlex CLI")
    parser.add_argument("--voice", type=str, default="NATF3.pt", help="Voice prompt file name")
    parser.add_argument("--prompt", type=str, default="", help="System text prompt")
    parser.add_argument("--hf-repo", type=str, default="nvidia/personaplex-7b-v1", help="HuggingFace repo")
    parser.add_argument("--device", type=str, default="cuda", help="Device (cuda/cpu)")
    parser.add_argument("--list-devices", action="store_true", help="List audio devices and exit")
    parser.add_argument("--input-device", type=int, default=None, help="Input device index")
    parser.add_argument("--output-device", type=int, default=None, help="Output device index")
    parser.add_argument("--debug", action="store_true", help="Show mic input levels")  # NOTE: parsed but not used below
    parser.add_argument("--echo-test", action="store_true", help="Echo test mode - hear your mic through speakers")
    parser.add_argument("--cpu-offload", action="store_true", help="Offload model layers to CPU when GPU memory is insufficient")
    args = parser.parse_args()

    if args.list_devices:
        print(sd.query_devices())
        return

    # Echo test mode - simple mic to speaker passthrough
    if args.echo_test:
        print("Echo test mode - speak and you should hear yourself")
        print("Press Ctrl+C to stop\n")

        def echo_callback(indata, outdata, frames, time_info, status):
            if status:
                print(f"Status: {status}")
            outdata[:] = indata
            rms = np.sqrt(np.mean(indata ** 2))
            bars = int(rms * 200)
            print(f"\r[MIC: {'|' * min(bars, 40):40s}] {rms:.4f}", end="", flush=True)

        try:
            # Use default sample rate for the device
            with sd.Stream(
                channels=1,
                callback=echo_callback,
                device=(args.input_device, args.output_device),
            ):
                print("Listening...")
                while True:
                    time.sleep(0.1)
        except KeyboardInterrupt:
            print("\nDone.")
        return

    device = torch.device(args.device)
    print(f"Using device: {device}")

    # Download/load models
    print("Loading models...")
    mimi_path = hf_hub_download(args.hf_repo, loaders.MIMI_NAME)
    lm_path = hf_hub_download(args.hf_repo, loaders.MOSHI_NAME)
    tokenizer_path = hf_hub_download(args.hf_repo, loaders.TEXT_TOKENIZER_NAME)
    # Voice prompts live in the same local HF cache; locate them from the mimi path
    voice_prompt_dir = str(Path(mimi_path).parent / "voices")

    mimi = loaders.get_mimi(mimi_path, device)
    other_mimi = loaders.get_mimi(mimi_path, device)  # second codec instance, stepped in lockstep with the first
    lm = loaders.get_moshi_lm(lm_path, device=device, cpu_offload=args.cpu_offload)
    text_tokenizer = sentencepiece.SentencePieceProcessor(tokenizer_path)

    sample_rate = mimi.sample_rate  # 24000
    frame_size = int(sample_rate / mimi.frame_rate)  # 1920 samples per frame
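    # 24000 Hz / 12.5 Hz frame rate = 1920 samples, so each codec frame covers
    # 80 ms of audio - that is the granularity of every model step below.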

    lm_gen = LMGen(
        lm,
        audio_silence_frame_cnt=int(0.5 * mimi.frame_rate),
        sample_rate=sample_rate,
        device=device,
        frame_rate=mimi.frame_rate,
    )
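    # With the 12.5 Hz frame rate, audio_silence_frame_cnt = int(0.5 * 12.5) = 6
    # frames, i.e. roughly half a second of silence that the _step_audio_silence()
    # calls below appear to feed around the prompts.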

    # Set up streaming (the argument is the batch size)
    mimi.streaming_forever(1)
    other_mimi.streaming_forever(1)
    lm_gen.streaming_forever(1)

    # Load voice prompt
    voice_prompt_path = os.path.join(voice_prompt_dir, args.voice)
    if not os.path.exists(voice_prompt_path):
        print(f"Voice prompt not found: {voice_prompt_path}")
        print(f"Available voices: {os.listdir(voice_prompt_dir)}")
        return
    print(f"Loading voice: {args.voice}")
    if voice_prompt_path.endswith('.pt'):
        # .pt files hold precomputed voice embeddings; anything else is loaded as audio
        lm_gen.load_voice_prompt_embeddings(voice_prompt_path)
    else:
        lm_gen.load_voice_prompt(voice_prompt_path)

    # Set text prompt
    if args.prompt:
        lm_gen.text_prompt_tokens = text_tokenizer.encode(wrap_with_system_tags(args.prompt))
        print(f"Text prompt: {args.prompt[:100]}{'...' if len(args.prompt) > 100 else ''}")
        print(f"Text prompt tokens: {len(lm_gen.text_prompt_tokens)}")
    else:
        lm_gen.text_prompt_tokens = []

    # Warmup: push a few frames of silence through the full pipeline
    print("Warming up...")
    for _ in range(4):
        chunk = torch.zeros(1, 1, frame_size, dtype=torch.float32, device=device)
        codes = mimi.encode(chunk)
        _ = other_mimi.encode(chunk)
        for c in range(codes.shape[-1]):
            tokens = lm_gen.step(codes[:, :, c: c + 1])
            if tokens is None:
                continue
            _ = mimi.decode(tokens[:, 1:9])
            _ = other_mimi.decode(tokens[:, 1:9])
    if device.type == 'cuda':
        torch.cuda.synchronize()
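    # The warmup pass above primes CUDA kernels and allocator caches so the first
    # real frame isn't unusually slow; the streaming state it accumulates is
    # discarded by the reset_streaming() calls that follow.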

    # Process system prompts with progress
    mimi.reset_streaming()
    other_mimi.reset_streaming()
    lm_gen.reset_streaming()

    # Voice prompt
    print("Loading voice prompt...", end="", flush=True)
    lm_gen._step_voice_prompt(mimi)
    print(" done")

    # Silence
    print("Loading silence...", end="", flush=True)
    lm_gen._step_audio_silence()
    print(" done")

    # Text prompt with progress
    num_tokens = len(lm_gen.text_prompt_tokens) if lm_gen.text_prompt_tokens else 0
    if num_tokens > 0:
        print(f"Loading text prompt (0/{num_tokens} tokens)...", end="", flush=True)
        for i, _ in enumerate(lm_gen._step_text_prompt_core()):
            if (i + 1) % 50 == 0 or (i + 1) == num_tokens:
                print(f"\rLoading text prompt ({i + 1}/{num_tokens} tokens)...", end="", flush=True)
        print(" done")
    else:
        print("No text prompt")

    # Final silence
    print("Loading final silence...", end="", flush=True)
    lm_gen._step_audio_silence()
    print(" done")

    mimi.reset_streaming()

    print("\n" + "=" * 50)
    print("Ready! Speak into your microphone. Ctrl+C to stop.")
    print("=" * 50 + "\n")

    # Audio queues
    input_queue = queue.Queue()
    output_queue = queue.Queue()
    running = True

    def audio_callback(indata, outdata, frames, time_info, status):
        # Ignore underflow warnings - they're normal
        # Put input audio in queue
        input_queue.put(indata[:, 0].copy())
        # Get output audio if available
        try:
            out = output_queue.get_nowait()
            outdata[:len(out), 0] = out
            outdata[len(out):, 0] = 0
        except queue.Empty:
            outdata[:, 0] = 0

    def process_audio():
        nonlocal running
        input_buffer = np.array([], dtype=np.float32)
        current_text = ""
        with torch.no_grad():
            while running:
                try:
                    # Get input audio
                    chunk = input_queue.get(timeout=0.1)
                    input_buffer = np.concatenate([input_buffer, chunk])
                    # Process full frames
                    while len(input_buffer) >= frame_size:
                        frame = input_buffer[:frame_size]
                        input_buffer = input_buffer[frame_size:]
                        # Show when the user is speaking
                        rms = np.sqrt(np.mean(frame ** 2))
                        if rms > 0.01:  # Voice activity threshold
                            print(" [YOU]", end="", flush=True)
                        # Encode input
                        frame_tensor = torch.from_numpy(frame).to(device=device)[None, None]
                        codes = mimi.encode(frame_tensor)
                        _ = other_mimi.encode(frame_tensor)
                        # Generate output
                        for c in range(codes.shape[-1]):
                            tokens = lm_gen.step(codes[:, :, c: c + 1])
                            if tokens is None:
                                continue
                            # Decode audio: index 0 is the text token, 1:9 the audio codebooks
                            main_pcm = mimi.decode(tokens[:, 1:9])
                            _ = other_mimi.decode(tokens[:, 1:9])
                            output_audio = main_pcm[0, 0].cpu().numpy()
                            output_queue.put(output_audio)
                            # Get text
                            text_token = tokens[0, 0, 0].item()
                            if text_token not in (0, 3):  # skip pad/special text tokens
                                text = text_tokenizer.id_to_piece(text_token)
                                text = text.replace("▁", " ")
                                current_text += text
                                # Print on word boundaries
                                if " " in current_text or len(current_text) > 20:
                                    print(current_text, end="", flush=True)
                                    current_text = ""
                except queue.Empty:
                    continue
                except Exception as e:
                    print(f"\nError: {e}")
                    running = False
                    break

    # Start processing thread
    process_thread = threading.Thread(target=process_audio, daemon=True)
    process_thread.start()

    # Start audio stream
    try:
        with sd.Stream(
            samplerate=sample_rate,
            blocksize=frame_size,
            channels=1,
            dtype=np.float32,
            callback=audio_callback,
            device=(args.input_device, args.output_device),
        ):
            print("AI: ", end="", flush=True)
            while running:
                time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n\nStopping...")
        running = False

    print("Done.")


if __name__ == "__main__":
    main()