A subclass of edge_tts.Communicate that generates the TTS with pauses, since the custom SSML approach no longer works.
import asyncio
from typing import Union, Optional
from pydub import AudioSegment
import io

from edge_tts import Communicate


class NoPausesFound(Exception):
    def __init__(self, description = None) -> None:
        self.description = (f'No pauses were found in the text. Please '
                            + f'consider using `edge_tts.Communicate` instead.')
        super().__init__(self.description)


class CommWithPauses(Communicate):
    """
    This class uses edge_tts to generate text
    but with pauses for example:- text: 'Hello
    this is simple text. [pause: 2s] Paused 2s'
    """
    def __init__(
        self,
        text: str,
        voice: str = "Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)",
        max_pause: int = 6,  # maximum pause time in seconds.
        **kwargs
    ) -> None:
        super().__init__(text, voice, **kwargs)
        self.max_pause = max_pause * 1000
        self.parsed = self.parse_text()
        self.file = io.BytesIO()

    def parse_text(self):
        if not "[pause:" in self.text:
            raise NoPausesFound

        parts = self.text.split("[pause:")
        for part in parts:
            if "]" in part:
                pause_time, content = part.split("]", 1)
                pause_time = self.parse_time(pause_time)
                yield pause_time, content.strip()
            else:
                content = part
                yield 0, content.strip()

    def parse_time(self, time_str: str) -> int:
        if time_str[-2:] == 'ms':
            unit = 'ms'
            time_value = int(time_str[:-2])
            return min(time_value, self.max_pause)
        elif time_str[-1] == 's':
            unit = 's'
            time_value = int(time_str[:-1]) * 1000
            return min(time_value, self.max_pause)
        else:
            raise ValueError(f"Invalid time unit! only ms/s are allowed")

    async def chunkify(self):
        for pause_time, content in self.parsed:
            if not pause_time and not content:
                pass
            elif not pause_time and content:
                audio_bytes = await self.generate_audio(content)
                self.file.write(audio_bytes)
            elif not content and pause_time:
                pause_bytes = self.generate_pause(pause_time)
                self.file.write(pause_bytes)
            else:
                pause_bytes = self.generate_pause(pause_time)
                audio_bytes = await self.generate_audio(content)
                self.file.write(pause_bytes)
                self.file.write(audio_bytes)

    def generate_pause(self, time: int) -> bytes:
        """
        pause time should be provided in ms
        """
        silent: AudioSegment = AudioSegment.silent(time, 24000)
        return silent.raw_data

    async def generate_audio(self, text: str) -> bytes:
        """
        this generates the real TTS using edge_tts for this part.
        """
        temp_chunk = io.BytesIO()
        self.text = text
        async for chunk in self.stream():
            if chunk['type'] == 'audio':
                temp_chunk.write(chunk['data'])

        temp_chunk.seek(0)
        decoded_chunk = AudioSegment.from_mp3(temp_chunk)
        return decoded_chunk.raw_data

    async def save(
        self,
        audio_fname: Union[str, bytes],
        metadata_fname: Optional[Union[str, bytes]] = None,
    ) -> None:
        """
        Save the audio and metadata to the specified files.
        """
        await self.chunkify()
        await super().save(audio_fname, metadata_fname)

        self.file.seek(0)
        audio: AudioSegment = AudioSegment.from_raw(
            self.file,
            sample_width=2,
            frame_rate=24000,
            channels=1
        )
        audio.export(audio_fname)


"""
Example Usage
"""
text = "Hello Brother [pause: 2000ms] Just paused 2 seconds [pause: 3s]"

async def main():
    com = CommWithPauses(text)
    await com.save('out.mp3')

asyncio.run(main())
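For reference, a quick sketch (assuming the class above is in scope; the variable name demo is just for illustration) of what the pause markup parses to. Draining the generator shows the (pause_ms, text) tuples that chunkify() consumes:

# Sketch: inspect the (pause_ms, text) tuples that parse_text() yields for
# the example string. Draining self.parsed here is only for illustration;
# it would leave nothing for a later chunkify()/save() call.
demo = CommWithPauses("Hello Brother [pause: 2000ms] Just paused 2 seconds [pause: 3s]")
print(list(demo.parsed))
# Expected: [(0, 'Hello Brother'), (2000, 'Just paused 2 seconds'), (3000, '')]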
A corrected version: the parent class is initialized with empty text (the full text, voice, and kwargs are stored on the subclass instead), each text chunk is synthesized through a temporary Communicate instance rather than by mutating self.text, the redundant super().save() call is removed, and the example cleans up leftover ffcache files.
import asyncio
from typing import Union, Optional
from pydub import AudioSegment
import io
from edge_tts import Communicate
import os
import glob
class NoPausesFound(Exception):
def __init__(self, description=None) -> None:
self.description = (
f'No pauses were found in the text. Please '
+ f'consider using `edge_tts.Communicate` instead.'
)
super().__init__(self.description)
class CommWithPauses(Communicate):
"""
This class uses edge_tts to generate text
but with pauses for example:- text: 'Hello
this is simple text. [pause: 2s] Paused 2s'
"""
def __init__(
self,
text: str,
voice: str = "Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)",
max_pause: int = 6, # maximum pause time in seconds.
**kwargs
) -> None:
self.text = text
self.voice = voice
self.kwargs = kwargs
self.max_pause = max_pause * 1000
self.parsed = self.parse_text()
self.file = io.BytesIO()
super().__init__("", voice, **kwargs) # Initialize parent with empty text to set up other state
def parse_text(self):
if not "[pause:" in self.text:
raise NoPausesFound
parts = self.text.split("[pause:")
for part in parts:
if "]" in part:
pause_time, content = part.split("]", 1)
pause_time = self.parse_time(pause_time)
yield pause_time, content.strip()
else:
content = part
yield 0, content.strip()
def parse_time(self, time_str: str) -> int:
if time_str[-2:] == 'ms':
unit = 'ms'
time_value = int(time_str[:-2])
return min(time_value, self.max_pause)
elif time_str[-1] == 's':
unit = 's'
time_value = int(time_str[:-1]) * 1000
return min(time_value, self.max_pause)
else:
raise ValueError(f"Invalid time unit! only ms/s are allowed")
async def chunkify(self):
for pause_time, content in self.parsed:
if not pause_time and not content:
pass
elif not pause_time and content:
audio_bytes = await self.generate_audio(content)
self.file.write(audio_bytes)
elif not content and pause_time:
pause_bytes = self.generate_pause(pause_time)
self.file.write(pause_bytes)
else:
pause_bytes = self.generate_pause(pause_time)
audio_bytes = await self.generate_audio(content)
self.file.write(pause_bytes)
self.file.write(audio_bytes)
def generate_pause(self, time: int) -> bytes:
"""
pause time should be provided in ms
"""
silent: AudioSegment = AudioSegment.silent(time, 24000)
return silent.raw_data
async def generate_audio(self, text: str) -> bytes:
"""
this generates the real TTS using edge_tts for this part.
"""
temp_comm = Communicate(text, self.voice, **self.kwargs)
temp_chunk = io.BytesIO()
async for chunk in temp_comm.stream():
if chunk['type'] == 'audio':
temp_chunk.write(chunk['data'])
temp_chunk.seek(0)
decoded_chunk = AudioSegment.from_mp3(temp_chunk)
return decoded_chunk.raw_data
async def save(
self,
audio_fname: Union[str, bytes],
metadata_fname: Optional[Union[str, bytes]] = None,
) -> None:
"""
Save the audio and metadata to the specified files.
"""
await self.chunkify()
# Remove super().save() - we're handling the export manually
self.file.seek(0)
audio: AudioSegment = AudioSegment.from_raw(
self.file,
sample_width=2,
frame_rate=24000,
channels=1
)
audio.export(audio_fname)
"""
Example Usage
"""
text = "Hello Brother [pause: 2000ms] Just paused 2 seconds [pause: 3s]"
VOICE = "Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)"

async def main():
    com = CommWithPauses(text, voice=VOICE)
await com.save('out.mp3')
# Clean up temporary ffcache files created by pydub/ffmpeg
for file in glob.glob("ffcache*"):
os.remove(file)
if __name__ == "__main__":
asyncio.run(main())
Please tell me how to add these values to your script: --rate=-10% --volume=+20%
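A minimal sketch of one way this should work, assuming edge_tts.Communicate accepts rate and volume keyword arguments (the Python counterparts of the CLI flags) and relying on CommWithPauses forwarding **kwargs to it, as both versions above do:

# Sketch: pass rate/volume through CommWithPauses via **kwargs.
# Assumes CommWithPauses from above is in scope and that edge_tts.Communicate
# accepts rate="..." and volume="..." keyword arguments.
import asyncio

async def main():
    com = CommWithPauses(
        "Hello Brother [pause: 2s] slower and louder",
        rate="-10%",    # counterpart of --rate=-10%
        volume="+20%",  # counterpart of --volume=+20%
    )
    await com.save("out.mp3")

asyncio.run(main())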