
Streaming Responses

Streaming responses (stream=True) work transparently with Paygent. Your code looks the same — you iterate the generator, Paygent captures token counts when the stream ends, and the metering happens after the final chunk.

How it works

When the patched client.chat.completions.create(stream=True) runs, Paygent doesn't return the original generator. It returns a StreamWrapper that:

  1. Yields every chunk from the original stream unchanged (zero latency overhead)
  2. Accumulates chunks in a list as they pass through
  3. When StopIteration is raised (the stream ends), inspects the accumulated chunks for usage data
  4. Builds a UsageEvent and pushes it to the queue

From your code's perspective the wrapper is indistinguishable from the original — same __iter__, same __next__, same context manager support.
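The four steps above can be sketched as a minimal wrapper. This is an illustrative reconstruction, not Paygent's actual internals — names like `StreamWrapper` and the `on_complete` callback are assumptions:

```python
class StreamWrapper:
    """Wraps a streaming iterator: yields chunks unchanged, meters at the end."""

    def __init__(self, stream, on_complete):
        self._stream = stream
        self._chunks = []              # chunks accumulated as they pass through
        self._on_complete = on_complete

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self._stream)
        except StopIteration:
            # Stream exhausted: hand the accumulated chunks to the metering callback,
            # then re-raise so the caller's loop terminates normally.
            self._on_complete(self._chunks)
            raise
        self._chunks.append(chunk)
        return chunk

    # Context manager support, mirroring the underlying SDK stream.
    def __enter__(self):
        return self

    def __exit__(self, *exc):
        close = getattr(self._stream, "close", None)
        if close:
            close()
        return False
```

Because the wrapper only appends a reference per chunk before yielding, iteration overhead is negligible and the caller's loop is unchanged.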

Capturing tokens: include_usage

OpenAI's streaming API only includes usage data in the final chunk if you ask for it. You ask by passing stream_options={"include_usage": True}.

from openai import OpenAI
from paygent import Paygent, paygent_context

pg = Paygent.init(api_key="pg_live_...")
client = OpenAI()

with paygent_context(user_id="user_123"):
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Tell me a story"}],
        stream=True,
        stream_options={"include_usage": True},   # ← this
    )

    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()

After the loop ends, Paygent extracts chunk.usage from the final chunk, calculates cost, and meters the call.
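With `include_usage` set, OpenAI appends one extra final chunk whose `choices` list is empty and whose `usage` field is populated. A hedged sketch of the extraction step, using simulated chunks in place of real SDK objects (`extract_usage` is an illustrative name, not a Paygent API):

```python
from types import SimpleNamespace

def extract_usage(chunks):
    """Find usage data in accumulated chunks; it arrives on the final one."""
    for chunk in reversed(chunks):
        usage = getattr(chunk, "usage", None)
        if usage is not None:
            return usage.prompt_tokens, usage.completion_tokens
    return 0, 0  # no include_usage -> nothing to meter

# Simulated stream: content chunks carry usage=None; the final chunk
# has empty choices and the token totals.
chunks = [
    SimpleNamespace(choices=["...delta..."], usage=None),
    SimpleNamespace(
        choices=[],
        usage=SimpleNamespace(prompt_tokens=12, completion_tokens=48),
    ),
]
```

Scanning from the end is cheap and tolerant of providers that could emit usage earlier than the last chunk.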

Without include_usage

If you don't pass stream_options={"include_usage": True}, the OpenAI API doesn't include usage info in any chunk. Paygent has no way to know how many tokens were consumed.

What happens:

  • The stream still works correctly — you get every chunk, Paygent doesn't interfere
  • Paygent fires update_cache with total_tokens=0 and cost_total=0
  • No usage event reflects the real cost
  • Your guards never see the spend, so a user can effectively bypass spend caps by streaming

Always set include_usage=True for streamed calls

Without it, streamed calls don't contribute to spend tracking. The fix is one line:

stream_options={"include_usage": True}

For Anthropic (anthropic SDK), the message-stream API includes usage in message_delta events automatically — no equivalent flag needed.
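A sketch of how that accumulation works for Anthropic streams: `message_start` carries the input token count and `message_delta` events carry the (cumulative) output token count. The event objects below are simulated stand-ins, and `accumulate_anthropic_usage` is an illustrative helper, not part of either SDK:

```python
from types import SimpleNamespace as NS

def accumulate_anthropic_usage(events):
    """Collect token counts from an Anthropic-style event stream.

    Input tokens arrive on message_start; output tokens arrive on
    message_delta events, no opt-in flag required.
    """
    input_tokens = output_tokens = 0
    for event in events:
        if event.type == "message_start":
            input_tokens = event.message.usage.input_tokens
        elif event.type == "message_delta":
            # output_tokens is cumulative, so keep the latest value
            output_tokens = event.usage.output_tokens
    return input_tokens, output_tokens

# Simulated event sequence in the Anthropic Messages streaming shape.
events = [
    NS(type="message_start", message=NS(usage=NS(input_tokens=10))),
    NS(type="content_block_delta"),
    NS(type="message_delta", usage=NS(output_tokens=25)),
]
```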

Async streaming

Identical pattern with AsyncOpenAI:

import asyncio

from openai import AsyncOpenAI
from paygent import paygent_context

async_client = AsyncOpenAI()

async def stream_response(user_id: str, prompt: str):
    with paygent_context(user_id=user_id):
        stream = await async_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            stream_options={"include_usage": True},
        )

        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="", flush=True)
        print()

asyncio.run(stream_response("user_123", "Tell me a poem"))

The AsyncStreamWrapper mirrors the sync version: it implements __aiter__ / __anext__ and fires the metering callback when the async generator exhausts.
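A minimal sketch of that async mirror, again as an illustrative reconstruction rather than Paygent's actual code:

```python
import asyncio

class AsyncStreamWrapper:
    """Async mirror of the sync wrapper: __aiter__/__anext__, meter on exhaustion."""

    def __init__(self, stream, on_complete):
        self._stream = stream
        self._chunks = []
        self._on_complete = on_complete

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            chunk = await self._stream.__anext__()
        except StopAsyncIteration:
            self._on_complete(self._chunks)  # fire metering callback, then re-raise
            raise
        self._chunks.append(chunk)
        return chunk

async def demo():
    # A fake async stream standing in for the SDK's generator.
    async def fake_stream():
        for c in ("a", "b", "c"):
            yield c

    seen = []
    out = [chunk async for chunk in AsyncStreamWrapper(fake_stream(), seen.extend)]
    return out, seen
```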

Streaming with FastAPI

A common production pattern — streaming server-sent events to the browser:

import os

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

from paygent import Paygent, PaygentLimitExceeded, paygent_context

pg = Paygent.init(api_key=os.environ["PAYGENT_API_KEY"])
client = OpenAI()
app = FastAPI()

@app.get("/chat/stream")
def stream(user_id: str, prompt: str):
    def event_stream():
        try:
            with paygent_context(user_id=user_id):
                response = client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": prompt}],
                    stream=True,
                    stream_options={"include_usage": True},
                )
                for chunk in response:
                    if chunk.choices and chunk.choices[0].delta.content:
                        yield f"data: {chunk.choices[0].delta.content}\n\n"
                yield "data: [DONE]\n\n"
        except PaygentLimitExceeded as e:
            yield f"event: limit\ndata: {e.guard_result.message}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")

The hard gate raises before the stream starts (during client.chat.completions.create(...), before any chunks come back), so you can convert it to an SSE error event and close the stream cleanly.

Streaming with pg.wrap()

The wrap() and awrap() paths also handle streams — same StreamWrapper returns, same behavior:

stream = pg.wrap(
    lambda: client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "..."}],
        stream=True,
        stream_options={"include_usage": True},
    ),
    user_id="user_123",
    model="gpt-4o-mini",
)

for chunk in stream:
    ...

Edge cases

  • Stream is consumed only partially. If you break out of the loop early without exhausting the stream, the metering callback does not fire (it's tied to StopIteration). The call is unmetered. To force metering, fully iterate the stream or close it explicitly via stream.close().
  • Stream raises mid-flight. If the network drops between chunks, the metering happens with whatever was accumulated so far. You'll get a usage event with the partial token count visible up to the failure.
  • Multiple streams concurrently for the same user. Each stream has its own StreamWrapper. They each independently call update_cache after their own StopIteration. The cache update is atomic per-event, so concurrent streams don't race.
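For the partial-consumption case, the safe pattern is to close the stream in a `finally` block so cleanup runs even when you break out early. A sketch using a plain generator as a stand-in for the SDK stream (`read_first_n` is a hypothetical helper):

```python
def read_first_n(stream, n):
    """Consume at most n chunks, then close the stream explicitly so
    cleanup (and any close-time metering) runs even though the stream
    was not fully exhausted."""
    out = []
    try:
        for chunk in stream:
            out.append(chunk)
            if len(out) >= n:
                break
    finally:
        close = getattr(stream, "close", None)
        if close:
            close()  # for a generator-based stream, raises GeneratorExit inside it
    return out
```

Relying on garbage collection to close an abandoned stream works eventually in CPython, but the explicit `close()` is deterministic, which matters when metering hangs off stream teardown.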

Next steps