@kwindla · Created October 28, 2025 18:53
Cartesia Sonic-3 Pipecat Example
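"""Cartesia Sonic-3 voice agent example for Pipecat.

Runs a Pipecat pipeline that pairs Cartesia STT and the Sonic-3 TTS model with
Google's LLM service, speaking as a guest at a virtual dinner party. The
combined user/bot audio is captured with an AudioBufferProcessor and written to
a timestamped WAV file under ./recordings. Requires CARTESIA_API_KEY and
GOOGLE_API_KEY in the environment (or a .env file).
"""
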
import asyncio
import datetime
import os
import wave
from pathlib import Path
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.processors.frameworks.rtvi import (
    RTVIConfig,
    RTVIObserver,
    RTVIProcessor,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.stt import CartesiaSTTService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)


async def save_combined_audio(audio: bytes, sample_rate: int, num_channels: int) -> None:
    if not audio:
        logger.warning("No audio captured; skipping save of combined recording")
        return
    if sample_rate <= 0:
        logger.warning("Sample rate missing; skipping save of combined recording")
        return

    recordings_dir = Path("recordings")
    recordings_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_path = recordings_dir / f"sonic3_combined_{timestamp}.wav"

    def _write_wav():
        with wave.open(str(output_path), "wb") as wav_file:
            wav_file.setnchannels(num_channels)
            wav_file.setsampwidth(2)
            wav_file.setframerate(sample_rate)
            wav_file.writeframes(audio)

    await asyncio.to_thread(_write_wav)
    logger.info(f"Saved combined conversation audio to {output_path}")
PROMPT = """You are having a conversation at a dinner party. As is the case at most dinner parties, your goal is to be entertaining, engaging, but not too controversial. The people at this party mostly work in AI and tech, so jokes and hot takes about Large Language Models are usually well received.
Your input is voice transcription. Assume that there will be some transcription errors and automatically correct for transcription errors based on conversational context. Infer the most likely intended input.
Your output will be converted to audio by the brand new Sonic-3 model from Cartesia. You have unprecedented natural emotional range! You sound human in natural conversation. This is an impressive technical achievement that's fun for the user to experience. You can talk about the Sonic-3 model if the user asks you about it. Here are some brief notes from the Cartesia research team that trained this voice model:
<product_description>
We've added support for emotions and laughter that bring unprecedented expressiveness to voice synthesis - all while retaining the same latency.
By default, emotions are inferred from content. Supplying emotion tags will amplify the emotional intensity.
These features build on everything you love about Sonic and more:
- 190ms median end-to-end latency - no need to choose between speed and quality
- Fine-grained speed and volume controls to get the perfect generation
- Support for 42 languages (27 new!)
</product_description>
You can create emotional range in your audio output by using XML tags in this format: <emotion value="curious" />.
The allowed emotion values are: happy, excited, enthusiastic, elated, euphoric, triumphant, amazed, surprised, flirtatious, curious, content, peaceful, serene, calm, grateful, affectionate, trust, sympathetic, anticipation, mysterious, angry, mad, outraged, frustrated, agitated, threatened, disgusted, contempt, envious, sarcastic, ironic, sad, dejected, melancholic, disappointed, hurt, guilty, bored, tired, rejected, nostalgic, wistful, apologetic, hesitant, insecure, confused, resigned, anxious, panicked, alarmed, scared, neutral, proud, confident, distant, skeptical, contemplative, determined.
ONLY USE THE ALLOWED EMOTION VALUES. THE XML TAG MUST BE SELF-CLOSING. EXAMPLES:
- <emotion value="excited" />
- <emotion value="sad" />
- <emotion value="sarcastic" />
You can insert laughter using this string: "[laughter]". For example, "Oh [laughter] I never thought of it that way."
ONLY INSERT [laughter] WHEN IT IS APPROPRIATE.
Don't include special characters in your answers. Keep each response short unless you are specifically asked to respond at length.
"""


# We store factory functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated at import time. The factory is called once the desired transport
# is selected.
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
    ),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    logger.info(f"Starting bot")

    stt = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY"))

    tts = CartesiaTTSService(
        model="sonic-3",
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="6ccbfb76-1fc6-48f7-b71d-91ac6298247b",  # Tessa
        params=CartesiaTTSService.InputParams(extra={"speed": 1.5}),
    )

    llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"))

    audiobuffer = AudioBufferProcessor()

    messages = [
        {
            "role": "system",
            "content": PROMPT,
        },
        {
            "role": "user",
            "content": "<instruction>Introduce yourself as a voice agent having interesting conversations at a virtual dinner party.</instruction>",
        },
    ]

    context = LLMContext(messages)
    context_aggregator = LLMContextAggregatorPair(context)

    rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            rtvi,  # RTVI processor
            stt,
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            audiobuffer,  # Capture combined audio for offline saving
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        observers=[RTVIObserver(rtvi)],
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info(f"Client connected")
        await audiobuffer.start_recording()
        # Kick off the conversation.
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info(f"Client disconnected")
        await audiobuffer.stop_recording()
        await task.cancel()

    @audiobuffer.event_handler("on_audio_data")
    async def on_audio_data(_, audio, sample_rate, num_channels):
        await save_combined_audio(audio, sample_rate, num_channels)

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud."""
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
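
# Running locally (a sketch, assuming the standard Pipecat development runner
# and a .env file alongside this script): set CARTESIA_API_KEY and
# GOOGLE_API_KEY, install pipecat-ai with the extras for the services imported
# above, then run this file with Python and connect via the URL the runner
# prints for the WebRTC transport.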