fix(streaming): normalize non-SSE upstream chat completions #412
Some OpenAI-compatible upstreams ignore stream:true and reply with a single buffered application/json completion (no data: framing, no [DONE]). The gateway forwarded that body verbatim under a text/event-stream content type, so SSE clients waited forever for an end-of-stream marker that never arrived; the connection appeared to hang after the model had clearly finished (issue #411).
Add EnsureChatCompletionSSE, applied in the shared CompatibleProvider.StreamChatCompletion so every OpenAI-compatible provider benefits. Genuine SSE streams pass through untouched with no buffering; a buffered JSON completion is re-emitted as one SSE chunk (object -> chat.completion.chunk, message -> delta) followed by a terminal data: [DONE].
This is a long-standing latent defect, not a regression between the versions named in the issue; the fix hardens the gateway against any upstream that silently drops streaming.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Walkthrough
Changes: SSE Stream Normalization
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Provider as Provider.StreamChatCompletion
participant DoStream as p.client.DoStream
participant Normalizer as EnsureChatCompletionSSE
Client->>Provider: StreamChatCompletion(request)
Provider->>DoStream: POST /chat/completions
DoStream-->>Provider: (stream, err)
alt err != nil
Provider-->>Client: return nil, err
else stream ok
Provider->>Normalizer: wrap(stream)
Normalizer->>Normalizer: peek first 512 bytes
alt buffered JSON response
Normalizer->>Normalizer: rewrite object to chat.completion.chunk
Normalizer->>Normalizer: move message to delta fields
Normalizer->>Normalizer: emit data: [DONE] terminator
else genuine SSE stream
Normalizer->>Normalizer: pass through via bufferedReadCloser
end
Normalizer-->>Provider: normalized SSE stream
Provider-->>Client: normalized SSE stream
end
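The flow in the diagram can be sketched in Go roughly as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: `ensureSSE` and `normalize` are hypothetical names, the classification peeks a single byte rather than the 512 bytes shown in the diagram, the nil handling is a guess, and the JSON body is framed as-is without the `object`/`delta` rewrite. `bufferedReadCloser` is a name taken from the diagram, but its shape here is assumed.

```go
package main

import (
	"bufio"
	"bytes"
	"io"
	"strings"
)

// bufferedReadCloser reads through the bufio.Reader, so bytes consumed by
// Peek are not lost, while Close still reaches the upstream body.
// (Assumed shape; only the name appears in the diagram.)
type bufferedReadCloser struct {
	*bufio.Reader
	io.Closer
}

// ensureSSE is a simplified, hypothetical stand-in for the normalizer.
func ensureSSE(stream io.ReadCloser) io.ReadCloser {
	if stream == nil {
		return nil // assumption: a nil stream is returned unchanged
	}
	br := bufio.NewReader(stream)
	first, err := br.Peek(1)
	if err != nil || first[0] != '{' {
		// Genuine SSE (or an empty/failed stream): pass the bytes through.
		return bufferedReadCloser{Reader: br, Closer: stream}
	}
	// Buffered JSON: drain it, then replay it as one frame plus terminator.
	body, _ := io.ReadAll(br)
	_ = stream.Close()
	var out bytes.Buffer
	out.WriteString("data: ")
	out.Write(bytes.TrimSpace(body))
	out.WriteString("\n\ndata: [DONE]\n\n")
	return io.NopCloser(&out)
}

// normalize is a small usage helper: it runs a string through ensureSSE and
// returns everything the wrapped stream yields.
func normalize(s string) string {
	rc := ensureSSE(io.NopCloser(strings.NewReader(s)))
	defer rc.Close()
	b, _ := io.ReadAll(rc)
	return string(b)
}
```

Embedding both the reader and the closer keeps the passthrough branch free of copying: the caller reads from the same buffer that was peeked and closes the original upstream body.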
Confidence Score: 5/5
This looks safe to merge.
Reviews (3): Last reviewed commit: "fix(streaming): avoid tainted size arith..."
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/providers/chat_stream_normalize.go`:
- Around line 40-44: The error handling path after io.ReadAll in the function
discards any partial data that was successfully read before the error occurred,
silently losing generated content. Instead of returning only chatDonePayload
when err is not nil, modify the error branch to combine the partial body content
that was read with the done payload, ensuring that any successfully generated
content is preserved and emitted before the DONE marker. This requires returning
both the partial data and the completion signal rather than discarding the
partial data entirely.
- Around line 33-35: The reader.Peek(peekForNonSSE) call blocks waiting for the
full 512 bytes (peekForNonSSE) to be available or EOF, which delays SSE streams
and makes streaming appear frozen. Instead of peeking the full peekForNonSSE
buffer size, use a smaller peek size that is sufficient to detect whether the
response starts with a JSON object (indicated by the opening brace character)
without blocking unnecessarily. Consider using a minimal peek size like 1 byte
or a small reasonable amount to check the first non-whitespace character,
allowing valid SSE streams to pass through immediately without waiting for the
full buffer to fill.
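The second suggestion relies on how `bufio.Reader.Peek(n)` behaves: it returns once `n` bytes are buffered (or on error), so asking for one byte more at a time never waits for more data than is needed to find the first non-whitespace byte. A minimal sketch of that idea, where `firstNonSpaceIsBrace` and `classifyString` are hypothetical names and not the PR's code:

```go
package main

import (
	"bufio"
	"strings"
)

// firstNonSpaceIsBrace reports whether the first non-whitespace byte of the
// stream is '{'. It grows the peek window one byte at a time, so a genuine
// SSE stream is classified as soon as its first byte arrives. Nothing is
// consumed: every peeked byte stays in the reader for the caller.
func firstNonSpaceIsBrace(br *bufio.Reader) bool {
	for n := 1; n <= br.Size(); n++ {
		buf, err := br.Peek(n)
		if err != nil {
			return false // EOF or read error before a decisive byte
		}
		switch buf[n-1] {
		case ' ', '\t', '\r', '\n':
			continue // leading whitespace: look one byte further
		default:
			return buf[n-1] == '{'
		}
	}
	return false // only whitespace within the reader's buffer size
}

// classifyString is a convenience wrapper for trying the classifier on
// literal input.
func classifyString(s string) bool {
	return firstNonSpaceIsBrace(bufio.NewReader(strings.NewReader(s)))
}
```

Here `classifyString("  \n{\"id\":\"x\"}")` is true, while input beginning with `data:` or an SSE comment line (`:`) is false and would be passed through.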
Files selected for processing (3):
- internal/providers/chat_stream_normalize.go
- internal/providers/chat_stream_normalize_test.go
- internal/providers/openai/compatible_provider.go
Address PR review feedback on the chat-completions SSE normalizer:
- Classify the stream from its first non-whitespace byte via incremental Peek instead of Peek(512), so a genuine SSE upstream that emits a small first token then pauses is no longer held back waiting for a full buffer.
- Preserve partially-read bodies: when the upstream looked like buffered JSON but the read failed mid-body, forward what arrived before [DONE] instead of replacing it with a bare done marker.
- Apply EnsureChatCompletionSSE to the deepseek and xai providers, which stream via raw DoStream and previously bypassed the normalizer.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
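The partial-body point rests on the fact that `io.ReadAll` returns the bytes it managed to read alongside any error. A hedged sketch of forwarding those bytes before the terminator follows; `drainToSSE`, `flakyReader`, and `errUpstreamReset` are hypothetical names, and framing the partial bytes as a single `data:` line is an assumption about the output shape rather than the PR's exact behavior.

```go
package main

import (
	"bytes"
	"errors"
	"io"
)

// errUpstreamReset simulates an upstream that fails mid-body.
var errUpstreamReset = errors.New("upstream reset mid-body")

// flakyReader yields its data, then fails instead of reporting a clean EOF.
type flakyReader struct{ data []byte }

func (f *flakyReader) Read(p []byte) (int, error) {
	if len(f.data) == 0 {
		return 0, errUpstreamReset
	}
	n := copy(p, f.data)
	f.data = f.data[n:]
	return n, nil
}

// drainToSSE forwards whatever was read, even when the read ended in an
// error, and always finishes with the [DONE] marker so clients stop waiting.
func drainToSSE(r io.Reader) []byte {
	body, _ := io.ReadAll(r) // the error is dropped on purpose; body holds the partial bytes
	var out bytes.Buffer
	if len(body) > 0 {
		out.WriteString("data: ")
		out.Write(body)
		out.WriteString("\n\n")
	}
	out.WriteString("data: [DONE]\n\n")
	return out.Bytes()
}
```

Building the output with `bytes.Buffer` also avoids any explicit capacity arithmetic on the payload length.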
CodeQL flagged the make() capacity computation (len(payload)+constants) as a possible allocation-size overflow. Build the normalized SSE chunk with a bytes.Buffer instead, which removes the explicit size arithmetic; behavior is unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/providers/chat_stream_normalize.go`:
- Around line 41-44: The condition at line 41 in the error handling block checks
both `err != nil && len(body) == 0`, but this is unreachable dead code. After
the Peek operation on line 34 successfully identifies the JSON opening brace
character, that byte is buffered and will be returned by the subsequent
io.ReadAll call, ensuring the body is never empty. Remove the `len(body) == 0`
check from the condition and keep only `if err != nil` to eliminate the dead
code branch and clarify the actual error handling path.
In `@internal/providers/xai/xai.go`:
- Around line 186-191: The StreamChatCompletion method in the Provider type
dereferences the req parameter without validating it is not nil, which will
cause a panic if a nil request is passed. Add a nil-check guard at the beginning
of the StreamChatCompletion method that returns an appropriate error if req is
nil, following the same pattern already implemented in the DeepSeek provider
referenced in this PR. This should occur before the req.WithStreaming() call on
line 190.
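The nil-request guard described for the xai provider might look like the sketch below. Everything here is a hypothetical stand-in: the real `ChatRequest` type, the error value, and the method signature (which presumably also takes a context) are not shown in this PR page; only the method names `StreamChatCompletion` and `WithStreaming` come from the review comment.

```go
package main

import (
	"errors"
	"io"
	"strings"
)

// ChatRequest is a hypothetical stand-in for the gateway's request type.
type ChatRequest struct {
	Model  string
	Stream bool
}

// WithStreaming returns a copy with streaming enabled. It dereferences the
// receiver, so calling it on a nil pointer panics; the guard below prevents that.
func (r *ChatRequest) WithStreaming() *ChatRequest {
	c := *r
	c.Stream = true
	return &c
}

// errNilRequest is an assumed error value for illustration.
var errNilRequest = errors.New("chat request is nil")

// Provider is a hypothetical stand-in for the xai provider.
type Provider struct{}

// StreamChatCompletion rejects a nil request before touching it.
func (p *Provider) StreamChatCompletion(req *ChatRequest) (io.ReadCloser, error) {
	if req == nil {
		return nil, errNilRequest
	}
	streamReq := req.WithStreaming()
	// Placeholder for the upstream call: echo the model in a canned stream.
	body := "data: {\"model\":\"" + streamReq.Model + "\"}\n\ndata: [DONE]\n\n"
	return io.NopCloser(strings.NewReader(body)), nil
}
```

Returning an error keeps a caller bug from crashing the gateway process, and it gives both providers the same contract for a nil request.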
Files selected for processing (4):
- internal/providers/chat_stream_normalize.go
- internal/providers/chat_stream_normalize_test.go
- internal/providers/deepseek/deepseek.go
- internal/providers/xai/xai.go
Address follow-up PR review:
- xai StreamChatCompletion now guards a nil request before dereferencing it,
matching the deepseek provider's contract in this PR.
- Remove the unreachable len(body)==0 branch in EnsureChatCompletionSSE: the
'{' that classifies the body is already buffered, so io.ReadAll always
returns at least that byte. bufferedCompletionToSSE already forwards partial
bytes and appends [DONE], so behavior is unchanged.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Summary
Fixes #411 — streaming chat completions where the connection appears to hang after the model has clearly finished, leaving the client "waiting for the model to continue outputting."
Root cause: Some OpenAI-compatible upstreams ignore `stream:true` and reply with a single buffered `application/json` completion (no `data:` framing, no `[DONE]`). The gateway forwarded that body verbatim under a `text/event-stream` content type, so SSE clients waited forever for an end-of-stream marker that never arrived.
Reproduced with `bailian/deepseek-v3.2` (a dedicated Aliyun MaaS deepseek deployment that silently drops streaming).
Other models (`qwen-flash`, `glm-5.1`, `deepseek-v4-*`, OpenAI, Anthropic, Groq) stream normally, so this is a general robustness gap for any upstream that drops streaming, not a single-provider issue.
Not a version regression
The faulty path predates the versions named in the issue: `CompatibleProvider.StreamChatCompletion` (raw `DoStream` passthrough) is byte-identical at `v0.1.40` and today, and the unconditional `text/event-stream` header has existed since the first commit. The `v0.1.40..v0.1.42` diff touches only embeddings, health/readiness, dashboard, and storage, with nothing in the chat-streaming path. The hang is triggered by the upstream model/endpoint behavior, not a gateway code change. This PR hardens the long-standing edge case.
Fix
New shared wrapper `EnsureChatCompletionSSE`, applied in `CompatibleProvider.StreamChatCompletion` so every OpenAI-compatible provider (bailian, groq, oracle, vllm, xiaomi, deepseek, openai) benefits. Following Postel's Law:
- A genuine SSE stream (`data:`, `:`, `event:`, …) passes through untouched, no buffering.
- A buffered JSON body (`{`) is re-emitted as one SSE chunk (`object` → `chat.completion.chunk`, each choice's `message` → `delta`) followed by a terminal `data: [DONE]`.
Testing
- Tests for `EnsureChatCompletionSSE` (buffered-JSON conversion, genuine-SSE passthrough, leading-comment passthrough, nil).
- `deepseek-v3.2` now emits a proper chunk and `[DONE]`; previously-working providers still stream multi-chunk unchanged.
- `make test-race` and `make lint` pass (pre-commit).
🤖 Generated with Claude Code
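The buffered-JSON conversion described under Fix can be illustrated with a short sketch. It is not the PR's code: `completionToSSE` is a hypothetical name, and the use of a generic map (which reorders keys alphabetically on output) is an assumption made for brevity. A body that fails to parse is forwarded as received.

```go
package main

import (
	"bytes"
	"encoding/json"
)

// completionToSSE rewrites a buffered chat completion into a single SSE chunk
// followed by the [DONE] terminator: object becomes "chat.completion.chunk"
// and each choice's "message" is renamed to "delta".
func completionToSSE(body []byte) []byte {
	var doc map[string]any
	dec := json.NewDecoder(bytes.NewReader(body))
	dec.UseNumber() // keep numeric fields exactly as the upstream sent them
	if err := dec.Decode(&doc); err == nil && doc != nil {
		doc["object"] = "chat.completion.chunk"
		if choices, ok := doc["choices"].([]any); ok {
			for _, c := range choices {
				choice, ok := c.(map[string]any)
				if !ok {
					continue
				}
				if msg, found := choice["message"]; found {
					choice["delta"] = msg
					delete(choice, "message")
				}
			}
		}
		if rewritten, err := json.Marshal(doc); err == nil {
			body = rewritten
		}
	}
	var out bytes.Buffer
	out.WriteString("data: ")
	out.Write(body)
	out.WriteString("\n\ndata: [DONE]\n\n")
	return out.Bytes()
}
```

An SSE client that reads this output sees one chunk carrying the full assistant message in `delta`, then the terminator it was waiting for.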
Summary by CodeRabbit
`data:` frames and a proper `[DONE]` terminator.