Wednesday, May 27, 2026
banner
Top Selling Multipurpose WP Theme

On this tutorial, you’ll construct an end-to-end streaming voice agent that mirrors how fashionable low-latency dialog methods work in actual time. Simulate your complete pipeline, from chunked audio enter and streaming speech recognition to incremental language mannequin inference and streamed text-to-speech output, whereas explicitly monitoring latency at each stage. Through the use of tight latency budgets and observing metrics corresponding to time to first token and time to first voice, we deal with the sensible engineering tradeoffs that form a responsive voice-based consumer expertise. Please verify Full code here.

import time
import asyncio
import numpy as np
from collections import deque
from dataclasses import dataclass
from typing import Record, AsyncIterator
from enum import Enum
import matplotlib.pyplot as plt


@dataclass
class LatencyMetrics:
   audio_chunk_received: float = 0.0
   asr_started: float = 0.0
   asr_partial: float = 0.0
   asr_complete: float = 0.0
   llm_started: float = 0.0
   llm_first_token: float = 0.0
   llm_complete: float = 0.0
   tts_started: float = 0.0
   tts_first_chunk: float = 0.0
   tts_complete: float = 0.0


   def get_time_to_first_audio(self) -> float:
       return self.tts_first_chunk - self.asr_complete if self.tts_first_chunk and self.asr_complete else 0.0


   def get_total_latency(self) -> float:
       return self.tts_complete - self.audio_chunk_received if self.tts_complete else 0.0


@dataclass
class LatencyBudgets:
   asr_processing: float = 0.1
   asr_finalization: float = 0.3
   llm_first_token: float = 0.5
   llm_token_generation: float = 0.02
   tts_first_chunk: float = 0.2
   tts_chunk_generation: float = 0.05
   time_to_first_audio: float = 1.0


class AgentState(Enum):
   LISTENING = "listening"
   PROCESSING_SPEECH = "processing_speech"
   THINKING = "pondering"
   SPEAKING = "talking"
   INTERRUPTED = "interrupted"

Defines core information constructions and state representations that will let you monitor delays all through the audio pipeline. We formalize the ASR, LLM, and TTS timing alerts to make sure constant measurements throughout all levels. It additionally establishes a transparent agent state machine that guides how the system transitions throughout conversational turns. Please verify Full code here.

class AudioInputStream:
   def __init__(self, sample_rate: int = 16000, chunk_duration_ms: int = 100):
       self.sample_rate = sample_rate
       self.chunk_duration_ms = chunk_duration_ms
       self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)


   async def stream_audio(self, textual content: str) -> AsyncIterator[np.ndarray]:
       chars_per_second = (150 * 5) / 60
       duration_seconds = len(textual content) / chars_per_second
       num_chunks = int(duration_seconds * 1000 / self.chunk_duration_ms)


       for _ in vary(num_chunks):
           chunk = np.random.randn(self.chunk_size).astype(np.float32) * 0.1
           await asyncio.sleep(self.chunk_duration_ms / 1000)
           yield chunk

Simulates real-time audio enter by dividing the audio into chunks of fastened period that arrive asynchronously. Mannequin lifelike speech charges and streaming conduct to imitate reside microphone enter. This stream is used as a foundation for testing downstream delay-sensitive parts. Please verify Full code here.

class StreamingASR:
   def __init__(self, latency_budget: float = 0.1):
       self.latency_budget = latency_budget
       self.silence_threshold = 0.5


   async def transcribe_stream(
       self,
       audio_stream: AsyncIterator[np.ndarray],
       ground_truth: str
   ) -> AsyncIterator[tuple[str, bool]]:
       phrases = ground_truth.cut up()
       words_transcribed = 0
       silence_duration = 0.0
       chunk_count = 0


       async for chunk in audio_stream:
           chunk_count += 1
           await asyncio.sleep(self.latency_budget)


           if chunk_count % 3 == 0 and words_transcribed < len(phrases):
               words_transcribed += 1
               yield " ".be a part of(phrases[:words_transcribed]), False


           audio_power = np.imply(np.abs(chunk))
           silence_duration = silence_duration + 0.1 if audio_power < 0.05 else 0.0


           if silence_duration >= self.silence_threshold:
               await asyncio.sleep(0.2)
               yield ground_truth, True
               return


       yield ground_truth, True

Implement a streaming ASR module that generates partial transcriptions earlier than outputting the ultimate outcomes. Step by step we’ll reveal language that displays how fashionable ASR methods function in actual time. We additionally introduce silence-based termination processing to approximate the detection of utterance termination. Please verify Full code here.

class StreamingLLM:
   def __init__(self, time_to_first_token: float = 0.3, tokens_per_second: float = 50):
       self.time_to_first_token = time_to_first_token
       self.tokens_per_second = tokens_per_second


   async def generate_response(self, immediate: str) -> AsyncIterator[str]:
       responses = {
           "hiya": "Howdy! How can I assist you at present?",
           "climate": "The climate is sunny with a temperature of 72°F.",
           "time": "The present time is 2:30 PM.",
           "default": "I perceive. Let me assist you with that."
       }


       response = responses["default"]
       for key in responses:
           if key in immediate.decrease():
               response = responses[key]
               break


       await asyncio.sleep(self.time_to_first_token)


       for phrase in response.cut up():
           yield phrase + " "
           await asyncio.sleep(1.0 / self.tokens_per_second)


class StreamingTTS:
   def __init__(self, time_to_first_chunk: float = 0.2, chars_per_second: float = 15):
       self.time_to_first_chunk = time_to_first_chunk
       self.chars_per_second = chars_per_second


   async def synthesize_stream(self, text_stream: AsyncIterator[str]) -> AsyncIterator[np.ndarray]:
       first_chunk = True
       buffer = ""


       async for textual content in text_stream:
           buffer += textual content
           if len(buffer) >= 20 or first_chunk:
               if first_chunk:
                   await asyncio.sleep(self.time_to_first_chunk)
                   first_chunk = False


               period = len(buffer) / self.chars_per_second
               yield np.random.randn(int(16000 * period)).astype(np.float32) * 0.1
               buffer = ""
               await asyncio.sleep(period * 0.5)

This snippet fashions a streaming language mannequin and a streaming text-to-speech engine that works collectively. Generate a response for every token to seize the conduct in time as much as the primary token. Subsequent, convert the incremental textual content into audio chunks to simulate early steady speech synthesis. Please verify Full code here.

class StreamingVoiceAgent:
   def __init__(self, latency_budgets: LatencyBudgets):
       self.budgets = latency_budgets
       self.audio_stream = AudioInputStream()
       self.asr = StreamingASR(latency_budgets.asr_processing)
       self.llm = StreamingLLM(
           latency_budgets.llm_first_token,
           1.0 / latency_budgets.llm_token_generation
       )
       self.tts = StreamingTTS(
           latency_budgets.tts_first_chunk,
           1.0 / latency_budgets.tts_chunk_generation
       )
       self.state = AgentState.LISTENING
       self.metrics_history: Record[LatencyMetrics] = []


   async def process_turn(self, user_input: str) -> LatencyMetrics:
       metrics = LatencyMetrics()
       start_time = time.time()


       metrics.audio_chunk_received = time.time() - start_time
       audio_gen = self.audio_stream.stream_audio(user_input)


       metrics.asr_started = time.time() - start_time
       async for textual content, closing in self.asr.transcribe_stream(audio_gen, user_input):
           if closing:
               metrics.asr_complete = time.time() - start_time
               transcription = textual content


       metrics.llm_started = time.time() - start_time
       response = ""
       async for token in self.llm.generate_response(transcription):
           if not metrics.llm_first_token:
               metrics.llm_first_token = time.time() - start_time
           response += token


       metrics.llm_complete = time.time() - start_time
       metrics.tts_started = time.time() - start_time


       async def text_stream():
           for phrase in response.cut up():
               yield phrase + " "


       async for _ in self.tts.synthesize_stream(text_stream()):
           if not metrics.tts_first_chunk:
               metrics.tts_first_chunk = time.time() - start_time


       metrics.tts_complete = time.time() - start_time
       self.metrics_history.append(metrics)
       return metrics

Coordinate a full-voice agent by connecting voice enter, ASR, LLM, and TTS right into a single asynchronous circulate. Document precise timestamps on every transition to calculate essential latency metrics. To allow systematic efficiency evaluation, we deal with every consumer flip as an impartial experiment. Please verify Full code here.

async def run_demo():
   budgets = LatencyBudgets(
       asr_processing=0.08,
       llm_first_token=0.3,
       llm_token_generation=0.02,
       tts_first_chunk=0.15,
       time_to_first_audio=0.8
   )


   agent = StreamingVoiceAgent(budgets)


   inputs = [
       "Hello, how are you today?",
       "What's the weather like?",
       "Can you tell me the time?"
   ]


   for textual content in inputs:
       await agent.process_turn(textual content)
       await asyncio.sleep(1)


if __name__ == "__main__":
   asyncio.run(run_demo())

Run your complete system over a number of dialog turns and observe the consistency and variance in latency. Apply aggressive latency budgets to load the pipeline below lifelike constraints. Use these runs to validate whether or not your system meets your responsiveness objectives throughout interactions.

In conclusion, we demonstrated how a whole streaming voice agent could be orchestrated as a single asynchronous pipeline with clear stage boundaries and measurable efficiency ensures. We confirmed that combining partial ASR, token-level LLM streaming, and early-start TTS reduces perceived delay, despite the fact that complete computation time stays important. This strategy helps you systematically cause about turn-taking, responsiveness, and optimization levers, and offers a stable basis for extending your system to real-world deployments utilizing manufacturing ASR, LLM, and TTS fashions.


Please verify Full code here. Please be at liberty to observe us too Twitter Do not forget to affix us 100,000+ ML subreddits and subscribe our newsletter. grasp on! Are you on telegram? You can now also participate by telegram.


Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is dedicated to harnessing the potential of synthetic intelligence for social good. His newest endeavor is the launch of Marktechpost, a synthetic intelligence media platform. It stands out for its thorough protection of machine studying and deep studying information, which is technically sound and simply understood by a large viewers. The platform boasts over 2 million views per thirty days, demonstrating its reputation amongst viewers.

banner
Top Selling Multipurpose WP Theme

Converter

Top Selling Multipurpose WP Theme

Newsletter

Subscribe my Newsletter for new blog posts, tips & new photos. Let's stay updated!

banner
Top Selling Multipurpose WP Theme

Leave a Comment

banner
Top Selling Multipurpose WP Theme

Latest

Best selling

22000,00 $
16000,00 $
6500,00 $

Top rated

6500,00 $
22000,00 $
900000,00 $

Products

Knowledge Unleashed
Knowledge Unleashed

Welcome to Ivugangingo!

At Ivugangingo, we're passionate about delivering insightful content that empowers and informs our readers across a spectrum of crucial topics. Whether you're delving into the world of insurance, navigating the complexities of cryptocurrency, or seeking wellness tips in health and fitness, we've got you covered.