Open a speech-to-speech (S2S) session. The server mints a short-lived WebSocket token, then proxies the client WS directly to the underlying provider (OpenAI Realtime, Gemini Live, xAI Grok Voice). LiveKit is skipped entirely so time-to-first-audio stays under ~300 ms.
Realtime sessions are only available on AsyncSpeko. A synchronous WebSocket loop would block the calling thread on every audio chunk, defeating the purpose of a low-latency S2S pipeline.
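import asyncio
import os

from spekoai import AsyncSpeko, RealtimeConnectParams

# Placeholder input: 480 samples of PCM16 silence. Replace with real microphone audio.
pcm_chunk = b"\x00\x00" * 480

def play(pcm: bytes) -> None:
    """Placeholder sink: hand the PCM16 bytes to your audio output."""

async def main():
    async with AsyncSpeko(api_key=os.environ["SPEKO_API_KEY"]) as speko:
        session = await speko.connect_realtime(
            RealtimeConnectParams(provider="openai", model="gpt-realtime"),
        )
        # The context manager closes the socket on exit.
        async with session:
            await session.send_audio(pcm_chunk)
            # Iterating the session yields frames tagged by "type".
            async for frame in session:
                if frame["type"] == "audio":
                    play(frame["pcm"])
                elif frame["type"] == "transcript":
                    print(frame["text"])

asyncio.run(main())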
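The example above sends a single chunk and then reads frames. In a live conversation the uplink and downlink would typically run at the same time, which is where the async-only design pays off. The sketch below shows one way to structure that with `asyncio.gather`. `FakeSession` is a hypothetical stand-in that only mimics the `send_audio`, `commit`, and async-iteration surface described on this page; it is not part of the SDK, and its echo behaviour exists only so the sketch can run offline.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for AsyncRealtimeSession: echoes audio back."""

    def __init__(self) -> None:
        self._frames: asyncio.Queue = asyncio.Queue()

    async def send_audio(self, pcm: bytes) -> None:
        await self._frames.put({"type": "audio", "pcm": pcm})

    async def commit(self) -> None:
        # Demo only: the fake ends the stream on commit so the example terminates.
        await self._frames.put({"type": "close", "reason": "done"})

    def __aiter__(self):
        return self

    async def __anext__(self) -> dict:
        frame = await self._frames.get()
        if frame["type"] == "close":
            raise StopAsyncIteration
        return frame


async def uplink(session, chunks) -> None:
    """Send every chunk, then signal end-of-user-turn."""
    for chunk in chunks:
        await session.send_audio(chunk)
        await asyncio.sleep(0)  # yield so the downlink task can run
    await session.commit()


async def downlink(session) -> list:
    """Collect the PCM payload of every audio frame until the stream ends."""
    received = []
    async for frame in session:
        if frame["type"] == "audio":
            received.append(frame["pcm"])
    return received


async def demo() -> list:
    session = FakeSession()
    # Run both directions concurrently; gather returns results in argument order.
    _, received = await asyncio.gather(
        uplink(session, [b"\x00\x00", b"\x01\x00"]),
        downlink(session),
    )
    return received
```

With a real session, the same `uplink` and `downlink` coroutines could be passed the object returned by `connect_realtime`, inside its `async with` block.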

Signature

await AsyncSpeko.connect_realtime(
    params: RealtimeConnectParams,
) -> AsyncRealtimeSession

RealtimeConnectParams

provider
'openai' | 'google' | 'xai'
required
model
string
required
Provider-specific model id (e.g. gpt-realtime, gemini-2.5-flash-native-audio, grok-voice-beta).
voice
string
Voice id override — interpreted per provider.
system_prompt
string
temperature
float
input_sample_rate
16000 | 24000
output_sample_rate
16000 | 24000
tools
list[RealtimeToolSpec]
Tool definitions the assistant may call. Receive tool_call frames and respond with send_tool_result.
metadata
dict[str, object]
Free-form metadata attached to the session record.
ttl_seconds
int
Max session duration in seconds. Server-capped at 1800 (30 min).
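The constraints listed above (two required fields, three providers, two sample rates, a 1800-second TTL cap) can be checked before a connection is attempted. The following is a minimal sketch that works on a plain dict of the same field names; `check_realtime_params` is a hypothetical helper, not an SDK function, and clamping `ttl_seconds` on the client is an assumption about how you might mirror the server cap.

```python
MAX_TTL_SECONDS = 1800  # server-side cap documented above
PROVIDERS = {"openai", "google", "xai"}
SAMPLE_RATES = {16000, 24000}


def check_realtime_params(params: dict) -> dict:
    """Hypothetical client-side check; returns a cleaned copy of params."""
    for key in ("provider", "model"):
        if not params.get(key):
            raise ValueError(f"{key} is required")
    if params["provider"] not in PROVIDERS:
        raise ValueError(f"unknown provider: {params['provider']!r}")
    for key in ("input_sample_rate", "output_sample_rate"):
        if key in params and params[key] not in SAMPLE_RATES:
            raise ValueError(f"{key} must be 16000 or 24000")
    cleaned = dict(params)
    if "ttl_seconds" in cleaned:
        # Assumption: clamp locally rather than relying on the server to do it.
        cleaned["ttl_seconds"] = min(int(cleaned["ttl_seconds"]), MAX_TTL_SECONDS)
    return cleaned
```

The cleaned dict could then be unpacked into the constructor, for example `RealtimeConnectParams(**cleaned)`, assuming the constructor accepts these field names as keyword arguments.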

AsyncRealtimeSession

The returned session is both an async context manager and an async iterator.

Properties

Property | Type | Description
session_id | str | Server-assigned session identifier.
expires_at | str | ISO-8601 expiry for the WS token.
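Because `expires_at` is an ISO-8601 string, a client may want to convert it into a remaining-time value, for instance to wrap up before the token lapses. The helper below is a sketch using only the standard library; `seconds_until_expiry` is a hypothetical name, and treating an offset-less timestamp as UTC is an assumption, since the exact timestamp format is not specified here.

```python
from datetime import datetime, timezone
from typing import Optional


def seconds_until_expiry(expires_at: str, now: Optional[datetime] = None) -> float:
    """Return the seconds left before expires_at (negative if already past)."""
    # fromisoformat() only accepts a trailing "Z" from Python 3.11, so normalize it.
    expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
    if expiry.tzinfo is None:
        # Assumption: a timestamp without an offset is UTC.
        expiry = expiry.replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)
    return (expiry - now).total_seconds()
```

Usage would look like `seconds_until_expiry(session.expires_at)`.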

Methods

send_audio(pcm: bytes)
coroutine
Ship a PCM16 chunk up to the model (binary WS frame).
commit()
coroutine
Signal end-of-user-turn; the server flushes buffered audio upstream.
interrupt()
coroutine
Cancel the assistant’s current response mid-generation.
send_tool_result(call_id: str, output: str)
coroutine
Return the result of a previously-issued tool call.
close(code=1000, reason='client_closed')
coroutine
Close the socket. Safe to call multiple times; the context manager calls it for you on exit.
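`send_audio` takes raw PCM16 bytes, so a longer recording has to be split into chunks before it is sent. The sketch below slices a buffer into fixed-duration frames. `chunk_pcm16` is a hypothetical helper; the 20 ms frame size and mono audio are assumptions, not documented requirements.

```python
def chunk_pcm16(pcm: bytes, sample_rate: int = 24000, frame_ms: int = 20) -> list:
    """Split a PCM16 buffer into frames of frame_ms milliseconds each."""
    bytes_per_sample = 2  # 16-bit samples; assumes a single (mono) channel
    frame_bytes = (sample_rate * frame_ms // 1000) * bytes_per_sample
    if frame_bytes <= 0:
        raise ValueError("sample_rate and frame_ms must give a non-empty frame")
    # The final frame may be shorter if the buffer is not a whole number of frames.
    return [pcm[i:i + frame_bytes] for i in range(0, len(pcm), frame_bytes)]
```

Each chunk could then be passed to `await session.send_audio(chunk)`, followed by `await session.commit()` once the user's turn is over.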

Frame types

Iterating the session yields dicts tagged by type:
Frame type | Payload fields
audio | pcm: bytes, sample_rate: 24000
transcript | role: 'user' | 'assistant', text: str, final: bool
tool_call | call_id: str, name: str, arguments: str (JSON)
usage | input_audio_tokens: int, output_audio_tokens: int
error | code: str, message: str
close | reason: str
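Since every frame is a dict tagged by `type`, a receive loop usually reduces to a branch per frame type. The sketch below runs over a plain list of frames so it can be tried without a connection. `summarize_frames` is a hypothetical helper, and keeping only the most recent `usage` frame is an assumption, because this page does not say whether usage counts are cumulative.

```python
def summarize_frames(frames) -> dict:
    """Fold a sequence of session frames into a small summary dict."""
    summary = {
        "audio_bytes": 0,
        "transcripts": [],  # (role, text) pairs for final transcripts only
        "last_usage": None,
        "errors": [],
        "close_reason": None,
    }
    for frame in frames:
        kind = frame["type"]
        if kind == "audio":
            summary["audio_bytes"] += len(frame["pcm"])
        elif kind == "transcript":
            if frame["final"]:  # skip partial transcripts
                summary["transcripts"].append((frame["role"], frame["text"]))
        elif kind == "usage":
            # Assumption: keep the latest counts instead of summing them.
            summary["last_usage"] = (
                frame["input_audio_tokens"],
                frame["output_audio_tokens"],
            )
        elif kind == "error":
            summary["errors"].append((frame["code"], frame["message"]))
        elif kind == "close":
            summary["close_reason"] = frame["reason"]
            break  # nothing is expected after a close frame
    return summary
```

The same branches would apply inside `async for frame in session:` when reading from a live session.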

Example — tool calling

import asyncio
import json
import os

from spekoai import AsyncSpeko, RealtimeConnectParams, RealtimeToolSpec

def fetch_weather(city: str) -> dict:
    """Placeholder: replace with a real weather lookup."""
    return {"city": city, "forecast": "unknown"}

async def main():
    async with AsyncSpeko(api_key=os.environ["SPEKO_API_KEY"]) as speko:
        session = await speko.connect_realtime(
            RealtimeConnectParams(
                provider="openai",
                model="gpt-realtime",
                tools=[
                    RealtimeToolSpec(
                        name="get_weather",
                        description="Current weather for a city.",
                        parameters={
                            "type": "object",
                            "properties": {"city": {"type": "string"}},
                            "required": ["city"],
                        },
                    ),
                ],
            ),
        )

        async with session:
            async for frame in session:
                if frame["type"] == "tool_call" and frame["name"] == "get_weather":
                    # arguments arrives as a JSON string.
                    args = json.loads(frame["arguments"])
                    result = fetch_weather(args["city"])
                    # send_tool_result takes output as a str, so serialize the result.
                    await session.send_tool_result(frame["call_id"], json.dumps(result))

asyncio.run(main())
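With more than one tool, the `if` chain on `frame["name"]` can be replaced by a name-to-function registry. The sketch below decodes the JSON `arguments` string, calls the matching handler, and always returns a string suitable for `send_tool_result`. `run_tool_call`, `TOOLS`, and the stub `get_weather` are hypothetical, and the `{"error": ...}` payload shape is an assumption rather than a documented convention.

```python
import json


def get_weather(city: str) -> dict:
    """Hypothetical tool implementation returning canned data."""
    return {"city": city, "temp_c": 21}


TOOLS = {"get_weather": get_weather}


def run_tool_call(frame: dict, tools: dict) -> str:
    """Execute the tool named in a tool_call frame and return a JSON string."""
    handler = tools.get(frame["name"])
    if handler is None:
        return json.dumps({"error": f"unknown tool: {frame['name']}"})
    try:
        args = json.loads(frame["arguments"])  # arguments is a JSON string
        return json.dumps(handler(**args))
    except (ValueError, TypeError) as exc:
        # ValueError covers malformed JSON; TypeError covers wrong or missing arguments.
        return json.dumps({"error": str(exc)})
```

Inside the receive loop this would be used as `await session.send_tool_result(frame["call_id"], run_tool_call(frame, TOOLS))`, so the model gets a reply even when a call fails.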