| Approach | Supported providers | Base path |
|---|---|---|
| Provider proxy (native SDK) | Google Gemini, Google Vertex AI, OpenAI, Azure Voice in AI Foundry | wss://{controlPlaneUrl}/live/{providerAccountName} |
Replace {controlPlaneUrl} with your gateway URL and your-tfy-api-key with your TrueFoundry API key. Replace {providerAccountName} with the display name of your provider account on TrueFoundry.
Model names: The model ID in code must match the display name of the model on your TrueFoundry provider account.
Which SDK to use: Use the google-genai Python SDK for Google Gemini and Google Vertex AI, the openai Python SDK for OpenAI and Azure AI Foundry, and the azure-ai-voicelive Python SDK for Azure AI Foundry (alternative) — all pointed at the gateway WebSocket URL above.
Model type: When adding a realtime model to the gateway, make sure to select Realtime as the model type.
Code snippet
After adding the models, you can get a ready-to-use code snippet from the TrueFoundry platform or use the examples below. The example below demonstrates a realtime audio session, streaming microphone input to the model and playing back audio responses through the speaker. You can adapt the code to use other modalities as needed.
Google Gemini
Google Gemini
Copy
Ask AI
"""
Gemini Live API - Realtime Audio Streaming
pip install google-genai pyaudio
"""
import asyncio
import pyaudio
from google import genai
from google.genai import types
# --- Audio settings -------------------------------------------------------
FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000     # rate used when opening the microphone stream
RECEIVE_SAMPLE_RATE = 24000  # rate used when opening the speaker stream
CHUNK_SIZE = 1024            # frames requested per microphone read

# --- Gateway settings -----------------------------------------------------
# Placeholders: substitute your TrueFoundry API key, gateway URL and the
# display name of your Gemini provider account.
API_KEY = "your-tfy-api-key"
MODEL = "gemini-live-2.5-flash"  # actual model id
BASE_URL = "https://{controlPlaneUrl}/api/llm/live/{geminiProviderAccountName}"

# The SDK is pointed at the gateway; the TrueFoundry key is sent both as the
# SDK api_key and as a bearer token in the Authorization header.
client = genai.Client(
    api_key=API_KEY,
    http_options={
        "base_url": BASE_URL,
        "headers": {"Authorization": f"Bearer {API_KEY}"},
    },
)

# Ask the model for spoken (audio) responses using the "Zephyr" prebuilt voice.
CONFIG = types.LiveConnectConfig(
    response_modalities=["AUDIO"],
    speech_config=types.SpeechConfig(
        voice_config=types.VoiceConfig(
            prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr"),
        ),
    ),
)

# Shared PyAudio handle used to open the microphone and speaker streams.
pya = pyaudio.PyAudio()
async def main():
    """Run a realtime audio session against the gateway.

    Connects a live session, opens the default microphone and a speaker
    stream, then runs three concurrent tasks: microphone upload, response
    handling, and speaker playback. Errors are printed; audio resources are
    released on the way out.
    """
    # Tracked outside the ``try`` so ``finally`` can close whichever streams
    # were actually opened.
    mic_stream = None
    speaker_stream = None
    try:
        async with client.aio.live.connect(model=MODEL, config=CONFIG) as session:
            print("Connected!")
            mic_info = pya.get_default_input_device_info()
            mic_stream = pya.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=SEND_SAMPLE_RATE,
                input=True,
                input_device_index=mic_info["index"],
                frames_per_buffer=CHUNK_SIZE,
            )
            speaker_stream = pya.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=RECEIVE_SAMPLE_RATE,
                output=True,
            )
            audio_in_queue = asyncio.Queue()

            async def send_audio():
                """Read microphone chunks and forward them to the session."""
                while True:
                    # Run the read in a worker thread so the event loop
                    # keeps servicing the other tasks.
                    data = await asyncio.to_thread(
                        mic_stream.read, CHUNK_SIZE, exception_on_overflow=False
                    )
                    # NOTE(review): recent google-genai releases mark
                    # ``session.send`` as deprecated in favour of
                    # ``send_realtime_input`` - confirm against the
                    # installed SDK version.
                    await session.send(input={"data": data, "mime_type": "audio/pcm"})

            async def receive_audio():
                """Queue received audio, print received text, handle interruptions."""
                while True:
                    turn = session.receive()
                    was_interrupted = False
                    async for response in turn:
                        if data := response.data:
                            audio_in_queue.put_nowait(data)
                        if text := response.text:
                            print(text, end="")
                        content = getattr(response, "server_content", None)
                        if content and getattr(content, "interrupted", False):
                            was_interrupted = True
                    if was_interrupted:
                        # Drop audio that was queued but not yet played.
                        while not audio_in_queue.empty():
                            audio_in_queue.get_nowait()

            async def play_audio():
                """Write queued audio chunks to the speaker stream."""
                while True:
                    data = await audio_in_queue.get()
                    await asyncio.to_thread(speaker_stream.write, data)

            async with asyncio.TaskGroup() as tg:
                tg.create_task(send_audio())
                tg.create_task(receive_audio())
                tg.create_task(play_audio())
    except Exception as e:
        print(f"Error: {e}")
    finally:
        try:
            # Close the audio streams before shutting PyAudio down.
            for stream in (mic_stream, speaker_stream):
                if stream is not None:
                    stream.close()
        finally:
            pya.terminate()


asyncio.run(main())
Google Vertex AI
Google Vertex AI
Copy
Ask AI
"""
Gemini Live API (Vertex AI) - Realtime Audio Streaming
pip install google-genai pyaudio google-auth
"""
import asyncio
import pyaudio
import google.auth.credentials
from google import genai
from google.genai import types
# --- Audio settings -------------------------------------------------------
FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000     # rate used when opening the microphone stream
RECEIVE_SAMPLE_RATE = 24000  # rate used when opening the speaker stream
CHUNK_SIZE = 1024            # frames requested per microphone read

# --- Gateway settings -----------------------------------------------------
# Placeholders: substitute your TrueFoundry API key, gateway URL and the
# display name of your Vertex AI provider account.
API_KEY = "your-tfy-api-key"
MODEL = "gemini-live-2.5-flash"  # actual model id
BASE_URL = "https://{controlPlaneUrl}/api/llm/live/{vertexProviderAccountName}"
class _GatewayCredentials(google.auth.credentials.Credentials):
    """Bypasses local ADC; the gateway handles Vertex AI authentication."""

    def __init__(self, token):
        """Store ``token`` as the credential's access token."""
        super().__init__()
        self.token = token

    def refresh(self, request):
        """Do nothing; the stored token is never refreshed here."""

    @property
    def valid(self):
        """Always report the credential as valid."""
        return True
# Vertex-mode client pointed at the gateway. The TrueFoundry key is sent as a
# bearer token and also wrapped in _GatewayCredentials.
client = genai.Client(
    vertexai=True,
    project="your-gcp-project",
    location="us-central1",
    credentials=_GatewayCredentials(API_KEY),
    http_options={
        "base_url": BASE_URL,
        "headers": {"Authorization": f"Bearer {API_KEY}"},
    },
)

# Ask the model for spoken (audio) responses using the "Zephyr" prebuilt voice.
CONFIG = types.LiveConnectConfig(
    response_modalities=["AUDIO"],
    speech_config=types.SpeechConfig(
        voice_config=types.VoiceConfig(
            prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr"),
        ),
    ),
)

# Shared PyAudio handle used to open the microphone and speaker streams.
pya = pyaudio.PyAudio()
async def main():
    """Run a realtime audio session against the gateway (Vertex AI).

    Connects a live session, opens the default microphone and a speaker
    stream, then runs three concurrent tasks: microphone upload, response
    handling, and speaker playback. Errors are printed; audio resources are
    released on the way out.
    """
    # Tracked outside the ``try`` so ``finally`` can close whichever streams
    # were actually opened.
    mic_stream = None
    speaker_stream = None
    try:
        async with client.aio.live.connect(model=MODEL, config=CONFIG) as session:
            print("Connected!")
            mic_info = pya.get_default_input_device_info()
            mic_stream = pya.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=SEND_SAMPLE_RATE,
                input=True,
                input_device_index=mic_info["index"],
                frames_per_buffer=CHUNK_SIZE,
            )
            speaker_stream = pya.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=RECEIVE_SAMPLE_RATE,
                output=True,
            )
            audio_in_queue = asyncio.Queue()

            async def send_audio():
                """Read microphone chunks and forward them to the session."""
                while True:
                    # Run the read in a worker thread so the event loop
                    # keeps servicing the other tasks.
                    data = await asyncio.to_thread(
                        mic_stream.read, CHUNK_SIZE, exception_on_overflow=False
                    )
                    # NOTE(review): recent google-genai releases mark
                    # ``session.send`` as deprecated in favour of
                    # ``send_realtime_input`` - confirm against the
                    # installed SDK version.
                    await session.send(input={"data": data, "mime_type": "audio/pcm"})

            async def receive_audio():
                """Queue received audio, print received text, handle interruptions."""
                while True:
                    turn = session.receive()
                    was_interrupted = False
                    async for response in turn:
                        if data := response.data:
                            audio_in_queue.put_nowait(data)
                        if text := response.text:
                            print(text, end="")
                        content = getattr(response, "server_content", None)
                        if content and getattr(content, "interrupted", False):
                            was_interrupted = True
                    if was_interrupted:
                        # Drop audio that was queued but not yet played.
                        while not audio_in_queue.empty():
                            audio_in_queue.get_nowait()

            async def play_audio():
                """Write queued audio chunks to the speaker stream."""
                while True:
                    data = await audio_in_queue.get()
                    await asyncio.to_thread(speaker_stream.write, data)

            async with asyncio.TaskGroup() as tg:
                tg.create_task(send_audio())
                tg.create_task(receive_audio())
                tg.create_task(play_audio())
    except Exception as e:
        print(f"Error: {e}")
    finally:
        try:
            # Close the audio streams before shutting PyAudio down.
            for stream in (mic_stream, speaker_stream):
                if stream is not None:
                    stream.close()
        finally:
            pya.terminate()


asyncio.run(main())
OpenAI
OpenAI
Copy
Ask AI
"""
OpenAI Realtime API - Audio Streaming
Ref: https://github.com/openai/openai-python/blob/main/examples/realtime/audio_util.py
Requires Python 3.11+
pip install "openai[realtime]" numpy sounddevice
"""
import base64
import asyncio
import threading
import numpy as np
import sounddevice as sd
from openai import AsyncOpenAI
from openai.resources.realtime.realtime import AsyncRealtimeConnection
# --- Audio settings -------------------------------------------------------
SAMPLE_RATE = 24000    # sample rate used for both capture and playback
CHANNELS = 1
CHUNK_LENGTH_S = 0.05  # playback block length in seconds

# --- Gateway settings -----------------------------------------------------
# Placeholders: substitute your TrueFoundry API key, gateway URL and the
# display name of your OpenAI provider account.
API_KEY = "your-tfy-api-key"
MODEL = "gpt-4o-realtime-preview"  # actual model id

# Realtime traffic goes over the gateway's WebSocket endpoint.
client = AsyncOpenAI(
    websocket_base_url="wss://{controlPlaneUrl}/live/{openaiProviderAccountName}",
    api_key=API_KEY,
)
class AudioPlayerAsync:
    """Buffer int16 audio chunks and play them through a sounddevice output stream.

    Chunks handed to :meth:`add_data` are queued and consumed by the stream
    callback. Playback starts with the first chunk and can be cut short with
    :meth:`stop`, which also discards anything still queued.
    """

    def __init__(self):
        # Local import keeps this class independent of the file's import block.
        from collections import deque

        # The callback pops from and pushes back onto the left end, which a
        # deque does in O(1) (a list would shift every element).
        self.queue = deque()
        # Guards ``queue``, which both the callback and add_data/stop touch.
        self.lock = threading.Lock()
        self.stream = sd.OutputStream(
            callback=self._callback,
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype=np.int16,
            blocksize=int(CHUNK_LENGTH_S * SAMPLE_RATE),
        )
        self.playing = False

    def _callback(self, outdata, frames, time, status):
        """Fill ``outdata`` with ``frames`` samples, zero-padding when the queue runs dry."""
        with self.lock:
            data = np.empty(0, dtype=np.int16)
            while len(data) < frames and self.queue:
                item = self.queue.popleft()
                needed = frames - len(data)
                data = np.concatenate((data, item[:needed]))
                if len(item) > needed:
                    # Keep the unused tail at the front for the next callback.
                    self.queue.appendleft(item[needed:])
            if len(data) < frames:
                data = np.concatenate((data, np.zeros(frames - len(data), dtype=np.int16)))
        # One column per frame: the output buffer is written as (frames, 1).
        outdata[:] = data.reshape(-1, 1)

    def add_data(self, data: bytes):
        """Queue raw int16 audio bytes and start the stream if it is not playing."""
        with self.lock:
            self.queue.append(np.frombuffer(data, dtype=np.int16))
            if not self.playing:
                self.playing = True
                self.stream.start()

    def stop(self):
        """Stop playback and discard all queued audio."""
        self.playing = False
        self.stream.stop()
        with self.lock:
            self.queue.clear()

    def terminate(self):
        """Close the underlying output stream."""
        self.stream.close()
async def send_mic_audio(connection: AsyncRealtimeConnection):
    """Stream microphone audio to ``connection`` as base64-encoded chunks.

    Runs until cancelled or an error is raised; the input stream is always
    stopped and closed on exit.
    """
    read_size = int(SAMPLE_RATE * 0.02)  # samples in 20 ms of audio
    stream = sd.InputStream(channels=CHANNELS, samplerate=SAMPLE_RATE, dtype="int16")
    try:
        # Started inside the ``try`` so a failed start still closes the stream.
        stream.start()
        while True:
            if stream.read_available < read_size:
                # Not enough audio buffered yet: wait briefly instead of
                # spinning the event loop at full CPU.
                await asyncio.sleep(0.005)
                continue
            data, _ = stream.read(read_size)
            await connection.input_audio_buffer.append(
                audio=base64.b64encode(data).decode("utf-8"),
            )
            # Yield so the other tasks get a turn between chunks.
            await asyncio.sleep(0)
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to stop; exit quietly.
        pass
    finally:
        stream.stop()
        stream.close()
async def main():
    """Open a realtime session, stream the microphone and play the audio replies.

    Configures the session for audio output with server-side voice activity
    detection, then runs the microphone sender and the event receiver
    concurrently. Errors are printed; the audio player is always closed.
    """
    player = AudioPlayerAsync()
    try:
        async with client.realtime.connect(model=MODEL) as connection:
            print("Connected!")
            await connection.session.update(session={
                "type": "realtime",
                "output_modalities": ["audio"],
                "audio": {
                    "input": {
                        "turn_detection": {"type": "server_vad"}
                    },
                    "output": {
                        "voice": "alloy"
                    }
                }
            })

            async def receive_events():
                """Dispatch server events: play audio, print transcripts, report errors."""
                async for event in connection:
                    if event.type == "response.output_audio.delta":
                        player.add_data(base64.b64decode(event.delta))
                    elif event.type == "response.output_audio_transcript.delta":
                        print(event.delta, end="", flush=True)
                    elif event.type == "response.output_audio_transcript.done":
                        print()
                    elif event.type == "input_audio_buffer.speech_started":
                        # The user started talking: cut off current playback.
                        player.stop()
                    elif event.type == "error":
                        print(f"\n[ERROR] {event}")

            print("Start speaking! (Ctrl+C to stop)\n")
            async with asyncio.TaskGroup() as tg:
                tg.create_task(send_mic_audio(connection))
                tg.create_task(receive_events())
    except Exception as e:
        print(f"Error: {e}")
    finally:
        player.terminate()


asyncio.run(main())
Azure AI Foundry
Azure AI Foundry
Copy
Ask AI
"""
OpenAI Realtime API via Azure AI Foundry - Audio Streaming
Ref: https://github.com/openai/openai-python/blob/main/examples/realtime/audio_util.py
Requires Python 3.11+
pip install "openai[realtime]" numpy sounddevice
"""
import base64
import asyncio
import threading
import numpy as np
import sounddevice as sd
from openai import AsyncOpenAI
from openai.resources.realtime.realtime import AsyncRealtimeConnection
# --- Audio settings -------------------------------------------------------
SAMPLE_RATE = 24000    # sample rate used for both capture and playback
CHANNELS = 1
CHUNK_LENGTH_S = 0.05  # playback block length in seconds

# --- Gateway settings -----------------------------------------------------
# Placeholders: substitute your TrueFoundry API key, gateway URL and the
# display name of your Azure AI Foundry provider account.
API_KEY = "your-tfy-api-key"
MODEL = "gpt-4o-realtime-preview"  # actual model id

# Realtime traffic goes over the gateway's WebSocket endpoint.
client = AsyncOpenAI(
    websocket_base_url="wss://{controlPlaneUrl}/live/{azureFoundryProviderAccountName}",
    api_key=API_KEY,
)
class AudioPlayerAsync:
    """Buffer int16 audio chunks and play them through a sounddevice output stream.

    Chunks handed to :meth:`add_data` are queued and consumed by the stream
    callback. Playback starts with the first chunk and can be cut short with
    :meth:`stop`, which also discards anything still queued.
    """

    def __init__(self):
        # Local import keeps this class independent of the file's import block.
        from collections import deque

        # The callback pops from and pushes back onto the left end, which a
        # deque does in O(1) (a list would shift every element).
        self.queue = deque()
        # Guards ``queue``, which both the callback and add_data/stop touch.
        self.lock = threading.Lock()
        self.stream = sd.OutputStream(
            callback=self._callback,
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype=np.int16,
            blocksize=int(CHUNK_LENGTH_S * SAMPLE_RATE),
        )
        self.playing = False

    def _callback(self, outdata, frames, time, status):
        """Fill ``outdata`` with ``frames`` samples, zero-padding when the queue runs dry."""
        with self.lock:
            data = np.empty(0, dtype=np.int16)
            while len(data) < frames and self.queue:
                item = self.queue.popleft()
                needed = frames - len(data)
                data = np.concatenate((data, item[:needed]))
                if len(item) > needed:
                    # Keep the unused tail at the front for the next callback.
                    self.queue.appendleft(item[needed:])
            if len(data) < frames:
                data = np.concatenate((data, np.zeros(frames - len(data), dtype=np.int16)))
        # One column per frame: the output buffer is written as (frames, 1).
        outdata[:] = data.reshape(-1, 1)

    def add_data(self, data: bytes):
        """Queue raw int16 audio bytes and start the stream if it is not playing."""
        with self.lock:
            self.queue.append(np.frombuffer(data, dtype=np.int16))
            if not self.playing:
                self.playing = True
                self.stream.start()

    def stop(self):
        """Stop playback and discard all queued audio."""
        self.playing = False
        self.stream.stop()
        with self.lock:
            self.queue.clear()

    def terminate(self):
        """Close the underlying output stream."""
        self.stream.close()
async def send_mic_audio(connection: AsyncRealtimeConnection):
    """Stream microphone audio to ``connection`` as base64-encoded chunks.

    Runs until cancelled or an error is raised; the input stream is always
    stopped and closed on exit.
    """
    read_size = int(SAMPLE_RATE * 0.02)  # samples in 20 ms of audio
    stream = sd.InputStream(channels=CHANNELS, samplerate=SAMPLE_RATE, dtype="int16")
    try:
        # Started inside the ``try`` so a failed start still closes the stream.
        stream.start()
        while True:
            if stream.read_available < read_size:
                # Not enough audio buffered yet: wait briefly instead of
                # spinning the event loop at full CPU.
                await asyncio.sleep(0.005)
                continue
            data, _ = stream.read(read_size)
            await connection.input_audio_buffer.append(
                audio=base64.b64encode(data).decode("utf-8"),
            )
            # Yield so the other tasks get a turn between chunks.
            await asyncio.sleep(0)
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to stop; exit quietly.
        pass
    finally:
        stream.stop()
        stream.close()
async def main():
    """Open a realtime session, stream the microphone and play the audio replies.

    Configures the session for audio output with server-side voice activity
    detection, then runs the microphone sender and the event receiver
    concurrently. Errors are printed; the audio player is always closed.
    """
    player = AudioPlayerAsync()
    try:
        async with client.realtime.connect(model=MODEL) as connection:
            print("Connected!")
            await connection.session.update(session={
                "type": "realtime",
                "output_modalities": ["audio"],
                "audio": {
                    "input": {
                        "turn_detection": {"type": "server_vad"}
                    },
                    "output": {
                        "voice": "alloy"
                    }
                }
            })

            async def receive_events():
                """Dispatch server events: play audio, print transcripts, report errors."""
                async for event in connection:
                    if event.type == "response.output_audio.delta":
                        player.add_data(base64.b64decode(event.delta))
                    elif event.type == "response.output_audio_transcript.delta":
                        print(event.delta, end="", flush=True)
                    elif event.type == "response.output_audio_transcript.done":
                        print()
                    elif event.type == "input_audio_buffer.speech_started":
                        # The user started talking: cut off current playback.
                        player.stop()
                    elif event.type == "error":
                        print(f"\n[ERROR] {event}")

            print("Start speaking! (Ctrl+C to stop)\n")
            async with asyncio.TaskGroup() as tg:
                tg.create_task(send_mic_audio(connection))
                tg.create_task(receive_events())
    except Exception as e:
        print(f"Error: {e}")
    finally:
        player.terminate()


asyncio.run(main())
Azure AI Foundry (VoiceLive SDK)
Azure AI Foundry (VoiceLive SDK)
Copy
Ask AI
# pip install "azure-ai-voicelive[aiohttp]"
import asyncio
from azure.core.credentials import AccessToken
from azure.ai.voicelive.aio import connect
from azure.ai.voicelive.models import (
RequestSession, Modality, InputAudioFormat, OutputAudioFormat,
ServerVad, ServerEventType,
)
# Gateway settings for the VoiceLive connection. All three values are
# placeholders to replace with your own.
API_KEY = "your-tfy-api-key"  # your TrueFoundry API key
# Substitute {controlPlaneUrl} and {azureFoundryProviderAccountName}.
ENDPOINT = "wss://{controlPlaneUrl}/live/{azureFoundryProviderAccountName}"
MODEL = "gpt-4o-realtime-preview"  # actual model id
class BearerTokenCredential:
    """Sends token as Authorization: Bearer header instead of api-key header."""

    def __init__(self, token: str):
        self._token = token

    async def get_token(self, *scopes, **kwargs):
        """Return the stored token wrapped in an ``AccessToken``; arguments are ignored."""
        # NOTE(review): the second argument (0) is the expiry passed to
        # AccessToken - confirm the SDK does not treat it as already expired.
        return AccessToken(self._token, 0)

    async def close(self):
        """Nothing to release."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        """Nothing to clean up when leaving the ``async with`` block."""
async def main():
    """Connect through the gateway, configure the session and print events.

    Sends one session update (text + audio modalities, PCM16 in and out,
    server-side voice activity detection) and then prints the type of every
    event received, stopping at the first ``RESPONSE_DONE`` event.
    """
    async with connect(
        endpoint=ENDPOINT,
        credential=BearerTokenCredential(API_KEY),
        model=MODEL,
    ) as conn:
        session = RequestSession(
            modalities=[Modality.TEXT, Modality.AUDIO],
            instructions="You are a helpful assistant.",
            input_audio_format=InputAudioFormat.PCM16,
            output_audio_format=OutputAudioFormat.PCM16,
            turn_detection=ServerVad(
                threshold=0.5,
                prefix_padding_ms=300,
                silence_duration_ms=500,
            ),
        )
        await conn.session.update(session=session)
        async for evt in conn:
            print(f"Event: {evt.type}")
            if evt.type == ServerEventType.RESPONSE_DONE:
                break


asyncio.run(main())