In this tutorial, you’ll build an end-to-end streaming voice agent that mirrors how modern low-latency conversational systems work in real time. We simulate the complete pipeline, from chunked audio input and streaming speech recognition to incremental language model inference and streamed text-to-speech output, while explicitly tracking latency at every stage. By working with strict latency budgets and observing metrics such as time to first token and time to first audio, we focus on the practical engineering tradeoffs that shape a responsive voice-based user experience. Check out the Full code here.
import time
import asyncio
import numpy as np
from collections import deque
from dataclasses import dataclass
from typing import List, AsyncIterator
from enum import Enum
import matplotlib.pyplot as plt
@dataclass
class LatencyMetrics:
    """Timestamps, in seconds since the start of the turn, for each pipeline stage."""
    audio_chunk_received: float = 0.0
    asr_started: float = 0.0
    asr_partial: float = 0.0
    asr_complete: float = 0.0
    llm_started: float = 0.0
    llm_first_token: float = 0.0
    llm_complete: float = 0.0
    tts_started: float = 0.0
    tts_first_chunk: float = 0.0
    tts_complete: float = 0.0

    def get_time_to_first_audio(self) -> float:
        # Delay between the final transcript and the first synthesized audio chunk.
        return self.tts_first_chunk - self.asr_complete if self.tts_first_chunk and self.asr_complete else 0.0

    def get_total_latency(self) -> float:
        # Delay between the first audio chunk received and the end of synthesis.
        return self.tts_complete - self.audio_chunk_received if self.tts_complete else 0.0


@dataclass
class LatencyBudgets:
    """Target latencies, in seconds, for each stage."""
    asr_processing: float = 0.1
    asr_finalization: float = 0.3
    llm_first_token: float = 0.5
    llm_token_generation: float = 0.02
    tts_first_chunk: float = 0.2
    tts_chunk_generation: float = 0.05
    time_to_first_audio: float = 1.0


class AgentState(Enum):
    LISTENING = "listening"
    PROCESSING_SPEECH = "processing_speech"
    THINKING = "thinking"
    SPEAKING = "speaking"
    INTERRUPTED = "interrupted"
We define the core data structures and state representations that let us track latency throughout the voice pipeline. We formalize the timing signals for ASR, LLM, and TTS to ensure consistent measurement across all stages. We also establish a clear agent state machine that guides how the system transitions during conversational turns.
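The budgets become useful once measured durations are compared against them. The following is a minimal sketch of such a check, not part of the original listing: `StageBudgets`, `StageDurations`, and `find_budget_violations` are hypothetical names, and the measured values are made up for illustration.

```python
from dataclasses import dataclass, fields
from typing import Dict


@dataclass
class StageBudgets:
    # Hypothetical per-stage budgets in seconds (same defaults as LatencyBudgets).
    asr_finalization: float = 0.3
    llm_first_token: float = 0.5
    tts_first_chunk: float = 0.2


@dataclass
class StageDurations:
    # Measured durations in seconds for the same stages.
    asr_finalization: float
    llm_first_token: float
    tts_first_chunk: float


def find_budget_violations(measured: StageDurations, budgets: StageBudgets) -> Dict[str, float]:
    """Return {stage: overshoot_seconds} for every stage that exceeded its budget."""
    violations = {}
    for field in fields(budgets):
        overshoot = getattr(measured, field.name) - getattr(budgets, field.name)
        if overshoot > 0:
            violations[field.name] = overshoot
    return violations


# Illustrative numbers only: the LLM stage overshoots its 0.5 s budget.
measured = StageDurations(asr_finalization=0.25, llm_first_token=0.62, tts_first_chunk=0.18)
violations = find_budget_violations(measured, StageBudgets())
print(violations)
```

Keeping budgets and measurements in parallel structures means a new stage only needs one field in each, and the check picks it up automatically.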
class AudioInputStream:
    def __init__(self, sample_rate: int = 16000, chunk_duration_ms: int = 100):
        self.sample_rate = sample_rate
        self.chunk_duration_ms = chunk_duration_ms
        # Number of samples in one chunk (1600 at 16 kHz and 100 ms).
        self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)

    async def stream_audio(self, text: str) -> AsyncIterator[np.ndarray]:
        # Approximate speaking rate: 150 words per minute at about 5 characters per word.
        chars_per_second = (150 * 5) / 60
        duration_seconds = len(text) / chars_per_second
        num_chunks = int(duration_seconds * 1000 / self.chunk_duration_ms)
        for _ in range(num_chunks):
            # Low-amplitude random noise stands in for real microphone samples.
            chunk = np.random.randn(self.chunk_size).astype(np.float32) * 0.1
            # Wait one chunk duration so chunks arrive at real-time pace.
            await asyncio.sleep(self.chunk_duration_ms / 1000)
            yield chunk
We simulate real-time audio input by splitting speech into fixed-duration chunks that arrive asynchronously. We model a realistic speaking rate and streaming behavior to mimic live microphone input. This stream serves as the basis for testing the latency-sensitive components downstream.
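To make the chunking arithmetic concrete, the sketch below repeats the same calculation as a pure function, without the asynchronous sleep. `chunk_plan` is a hypothetical helper introduced here for illustration; the constants are the ones used in `AudioInputStream`.

```python
from typing import Tuple


def chunk_plan(text: str, sample_rate: int = 16000, chunk_duration_ms: int = 100,
               words_per_minute: int = 150, chars_per_word: int = 5) -> Tuple[int, int]:
    """Return (samples_per_chunk, number_of_chunks) for a simulated utterance."""
    samples_per_chunk = int(sample_rate * chunk_duration_ms / 1000)
    chars_per_second = (words_per_minute * chars_per_word) / 60  # 12.5 with the defaults
    duration_seconds = len(text) / chars_per_second
    num_chunks = int(duration_seconds * 1000 / chunk_duration_ms)
    return samples_per_chunk, num_chunks


# A 25-character utterance lasts 2.0 s at 12.5 characters per second.
samples_per_chunk, num_chunks = chunk_plan("Hello, how are you today?")
print(samples_per_chunk, num_chunks)  # 1600 20
```

Because each chunk is delivered after a 100 ms sleep, this utterance takes roughly two seconds of wall-clock time to stream, which sets a floor on how early the final transcript can arrive.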
class StreamingASR:
    def __init__(self, latency_budget: float = 0.1):
        self.latency_budget = latency_budget
        self.silence_threshold = 0.5  # seconds of silence that end the utterance

    async def transcribe_stream(
        self,
        audio_stream: AsyncIterator[np.ndarray],
        ground_truth: str
    ) -> AsyncIterator[tuple[str, bool]]:
        words = ground_truth.split()
        words_transcribed = 0
        silence_duration = 0.0
        chunk_count = 0
        async for chunk in audio_stream:
            chunk_count += 1
            # Simulated per-chunk recognition cost.
            await asyncio.sleep(self.latency_budget)
            # Reveal one more word every third chunk as a partial hypothesis.
            if chunk_count % 3 == 0 and words_transcribed < len(words):
                words_transcribed += 1
                yield " ".join(words[:words_transcribed]), False
            # Accumulate silence while the mean amplitude stays below the threshold.
            audio_power = np.mean(np.abs(chunk))
            silence_duration = silence_duration + 0.1 if audio_power < 0.05 else 0.0
            if silence_duration >= self.silence_threshold:
                await asyncio.sleep(0.2)
                yield ground_truth, True
                return
        # The audio ended without a detected pause: emit the final transcript.
        yield ground_truth, True
We implement a streaming ASR module that emits partial transcriptions before producing the final result. We reveal words progressively to reflect how modern ASR systems operate in real time. We also introduce silence-based finalization to approximate end-of-utterance detection.
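The silence-based finalization can be isolated and exercised on deterministic input. In the listing, the simulated audio is Gaussian noise scaled by 0.1, whose mean absolute amplitude is about 0.08, so the 0.05 threshold is rarely crossed and the final transcript usually comes from the yield after the loop. The sketch below is one possible standalone version of the endpoint check, assuming a hypothetical `find_endpoint` helper and integer millisecond counters to avoid floating-point accumulation.

```python
from typing import Iterable, Optional

import numpy as np


def find_endpoint(chunks: Iterable[np.ndarray], chunk_ms: int = 100,
                  energy_threshold: float = 0.05, silence_ms: int = 500) -> Optional[int]:
    """Return the index of the chunk at which the utterance is declared finished, or None."""
    quiet_ms = 0
    for index, chunk in enumerate(chunks):
        energy = float(np.mean(np.abs(chunk)))
        # Any chunk at or above the threshold resets the silence counter.
        quiet_ms = quiet_ms + chunk_ms if energy < energy_threshold else 0
        if quiet_ms >= silence_ms:
            return index
    return None


# Four loud chunks followed by six silent ones (synthetic data, 1600 samples each).
speech = [np.full(1600, 0.2, dtype=np.float32) for _ in range(4)]
silence = [np.zeros(1600, dtype=np.float32) for _ in range(6)]
print(find_endpoint(speech + silence))  # 8, the fifth consecutive quiet chunk
```

The silence threshold adds directly to response latency: with 500 ms of required silence, the agent cannot begin responding until at least half a second after the user stops speaking.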
class StreamingLLM:
    def __init__(self, time_to_first_token: float = 0.3, tokens_per_second: float = 50):
        self.time_to_first_token = time_to_first_token
        self.tokens_per_second = tokens_per_second

    async def generate_response(self, prompt: str) -> AsyncIterator[str]:
        # Canned responses selected by keyword; a stand-in for real model inference.
        responses = {
            "hello": "Hello! How can I help you today?",
            "weather": "The weather is sunny with a temperature of 72°F.",
            "time": "The current time is 2:30 PM.",
            "default": "I understand. Let me help you with that."
        }
        response = responses["default"]
        for key in responses:
            if key in prompt.lower():
                response = responses[key]
                break
        # Simulated prefill delay before the first token appears.
        await asyncio.sleep(self.time_to_first_token)
        for word in response.split():
            yield word + " "
            await asyncio.sleep(1.0 / self.tokens_per_second)


class StreamingTTS:
    def __init__(self, time_to_first_chunk: float = 0.2, chars_per_second: float = 15):
        self.time_to_first_chunk = time_to_first_chunk
        self.chars_per_second = chars_per_second

    async def synthesize_stream(self, text_stream: AsyncIterator[str]) -> AsyncIterator[np.ndarray]:
        first_chunk = True
        buffer = ""
        async for text in text_stream:
            buffer += text
            # Synthesize immediately for the first chunk, then in batches of 20+ characters.
            if len(buffer) >= 20 or first_chunk:
                if first_chunk:
                    await asyncio.sleep(self.time_to_first_chunk)
                    first_chunk = False
                duration = len(buffer) / self.chars_per_second
                yield np.random.randn(int(16000 * duration)).astype(np.float32) * 0.1
                buffer = ""
                await asyncio.sleep(duration * 0.5)
        if buffer:
            # Flush trailing text that never reached the 20-character threshold.
            duration = len(buffer) / self.chars_per_second
            yield np.random.randn(int(16000 * duration)).astype(np.float32) * 0.1
We model a streaming language model and a streaming text-to-speech engine that work together. We generate the response token by token to capture time-to-first-token behavior. We then convert the incremental text into audio chunks to simulate early, continuous speech synthesis.
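Time to first token can be measured for any asynchronous stream by noting when the first item arrives. The sketch below is an illustration under stated assumptions: `fake_token_stream` and `measure_stream` are hypothetical names, the delays are arbitrary, and `time.perf_counter` is used because it is a monotonic clock suited to measuring intervals.

```python
import asyncio
import time
from typing import AsyncIterator, List, Tuple


async def fake_token_stream(first_token_delay: float, inter_token_delay: float) -> AsyncIterator[str]:
    # Hypothetical stand-in for a streaming LLM.
    await asyncio.sleep(first_token_delay)
    for word in "The weather is sunny".split():
        yield word + " "
        await asyncio.sleep(inter_token_delay)


async def measure_stream(stream: AsyncIterator[str]) -> Tuple[List[str], float, float]:
    """Consume a stream and return (items, time_to_first_item, total_time) in seconds."""
    start = time.perf_counter()
    first_item_at = None
    items = []
    async for item in stream:
        if first_item_at is None:  # explicit None check, so a 0.0 reading is still recorded
            first_item_at = time.perf_counter() - start
        items.append(item)
    total = time.perf_counter() - start
    return items, (first_item_at if first_item_at is not None else total), total


tokens, ttft, total = asyncio.run(measure_stream(fake_token_stream(0.05, 0.01)))
print(f"tokens={len(tokens)} ttft={ttft:.3f}s total={total:.3f}s")
```

The same wrapper works for the TTS stream, where the first item corresponds to the first audio chunk.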
class StreamingVoiceAgent:
    def __init__(self, latency_budgets: LatencyBudgets):
        self.budgets = latency_budgets
        self.audio_stream = AudioInputStream()
        self.asr = StreamingASR(latency_budgets.asr_processing)
        # Per-token and per-chunk budgets are converted to rates for the simulators.
        self.llm = StreamingLLM(
            latency_budgets.llm_first_token,
            1.0 / latency_budgets.llm_token_generation
        )
        self.tts = StreamingTTS(
            latency_budgets.tts_first_chunk,
            1.0 / latency_budgets.tts_chunk_generation
        )
        self.state = AgentState.LISTENING
        self.metrics_history: List[LatencyMetrics] = []

    async def process_turn(self, user_input: str) -> LatencyMetrics:
        metrics = LatencyMetrics()
        start_time = time.time()

        # Stage 1: stream simulated audio into the recognizer.
        metrics.audio_chunk_received = time.time() - start_time
        audio_gen = self.audio_stream.stream_audio(user_input)
        metrics.asr_started = time.time() - start_time
        transcription = ""
        async for text, final in self.asr.transcribe_stream(audio_gen, user_input):
            if final:
                metrics.asr_complete = time.time() - start_time
                transcription = text

        # Stage 2: generate the response token by token.
        metrics.llm_started = time.time() - start_time
        response = ""
        async for token in self.llm.generate_response(transcription):
            if not metrics.llm_first_token:
                metrics.llm_first_token = time.time() - start_time
            response += token
        metrics.llm_complete = time.time() - start_time

        # Stage 3: replay the response as a text stream and synthesize audio.
        metrics.tts_started = time.time() - start_time

        async def text_stream():
            for word in response.split():
                yield word + " "

        async for _ in self.tts.synthesize_stream(text_stream()):
            if not metrics.tts_first_chunk:
                metrics.tts_first_chunk = time.time() - start_time
        metrics.tts_complete = time.time() - start_time

        self.metrics_history.append(metrics)
        return metrics
We orchestrate the full voice agent by wiring audio input, ASR, LLM, and TTS into a single asynchronous flow. We record precise timestamps at each transition to compute the key latency metrics. We treat each user turn as an independent experiment to enable systematic performance analysis.
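In `process_turn` as listed, the LLM response is accumulated in full before synthesis starts, so the LLM and TTS stages run back to back. If the goal is to overlap them, one option is to pass the token stream to the synthesizer directly. The sketch below shows that idea with hypothetical stand-ins (`token_source`, `speak`, `run_overlapped`) and an event log in place of real audio; it is a variation for illustration, not the tutorial's implementation.

```python
import asyncio
from typing import AsyncIterator, List


async def token_source(tokens: List[str], events: List[str]) -> AsyncIterator[str]:
    # Hypothetical stand-in for a streaming LLM; logs each token as it is produced.
    for token in tokens:
        await asyncio.sleep(0)  # yield control to the event loop between tokens
        events.append("token")
        yield token


async def speak(text_stream: AsyncIterator[str], events: List[str],
                min_chars: int = 8) -> AsyncIterator[str]:
    # Hypothetical stand-in for streaming TTS; emits a chunk once enough text is buffered.
    buffer = ""
    async for text in text_stream:
        buffer += text
        if len(buffer) >= min_chars:
            events.append("audio")
            yield buffer
            buffer = ""
    if buffer:  # flush whatever is left when the token stream ends
        events.append("audio")
        yield buffer


async def run_overlapped(tokens: List[str]) -> List[str]:
    events: List[str] = []
    # The synthesizer pulls tokens on demand, so audio starts before generation ends.
    async for _ in speak(token_source(tokens, events), events):
        pass
    return events


events = asyncio.run(run_overlapped(["The ", "weather ", "is ", "sunny ", "today."]))
print(events)
# ['token', 'token', 'audio', 'token', 'token', 'audio', 'token', 'audio']
```

The interleaved log shows the first audio chunk being emitted after two tokens rather than after all five, which is the effect that shortens time to first audio in a real pipeline.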
async def run_demo():
    # Budgets tighter than the defaults, to put the pipeline under pressure.
    budgets = LatencyBudgets(
        asr_processing=0.08,
        llm_first_token=0.3,
        llm_token_generation=0.02,
        tts_first_chunk=0.15,
        time_to_first_audio=0.8
    )
    agent = StreamingVoiceAgent(budgets)
    inputs = [
        "Hello, how are you today?",
        "What's the weather like?",
        "Can you tell me the time?"
    ]
    for text in inputs:
        await agent.process_turn(text)
        # Pause briefly between turns.
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(run_demo())
We run the entire system over multiple conversational turns to observe the consistency and variance of latency. We apply aggressive latency budgets to stress the pipeline under realistic constraints. We use these runs to validate whether the system meets its responsiveness targets across interactions.
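Consistency and variance across turns can be summarized with a few standard statistics. The sketch below assumes a hypothetical `summarize_latency` helper and uses made-up sample values rather than measured results.

```python
import statistics
from typing import Dict, List


def summarize_latency(samples: List[float], budget: float) -> Dict[str, float]:
    """Summarize per-turn latencies (seconds) and count the turns over budget."""
    return {
        "mean": statistics.mean(samples),
        "stdev": statistics.stdev(samples) if len(samples) > 1 else 0.0,
        "worst": max(samples),
        "over_budget": sum(1 for s in samples if s > budget),
    }


# Made-up time-to-first-audio values for three turns, not measured results.
summary = summarize_latency([0.62, 0.71, 0.93], budget=0.8)
print(summary)
```

With the agent from the listing, the samples could be collected as `[m.get_time_to_first_audio() for m in agent.metrics_history]` and compared against `budgets.time_to_first_audio`.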
In conclusion, we demonstrated how a complete streaming voice agent can be orchestrated as a single asynchronous pipeline with clear stage boundaries and measurable performance guarantees. We showed that combining partial ASR, token-level LLM streaming, and early-start TTS reduces perceived latency, even though total computation time remains significant. This approach helps you reason systematically about turn-taking, responsiveness, and optimization levers, and provides a solid foundation for extending the system to real-world deployments using production ASR, LLM, and TTS models.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for social good. His most recent endeavor is the launch of Marktechpost, an artificial intelligence media platform that stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understood by a wide audience. The platform boasts over 2 million monthly views, demonstrating its popularity among readers.

