Speko Docs

connect_realtime

Speech-to-speech WebSocket sessions, async only.

Open a speech-to-speech (S2S) session. The server mints a short-lived WebSocket token, then proxies the client WS directly to the underlying provider (OpenAI Realtime, Gemini Live, xAI Grok Voice). The browser media transport is skipped entirely so time-to-first-audio stays under ~300 ms.

Realtime sessions are only available on AsyncSpeko. A synchronous WebSocket loop would block the event loop on every audio chunk, defeating the purpose of a low-latency S2S pipeline.

import asyncio
from spekoai import AsyncSpeko, RealtimeConnectParams

async def main():
    async with AsyncSpeko(api_key=os.environ["SPEKO_API_KEY"]) as speko:
        session = await speko.connect_realtime(
            RealtimeConnectParams(provider="openai", model="gpt-realtime"),
        )
        async with session:
            await session.send_audio(pcm_chunk)
            async for frame in session:
                if frame["type"] == "audio":
                    play(frame["pcm"])
                elif frame["type"] == "transcript":
                    print(frame["text"])

asyncio.run(main())

Signature

await AsyncSpeko.connect_realtime(
    params: RealtimeConnectParams,
) -> AsyncRealtimeSession

RealtimeConnectParams

provider'openai' | 'google' | 'xai'required
modelstringrequired

Provider-specific model id (e.g. gpt-realtime, gemini-2.5-flash-native-audio, grok-voice-beta).

voicestring

Voice id override — interpreted per provider.

system_promptstring
temperaturefloat
input_sample_rate16000 | 24000
output_sample_rate16000 | 24000
toolslist[RealtimeToolSpec]

Tool definitions the assistant may call. Receive tool_call frames and respond with send_tool_result.

metadatadict[str, object]

Free-form metadata attached to the session record.

ttl_secondsint

Max session duration in seconds. Server-capped at 1800 (30 min).

AsyncRealtimeSession

The returned session is both an async context manager and an async iterator.

Properties

PropertyTypeDescription
session_idstrServer-assigned session identifier.
expires_atstrISO-8601 expiry for the WS token.

Methods

send_audio(pcm: bytes)coroutine

Ship a PCM16 chunk up to the model (binary WS frame).

commit()coroutine

Signal end-of-user-turn; the server flushes buffered audio upstream.

interrupt()coroutine

Cancel the assistant's current response mid-generation.

send_tool_result(call_id: str, output: str)coroutine

Return the result of a previously-issued tool call.

close(code=1000, reason='client_closed')coroutine

Close the socket. Safe to call multiple times; the context manager calls it for you on exit.

Frame types

Iterating the session yields dicts tagged by type:

Frame typePayload fields
audiopcm: bytes, sample_rate: 24000
transcriptrole: 'user' | 'assistant', text: str, final: bool
tool_callcall_id: str, name: str, arguments: str (JSON)
usageinput_audio_tokens: int, output_audio_tokens: int
errorcode: str, message: str
closereason: str

Example — tool calling

import json
from spekoai import AsyncSpeko, RealtimeConnectParams, RealtimeToolSpec

async with AsyncSpeko(api_key=os.environ["SPEKO_API_KEY"]) as speko:
    session = await speko.connect_realtime(
        RealtimeConnectParams(
            provider="openai",
            model="gpt-realtime",
            tools=[
                RealtimeToolSpec(
                    name="get_weather",
                    description="Current weather for a city.",
                    parameters={
                        "type": "object",
                        "properties": {"city": {"type": "string"}},
                        "required": ["city"],
                    },
                ),
            ],
        ),
    )

    async with session:
        async for frame in session:
            if frame["type"] == "tool_call" and frame["name"] == "get_weather":
                args = json.loads(frame["arguments"])
                result = fetch_weather(args["city"])
                await session.send_tool_result(frame["call_id"], result)

On this page