connect_realtime
Speech-to-speech WebSocket sessions, async only.
Open a speech-to-speech (S2S) session. The server mints a short-lived WebSocket token, then proxies the client WS directly to the underlying provider (OpenAI Realtime, Gemini Live, xAI Grok Voice). The browser media transport is skipped entirely so time-to-first-audio stays under ~300 ms.
Realtime sessions are only available on AsyncSpeko. A synchronous WebSocket loop would block the calling thread on every audio chunk, defeating the purpose of a low-latency S2S pipeline.

    import asyncio
    import os

    from spekoai import AsyncSpeko, RealtimeConnectParams


    async def main():
        async with AsyncSpeko(api_key=os.environ["SPEKO_API_KEY"]) as speko:
            session = await speko.connect_realtime(
                RealtimeConnectParams(provider="openai", model="gpt-realtime"),
            )
            async with session:
                # pcm_chunk: PCM16 audio bytes captured by your application.
                await session.send_audio(pcm_chunk)
                async for frame in session:
                    if frame["type"] == "audio":
                        # play(): your application's audio output routine.
                        play(frame["pcm"])
                    elif frame["type"] == "transcript":
                        print(frame["text"])


    asyncio.run(main())

Signature

    await AsyncSpeko.connect_realtime(
        params: RealtimeConnectParams,
    ) -> AsyncRealtimeSession

RealtimeConnectParams

| Parameter | Type | Required | Description |
|---|---|---|---|
| provider | 'openai' \| 'google' \| 'xai' | yes | |
| model | string | yes | Provider-specific model id (e.g. gpt-realtime, gemini-2.5-flash-native-audio, grok-voice-beta). |
| voice | string | no | Voice id override, interpreted per provider. |
| system_prompt | string | no | |
| temperature | float | no | |
| input_sample_rate | 16000 \| 24000 | no | |
| output_sample_rate | 16000 \| 24000 | no | |
| tools | list[RealtimeToolSpec] | no | Tool definitions the assistant may call. Receive tool_call frames and respond with send_tool_result. |
| metadata | dict[str, object] | no | Free-form metadata attached to the session record. |
| ttl_seconds | int | no | Max session duration in seconds. Server-capped at 1800 (30 min). |

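
The constraints listed for these parameters can be checked locally before a session is requested. The following is a minimal sketch, not part of the SDK: `build_connect_kwargs` is a hypothetical helper, and it assumes `RealtimeConnectParams` accepts these fields as keyword arguments (as the `provider=` and `model=` usage above suggests) and that clamping `ttl_seconds` to the documented cap is acceptable for your application.

```python
ALLOWED_PROVIDERS = {"openai", "google", "xai"}
ALLOWED_SAMPLE_RATES = {16000, 24000}
MAX_TTL_SECONDS = 1800  # documented server-side cap (30 min)


def build_connect_kwargs(provider, model, *, ttl_seconds=None,
                         input_sample_rate=None, output_sample_rate=None):
    """Hypothetical helper: validate values and return keyword arguments."""
    if provider not in ALLOWED_PROVIDERS:
        raise ValueError(f"unsupported provider: {provider!r}")
    kwargs = {"provider": provider, "model": model}
    for name, rate in (("input_sample_rate", input_sample_rate),
                       ("output_sample_rate", output_sample_rate)):
        if rate is None:
            continue  # leave unset so the server default applies
        if rate not in ALLOWED_SAMPLE_RATES:
            raise ValueError(f"{name} must be 16000 or 24000, got {rate}")
        kwargs[name] = rate
    if ttl_seconds is not None:
        if ttl_seconds <= 0:
            raise ValueError("ttl_seconds must be positive")
        # Clamp locally so the request never exceeds the documented cap.
        kwargs["ttl_seconds"] = min(ttl_seconds, MAX_TTL_SECONDS)
    return kwargs
```

Under that assumption, the result could be passed on as `RealtimeConnectParams(**build_connect_kwargs("openai", "gpt-realtime", ttl_seconds=600))`.
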
AsyncRealtimeSession
The returned session is both an async context manager and an async iterator.
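
To make the two protocols concrete, here is a toy stand-in that implements both. It is not the library's class; `FakeSession` and `collect_types` are hypothetical names, and the sketch only illustrates which dunder methods `async with` and `async for` rely on. A stand-in like this can also be handy for unit-testing frame-handling code without a network connection.

```python
import asyncio


class FakeSession:
    """Toy stand-in, not the library's class: yields canned frames, then stops."""

    def __init__(self, frames):
        self._frames = list(frames)
        self.closed = False

    async def __aenter__(self):
        # `async with session:` awaits this on entry.
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # `async with` awaits this on exit; a real session would close the socket.
        self.closed = True

    def __aiter__(self):
        # `async for frame in session:` calls this once to get the iterator.
        return self

    async def __anext__(self):
        # Awaited once per loop iteration; StopAsyncIteration ends the loop.
        if not self._frames:
            raise StopAsyncIteration
        return self._frames.pop(0)


async def collect_types(session):
    """Enter the session, drain it, and return the type tag of every frame."""
    async with session:
        return [frame["type"] async for frame in session]
```
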
Properties
| Property | Type | Description |
|---|---|---|
| session_id | str | Server-assigned session identifier. |
| expires_at | str | ISO-8601 expiry for the WS token. |

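
Because `expires_at` is a string, it has to be parsed before it can be compared with the current time. The sketch below shows one way to do that with the standard library. `seconds_until_expiry` is a hypothetical helper, and treating a timestamp without an offset as UTC is an assumption, not documented behavior.

```python
from datetime import datetime, timezone
from typing import Optional


def seconds_until_expiry(expires_at: str, now: Optional[datetime] = None) -> float:
    """Return seconds left before an ISO-8601 timestamp; negative if already past."""
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on,
    # so normalise it to an explicit UTC offset first.
    text = expires_at[:-1] + "+00:00" if expires_at.endswith("Z") else expires_at
    expiry = datetime.fromisoformat(text)
    if expiry.tzinfo is None:
        # Assumption: a timestamp without an offset is in UTC.
        expiry = expiry.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return (expiry - now).total_seconds()
```

A caller might check `seconds_until_expiry(session.expires_at)` and wind the conversation down once the remaining time drops below a threshold of its choosing.
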
Methods

| Method | Kind | Description |
|---|---|---|
| send_audio(pcm: bytes) | coroutine | Ship a PCM16 chunk up to the model (binary WS frame). |
| commit() | coroutine | Signal end-of-user-turn; the server flushes buffered audio upstream. |
| interrupt() | coroutine | Cancel the assistant's current response mid-generation. |
| send_tool_result(call_id: str, output: str) | coroutine | Return the result of a previously-issued tool call. |
| close(code=1000, reason='client_closed') | coroutine | Close the socket. Safe to call multiple times; the context manager calls it for you on exit. |

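
Since `send_audio` takes raw PCM16 bytes, a recorded buffer usually needs to be cut into small chunks before it is streamed. The sketch below shows one way to slice it. `pcm16_chunks` is a hypothetical helper; the 20 ms chunk size and mono audio are assumptions, not requirements stated by the API.

```python
from typing import Iterator


def pcm16_chunks(pcm: bytes, sample_rate: int = 24000,
                 chunk_ms: int = 20) -> Iterator[bytes]:
    """Yield fixed-duration slices of mono PCM16 audio (the last may be shorter)."""
    bytes_per_sample = 2  # 16-bit samples, one channel assumed
    step = sample_rate * chunk_ms // 1000 * bytes_per_sample
    if step <= 0:
        raise ValueError("chunk_ms is too small for this sample rate")
    for start in range(0, len(pcm), step):
        yield pcm[start:start + step]
```

Inside an open session, each chunk could be passed to `await session.send_audio(chunk)`, followed by `await session.commit()` once the user's turn is over.
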
Frame types
Iterating the session yields dicts tagged by type:

| Frame type | Payload fields |
|---|---|
| audio | pcm: bytes, sample_rate: 24000 |
| transcript | role: 'user' \| 'assistant', text: str, final: bool |
| tool_call | call_id: str, name: str, arguments: str (JSON) |
| usage | input_audio_tokens: int, output_audio_tokens: int |
| error | code: str, message: str |
| close | reason: str |

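
One way to consume these frames is a small dispatcher keyed on `type`. The sketch below uses only the payload fields listed above. `SessionState`, `RealtimeFrameError`, and `handle_frame` are hypothetical names, and treating `final: False` transcripts as interim text to skip is an assumption about their meaning.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


class RealtimeFrameError(RuntimeError):
    """Hypothetical exception raised for frames tagged 'error'."""


@dataclass
class SessionState:
    audio: bytearray = field(default_factory=bytearray)
    transcripts: List[str] = field(default_factory=list)
    closed_reason: str = ""


def handle_frame(frame: Dict[str, Any], state: SessionState) -> bool:
    """Apply one frame to state; return False once the session should stop."""
    kind = frame["type"]
    if kind == "audio":
        state.audio.extend(frame["pcm"])
    elif kind == "transcript":
        # Assumption: non-final transcripts are interim text, so skip them.
        if frame["final"]:
            state.transcripts.append(f'{frame["role"]}: {frame["text"]}')
    elif kind == "error":
        raise RealtimeFrameError(f'{frame["code"]}: {frame["message"]}')
    elif kind == "close":
        state.closed_reason = frame["reason"]
        return False
    # tool_call and usage frames are ignored in this sketch.
    return True
```

In an `async for frame in session` loop, the caller would break out as soon as `handle_frame` returns False.
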
Example — tool calling

    import asyncio
    import json
    import os

    from spekoai import AsyncSpeko, RealtimeConnectParams, RealtimeToolSpec


    async def main():
        async with AsyncSpeko(api_key=os.environ["SPEKO_API_KEY"]) as speko:
            session = await speko.connect_realtime(
                RealtimeConnectParams(
                    provider="openai",
                    model="gpt-realtime",
                    tools=[
                        RealtimeToolSpec(
                            name="get_weather",
                            description="Current weather for a city.",
                            parameters={
                                "type": "object",
                                "properties": {"city": {"type": "string"}},
                                "required": ["city"],
                            },
                        ),
                    ],
                ),
            )
            async with session:
                async for frame in session:
                    if frame["type"] == "tool_call" and frame["name"] == "get_weather":
                        # arguments arrives as a JSON-encoded string.
                        args = json.loads(frame["arguments"])
                        # fetch_weather(): your application's lookup. It must return
                        # a str, because send_tool_result takes output: str.
                        result = fetch_weather(args["city"])
                        await session.send_tool_result(frame["call_id"], result)


    asyncio.run(main())
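
With more than one tool, the `if` chain in the loop can be replaced by a lookup table. The following is a minimal sketch of that idea. `run_tool` is a hypothetical helper, and the `{"result": ...}` / `{"error": ...}` JSON envelope is an illustrative convention rather than a format the API requires.

```python
import json
from typing import Any, Callable, Dict

ToolHandler = Callable[..., Any]


def run_tool(frame: Dict[str, Any], handlers: Dict[str, ToolHandler]) -> str:
    """Run the handler named by a tool_call frame and return a JSON string."""
    handler = handlers.get(frame["name"])
    if handler is None:
        return json.dumps({"error": f"unknown tool: {frame['name']}"})
    try:
        # arguments is a JSON-encoded object; its keys become keyword arguments.
        args = json.loads(frame["arguments"])
        return json.dumps({"result": handler(**args)})
    except Exception as exc:
        # Report failures back to the model instead of crashing the frame loop.
        return json.dumps({"error": str(exc)})
```

The returned string can be sent back with `await session.send_tool_result(frame["call_id"], run_tool(frame, handlers))`.
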