Created
January 27, 2026 14:33
-
-
Save sssemil/41b0e57616c55c907240682306f23c44 to your computer and use it in GitHub Desktop.
Simple CLI for PersonaPlex - talk and listen via terminal
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Simple CLI for PersonaPlex - talk and listen via terminal.
"""
import argparse
import os
import sys
import time
import threading
import queue
from pathlib import Path
import numpy as np
import sounddevice as sd
import torch
from huggingface_hub import hf_hub_download
# Add the moshi package to path.
# This must run before the `moshi` import below, so the import order here is
# deliberate: the package is looked up in ./personaplex/moshi next to this file.
sys.path.insert(0, str(Path(__file__).parent / "personaplex" / "moshi"))
from moshi.models import loaders, MimiModel, LMModel, LMGen
import sentencepiece
def wrap_with_system_tags(text: str) -> str:
    """Add system tags as the model expects.

    Surrounding whitespace is stripped first. Text that already starts and
    ends with ``<system>`` is returned (stripped) without further wrapping;
    anything else is returned as ``"<system> {text} <system>"``.

    Note that the opening and closing markers are the same literal
    ``<system>`` token; there is no ``</system>`` form.
    """
    tag = "<system>"
    cleaned = text.strip()
    already_wrapped = cleaned.startswith(tag) and cleaned.endswith(tag)
    if already_wrapped:
        return cleaned
    return f"{tag} {cleaned} {tag}"
def _build_arg_parser():
    """Create the command-line parser for the PersonaPlex CLI."""
    parser = argparse.ArgumentParser(description="PersonaPlex CLI")
    parser.add_argument("--voice", type=str, default="NATF3.pt", help="Voice prompt file name")
    parser.add_argument("--prompt", type=str, default="", help="System text prompt")
    parser.add_argument("--hf-repo", type=str, default="nvidia/personaplex-7b-v1", help="HuggingFace repo")
    parser.add_argument("--device", type=str, default="cuda", help="Device (cuda/cpu)")
    parser.add_argument("--list-devices", action="store_true", help="List audio devices and exit")
    parser.add_argument("--input-device", type=int, default=None, help="Input device index")
    parser.add_argument("--output-device", type=int, default=None, help="Output device index")
    # NOTE(review): --debug is accepted but nothing in this script reads it yet.
    parser.add_argument("--debug", action="store_true", help="Show mic input levels")
    parser.add_argument("--echo-test", action="store_true", help="Echo test mode - hear your mic through speakers")
    parser.add_argument("--cpu-offload", action="store_true", help="Offload model layers to CPU when GPU memory is insufficient")
    return parser


def _run_echo_test(input_device, output_device):
    """Copy microphone input straight to the output device until Ctrl+C.

    No model is loaded; the stream uses the devices' default sample rate.
    A live level meter is printed on one line from inside the audio callback.
    """
    print("Echo test mode - speak and you should hear yourself")
    print("Press Ctrl+C to stop\n")

    def echo_callback(indata, outdata, frames, time_info, status):
        if status:
            print(f"Status: {status}")
        outdata[:] = indata
        rms = np.sqrt(np.mean(indata ** 2))
        bars = int(rms * 200)
        print(f"\r[MIC: {'|' * min(bars, 40):40s}] {rms:.4f}", end="", flush=True)

    try:
        # Use default sample rate for the device
        with sd.Stream(
            channels=1,
            callback=echo_callback,
            device=(input_device, output_device),
        ):
            print("Listening...")
            while True:
                time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nDone.")


def main():
    """Run the PersonaPlex terminal client.

    Depending on the flags this either lists audio devices, runs a
    mic-to-speaker echo test, or loads the models and starts a full-duplex
    conversation: microphone frames are encoded and stepped through the
    language model on a worker thread, while the audio callback plays back
    whatever decoded audio is available and the generated text is printed.
    """
    args = _build_arg_parser().parse_args()

    if args.list_devices:
        print(sd.query_devices())
        return

    # Echo test mode - simple mic to speaker passthrough
    if args.echo_test:
        _run_echo_test(args.input_device, args.output_device)
        return

    device = torch.device(args.device)
    print(f"Using device: {device}")

    # Download/load models
    print("Loading models...")
    mimi_path = hf_hub_download(args.hf_repo, loaders.MIMI_NAME)

    # Voice prompts are in the local cache, find them from the mimi path.
    # Check for the requested voice now, before fetching and loading the
    # language model, so a typo in --voice fails quickly.
    voice_prompt_dir = str(Path(mimi_path).parent / "voices")
    voice_prompt_path = os.path.join(voice_prompt_dir, args.voice)
    if not os.path.exists(voice_prompt_path):
        print(f"Voice prompt not found: {voice_prompt_path}")
        if os.path.isdir(voice_prompt_dir):
            print(f"Available voices: {sorted(os.listdir(voice_prompt_dir))}")
        else:
            # Listing a missing directory would raise instead of helping.
            print(f"Voice directory not found: {voice_prompt_dir}")
        return

    lm_path = hf_hub_download(args.hf_repo, loaders.MOSHI_NAME)
    tokenizer_path = hf_hub_download(args.hf_repo, loaders.TEXT_TOKENIZER_NAME)

    mimi = loaders.get_mimi(mimi_path, device)
    other_mimi = loaders.get_mimi(mimi_path, device)
    lm = loaders.get_moshi_lm(lm_path, device=device, cpu_offload=args.cpu_offload)
    text_tokenizer = sentencepiece.SentencePieceProcessor(tokenizer_path)

    sample_rate = mimi.sample_rate  # 24000 (per the original author's note)
    frame_size = int(sample_rate / mimi.frame_rate)  # 1920 samples per frame (same note)

    lm_gen = LMGen(
        lm,
        audio_silence_frame_cnt=int(0.5 * mimi.frame_rate),
        sample_rate=sample_rate,
        device=device,
        frame_rate=mimi.frame_rate,
    )

    # Set up streaming
    mimi.streaming_forever(1)
    other_mimi.streaming_forever(1)
    lm_gen.streaming_forever(1)

    # Load voice prompt
    print(f"Loading voice: {args.voice}")
    if voice_prompt_path.endswith(".pt"):
        lm_gen.load_voice_prompt_embeddings(voice_prompt_path)
    else:
        lm_gen.load_voice_prompt(voice_prompt_path)

    # Set text prompt
    if args.prompt:
        lm_gen.text_prompt_tokens = text_tokenizer.encode(wrap_with_system_tags(args.prompt))
        print(f"Text prompt: {args.prompt[:100]}{'...' if len(args.prompt) > 100 else ''}")
        print(f"Text prompt tokens: {len(lm_gen.text_prompt_tokens)}")
    else:
        lm_gen.text_prompt_tokens = []

    # Everything below is inference only, so run it without autograd, the same
    # way the worker loop does.
    with torch.no_grad():
        # Warmup
        print("Warming up...")
        for _ in range(4):
            chunk = torch.zeros(1, 1, frame_size, dtype=torch.float32, device=device)
            codes = mimi.encode(chunk)
            _ = other_mimi.encode(chunk)
            for c in range(codes.shape[-1]):
                tokens = lm_gen.step(codes[:, :, c: c + 1])
                if tokens is None:
                    continue
                # presumably rows 1..8 are the audio codebooks - TODO confirm
                _ = mimi.decode(tokens[:, 1:9])
                _ = other_mimi.decode(tokens[:, 1:9])
        if device.type == "cuda":
            torch.cuda.synchronize()

        # Process system prompts with progress
        mimi.reset_streaming()
        other_mimi.reset_streaming()
        lm_gen.reset_streaming()

        # Voice prompt
        print("Loading voice prompt...", end="", flush=True)
        lm_gen._step_voice_prompt(mimi)
        print(" done")

        # Silence
        print("Loading silence...", end="", flush=True)
        lm_gen._step_audio_silence()
        print(" done")

        # Text prompt with progress
        num_tokens = len(lm_gen.text_prompt_tokens) if lm_gen.text_prompt_tokens else 0
        if num_tokens > 0:
            print(f"Loading text prompt (0/{num_tokens} tokens)...", end="", flush=True)
            for i, _ in enumerate(lm_gen._step_text_prompt_core()):
                if (i + 1) % 50 == 0 or (i + 1) == num_tokens:
                    print(f"\rLoading text prompt ({i + 1}/{num_tokens} tokens)...", end="", flush=True)
            print(" done")
        else:
            print("No text prompt")

        # Final silence
        print("Loading final silence...", end="", flush=True)
        lm_gen._step_audio_silence()
        print(" done")

        mimi.reset_streaming()

    print("\n" + "=" * 50)
    print("Ready! Speak into your microphone. Ctrl+C to stop.")
    print("=" * 50 + "\n")

    # Audio queues: the audio callback feeds input_queue and drains
    # output_queue; the worker thread does the opposite.
    input_queue = queue.Queue()
    output_queue = queue.Queue()
    # Set by whichever side wants to shut down (Ctrl+C or a worker error).
    stop_event = threading.Event()

    def audio_callback(indata, outdata, frames, time_info, status):
        # Ignore underflow warnings - they're normal
        # Put input audio in queue
        input_queue.put(indata[:, 0].copy())
        # Get output audio if available, otherwise play silence
        try:
            out = output_queue.get_nowait()
        except queue.Empty:
            outdata[:, 0] = 0
            return
        # Never write more samples than the block holds; a longer chunk would
        # otherwise raise inside the callback.
        n = min(len(out), len(outdata))
        outdata[:n, 0] = out[:n]
        outdata[n:, 0] = 0

    def process_audio():
        input_buffer = np.array([], dtype=np.float32)
        current_text = ""
        with torch.no_grad():
            while not stop_event.is_set():
                # Get input audio; the timeout lets the loop notice stop_event.
                try:
                    chunk = input_queue.get(timeout=0.1)
                except queue.Empty:
                    continue
                try:
                    input_buffer = np.concatenate([input_buffer, chunk])
                    # Process full frames
                    while len(input_buffer) >= frame_size:
                        frame = input_buffer[:frame_size]
                        input_buffer = input_buffer[frame_size:]
                        # Show when user is speaking
                        rms = np.sqrt(np.mean(frame ** 2))
                        if rms > 0.01:  # Voice activity threshold
                            print(" [YOU]", end="", flush=True)
                        # Encode input
                        frame_tensor = torch.from_numpy(frame).to(device=device)[None, None]
                        codes = mimi.encode(frame_tensor)
                        _ = other_mimi.encode(frame_tensor)
                        # Generate output
                        for c in range(codes.shape[-1]):
                            tokens = lm_gen.step(codes[:, :, c: c + 1])
                            if tokens is None:
                                continue
                            # Decode audio
                            main_pcm = mimi.decode(tokens[:, 1:9])
                            _ = other_mimi.decode(tokens[:, 1:9])
                            output_queue.put(main_pcm[0, 0].cpu().numpy())
                            # Get text; ids 0 and 3 are skipped
                            # (presumably padding/special tokens - TODO confirm)
                            text_token = tokens[0, 0, 0].item()
                            if text_token not in (0, 3):
                                piece = text_tokenizer.id_to_piece(text_token)
                                current_text += piece.replace("▁", " ")
                                # Print on word boundaries
                                if " " in current_text or len(current_text) > 20:
                                    print(current_text, end="", flush=True)
                                    current_text = ""
                except Exception as e:
                    # Top-level boundary of the worker: report and shut down
                    # the whole session instead of dying silently.
                    print(f"\nError: {e}")
                    stop_event.set()
                    break

    # Start processing thread
    process_thread = threading.Thread(target=process_audio, daemon=True)
    process_thread.start()

    # Start audio stream
    try:
        with sd.Stream(
            samplerate=sample_rate,
            blocksize=frame_size,
            channels=1,
            dtype=np.float32,
            callback=audio_callback,
            device=(args.input_device, args.output_device),
        ):
            print("AI: ", end="", flush=True)
            while not stop_event.is_set():
                time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n\nStopping...")
    finally:
        # Tell the worker to stop and give it a moment to finish its frame.
        stop_event.set()
        process_thread.join(timeout=2.0)
    print("Done.")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment