Streaming Responses
Streaming responses (stream=True) work transparently with Paygent. Your code looks the same — you iterate the generator, Paygent captures token counts when the stream ends, and the metering happens after the final chunk.
How it works
When the patched client.chat.completions.create(stream=True) runs, Paygent doesn't return the original generator. It returns a StreamWrapper that:
- Yields every chunk from the original stream unchanged (zero latency overhead)
- Accumulates chunks in a list as they pass through
- When StopIteration is raised (the stream ends), inspects the accumulated chunks for usage data
- Builds a UsageEvent and pushes it to the queue
From your code's perspective the wrapper is indistinguishable from the original — same __iter__, same __next__, same context manager support.
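To make the mechanics concrete, here is a simplified sketch of that behavior. The class name, constructor, and on_complete callback are illustrative only, not Paygent's actual internals:
class IllustrativeStreamWrapper:
    def __init__(self, stream, on_complete):
        self._stream = stream            # the original OpenAI stream
        self._chunks = []                # chunks accumulated as they pass through
        self._on_complete = on_complete  # metering callback (builds the usage event)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self._stream)
        except StopIteration:
            # Stream exhausted: inspect accumulated chunks for usage, then meter.
            self._on_complete(self._chunks)
            raise
        self._chunks.append(chunk)
        return chunk

    def close(self):
        self._stream.close()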
Capturing tokens: include_usage
OpenAI's streaming API only includes usage data in the final chunk if you ask for it. You ask by passing stream_options={"include_usage": True}.
from openai import OpenAI
from paygent import Paygent, paygent_context

pg = Paygent.init(api_key="pg_live_...")
client = OpenAI()

with paygent_context(user_id="user_123"):
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Tell me a story"}],
        stream=True,
        stream_options={"include_usage": True},  # ← this
    )
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
print()
After the loop ends, Paygent extracts chunk.usage from the final chunk, calculates cost, and meters the call.
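Paygent does this extraction for you, but for reference, this is roughly what reading the same data by hand looks like with the OpenAI SDK: the final chunk carries a usage object with prompt_tokens, completion_tokens, and total_tokens, while earlier chunks have usage set to None (continuing the example above):
usage = None
for chunk in stream:
    if chunk.usage is not None:  # only the final chunk has usage populated
        usage = chunk.usage
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

if usage is not None:
    print(usage.prompt_tokens, usage.completion_tokens, usage.total_tokens)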
Without include_usage
If you don't pass stream_options={"include_usage": True}, the OpenAI API doesn't include usage info in any chunk. Paygent has no way to know how many tokens were consumed.
What happens:
- The stream still works correctly — you get every chunk, Paygent doesn't interfere
- Paygent fires update_cache with total_tokens=0 and cost_total=0
- No usage event reflects the real cost
- Your guards never see the spend → user can effectively bypass spend caps via streaming
Always set include_usage=True for streamed calls
Without it, streamed calls don't contribute to spend tracking. The fix is one line:
stream_options={"include_usage": True}
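If you want to make the flag hard to forget, a small helper of your own (hypothetical, not part of Paygent) can inject it on every streamed call:
def stream_chat(client, **kwargs):
    # Hypothetical convenience wrapper: always request usage on streamed calls.
    kwargs.setdefault("stream_options", {"include_usage": True})
    return client.chat.completions.create(stream=True, **kwargs)

stream = stream_chat(
    client,
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Tell me a story"}],
)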
For Anthropic (anthropic SDK), the message-stream API includes usage in message_delta events automatically — no equivalent flag needed.
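For illustration, a streamed Anthropic call under the same context looks roughly like this (assuming your Paygent setup patches the anthropic client as well; the model name is just an example):
from anthropic import Anthropic
from paygent import paygent_context

anthropic_client = Anthropic()

with paygent_context(user_id="user_123"):
    with anthropic_client.messages.stream(
        model="claude-3-5-sonnet-latest",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Tell me a story"}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
    print()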
Async streaming
Identical pattern with AsyncOpenAI:
from openai import AsyncOpenAI
from paygent import paygent_context
import asyncio

async_client = AsyncOpenAI()

async def stream_response(user_id: str, prompt: str):
    with paygent_context(user_id=user_id):
        stream = await async_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            stream_options={"include_usage": True},
        )
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="", flush=True)
    print()

asyncio.run(stream_response("user_123", "Tell me a poem"))
The AsyncStreamWrapper mirrors the sync version: it implements __aiter__ / __anext__ and fires the metering callback once the async stream is exhausted.
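In the same illustrative spirit as the sync sketch above (again, not Paygent's real class names), the async side looks like this; note that an exhausted async iterator raises StopAsyncIteration rather than StopIteration:
class IllustrativeAsyncStreamWrapper:
    def __init__(self, stream, on_complete):
        self._stream = stream
        self._chunks = []
        self._on_complete = on_complete  # metering callback

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            chunk = await self._stream.__anext__()
        except StopAsyncIteration:
            # Async stream exhausted: meter with the accumulated chunks.
            self._on_complete(self._chunks)
            raise
        self._chunks.append(chunk)
        return chunk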
Streaming with FastAPI
A common production pattern — streaming server-sent events to the browser:
import os

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from openai import OpenAI
from paygent import Paygent, PaygentLimitExceeded, paygent_context

pg = Paygent.init(api_key=os.environ["PAYGENT_API_KEY"])
client = OpenAI()
app = FastAPI()

@app.get("/chat/stream")
def stream(user_id: str, prompt: str):
    def event_stream():
        try:
            with paygent_context(user_id=user_id):
                response = client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": prompt}],
                    stream=True,
                    stream_options={"include_usage": True},
                )
                for chunk in response:
                    if chunk.choices and chunk.choices[0].delta.content:
                        yield f"data: {chunk.choices[0].delta.content}\n\n"
                yield "data: [DONE]\n\n"
        except PaygentLimitExceeded as e:
            yield f"event: limit\ndata: {e.guard_result.message}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
The hard gate raises before the stream starts (during client.chat.completions.create(...), before any chunks come back), so you can convert it to an SSE error event and close the stream cleanly.
Streaming with pg.wrap()
The wrap() and awrap() paths also handle streams — the same StreamWrapper is returned, with the same behavior:
stream = pg.wrap(
    lambda: client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "..."}],
        stream=True,
        stream_options={"include_usage": True},
    ),
    user_id="user_123",
    model="gpt-4o-mini",
)

for chunk in stream:
    ...
Edge cases
- Stream is consumed only partially. If you break out of the loop early without exhausting the stream, the metering callback does not fire (it's tied to StopIteration). The call is unmetered. To force metering, fully iterate the stream or close it explicitly via stream.close() (see the sketch after this list).
- Stream raises mid-flight. If the network drops between chunks, metering happens with whatever was accumulated so far. You'll get a usage event with the partial token count visible up to the failure.
- Multiple streams concurrently for the same user. Each stream has its own StreamWrapper. They each independently call update_cache after their own StopIteration. The cache update is atomic per event, so concurrent streams don't race.
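For the partial-consumption case, a defensive pattern is to close the stream in a finally block so metering runs even when you stop early. A sketch, reusing the sync client from earlier; the word-count stop condition is just an example:
with paygent_context(user_id="user_123"):
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Tell me a story"}],
        stream=True,
        stream_options={"include_usage": True},
    )
    words = 0
    try:
        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                text = chunk.choices[0].delta.content
                print(text, end="", flush=True)
                words += len(text.split())
                if words > 200:  # example: stop early after roughly 200 words
                    break
    finally:
        stream.close()  # per the note above, closing forces metering for a partially consumed stream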
Next steps
- Cost Guardrails — how the guard, session window, and metering work
- Callbacks & Events — the same on_usage callback fires for streamed calls