fix: drain bidi stream response side in publisher to prevent deadlock #75

Open
ColinkaMir wants to merge 1 commit into getoptimum:main from ColinkaMir:fix/drain-publisher-bidi-stream

Conversation

ColinkaMir commented May 28, 2026

Closes #74

Summary

The publisher CLIs (cmd/multi-publish and cmd/single in publish mode) open the ListenCommands bidi stream and only call stream.Send() in a loop; they never call stream.Recv(). The server emits a trace-event Response message for every publish. With nothing draining those responses on the client side, the per-stream HTTP/2 flow-control window fills, the server handler blocks on its write, and stream.Send on the client eventually blocks too. Result: the publisher hangs after ~7-8 publications at payloads >=100 KB. The full root-cause analysis is in the linked issue.

This PR adds a drain goroutine in each of the two affected call sites. Trace events are discarded (they were not used by the publisher anyway).
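The stall described in this summary can be reproduced in miniature without gRPC. The sketch below is a toy model, not the project's code: a bounded Go channel stands in for the per-stream flow-control window, and the names publish, requests, and responses, as well as the window size of 7, are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// publish models the publisher loop against a toy "server" whose response
// path is a bounded channel, standing in for the per-stream flow-control
// window. It returns how many sends completed before a send stalled.
func publish(count, window int, drain bool) int {
	requests := make(chan int)          // client -> server, unbuffered
	responses := make(chan int, window) // server -> client, bounded "window"

	// Toy server: it must emit one response per request before it reads
	// the next request, like a handler blocked on a full window.
	go func() {
		for r := range requests {
			responses <- r
		}
		close(responses)
	}()

	if drain {
		// The fix: read and discard responses so the window never fills.
		go func() {
			for range responses {
			}
		}()
	}

	sent := 0
	for i := 0; i < count; i++ {
		select {
		case requests <- i:
			sent++
		case <-time.After(200 * time.Millisecond):
			// Send stalled: the server is stuck writing a response.
			// (The toy server goroutine is left blocked; acceptable in a demo.)
			return sent
		}
	}
	close(requests)
	return sent
}

func main() {
	fmt.Println("without drain:", publish(40, 7, false))
	fmt.Println("with drain:   ", publish(40, 7, true))
}
```

In this model the undrained run stops one send after the window fills, while the drained run completes all 40 sends. The real window is measured in bytes rather than messages, so the exact stall point in the CLIs depends on response sizes and gRPC settings.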

Changes

grpc_p2p_client/cmd/multi-publish/main.go | 13 +++++++++++++
grpc_p2p_client/cmd/single/main.go        | 12 ++++++++++++
2 files changed, 25 insertions(+)

Both call sites add the same pattern immediately after client.ListenCommands(ctx) succeeds:

// Drain the response side of the bidi stream so the per-stream HTTP/2
// flow-control window does not fill up and block stream.Send.
go func() {
    for {
        if _, err := stream.Recv(); err != nil {
            return
        }
    }
}()

The goroutine exits when the connection closes: the deferred conn.Close() already present in sendMessages runs when the function returns, stream.Recv() then returns an error, and the loop returns. No goroutine leak.
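One way to check the exit claim is to have the drain signal completion. The following is a minimal sketch under stated assumptions: receiver, fakeStream, and drain are hypothetical names, and Recv returns a string here instead of the generated Response type so that the example stays self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// receiver is the one method of the bidi stream that the drain needs.
// It is a stand-in for the generated ListenCommands client stream.
type receiver interface {
	Recv() (string, error)
}

// fakeStream is a hypothetical test double: it yields a fixed number of
// responses and then reports an error, as a real stream does once its
// connection has been closed.
type fakeStream struct{ remaining int }

func (f *fakeStream) Recv() (string, error) {
	if f.remaining == 0 {
		return "", errors.New("stream closed")
	}
	f.remaining--
	return "trace-event", nil
}

// drain discards responses until Recv fails, then closes done so that
// callers can confirm the goroutine has exited.
func drain(s receiver) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			if _, err := s.Recv(); err != nil {
				return
			}
		}
	}()
	return done
}

func main() {
	s := &fakeStream{remaining: 3}
	<-drain(s) // returns only after the drain goroutine has exited
	fmt.Println("drain exited, responses left:", s.remaining)
}
```

The done channel is not part of this PR; it only makes the exit observable, which could also serve as the basis for a unit test with a mocked stream.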

Why this approach

  • Minimal change: 12-13 inserted lines per call site (per the diffstat above), no proto modification, no API change, no behaviour change at the user level (the publisher was already discarding trace events implicitly by not reading them).
  • Matches the pattern already used by cmd/multi-subscribe and cmd/single in subscribe mode, which do stream.Recv in their main loop.
  • An alternative would be to switch Publish to a unary RPC, but that requires a proto change and breaks compatibility with the current Request/Response typing used by both subscribe and publish on the same ListenCommands stream.

Verification

Run on the local Docker stack from this repo, with image getoptimum/p2pnode:v0.0.1-rc16. Without the patch, multi-publish hangs:

./grpc_p2p_client/p2p-multi-publish -topic=t -ipfile=ips.txt \
  -start-index=0 -end-index=1 -count=40 -datasize=102400 -sleep=200ms
# 7 publications, then hangs indefinitely.

After this patch, with rebuilt binaries:

test | result
-count=40 -datasize=102400 -sleep=200ms (100 KB × 40) | completes in ~8.06 s
-count=20 -datasize=1048576 -sleep=400ms (1 MB × 20) | completes in ~8.10 s
-count=10 -datasize=2516582 -sleep=600ms (2.4 MB × 10) | completes in ~6.12 s
-count=50 -datasize=1024 -sleep=100ms (1 KB × 50, the size that worked before) | unchanged, completes normally

Sample output from the 100 KB run (post-fix):

[127.0.0.1:33221] published 102416 bytes to "fix-test-100kb" (took 282us)
[127.0.0.1:33221] published 102416 bytes to "fix-test-100kb" (took 367us)
[... 38 more ...]

real    0m8.058s

Notes for reviewers

  • The drain goroutine intentionally has no logging on stream.Recv errors. The errors here are the natural shutdown signal (stream closed because connection closed) and logging them would be noise.
  • cmd/multi-subscribe and cmd/single in subscribe mode are not touched — they already drain in their main loop.
  • I considered using a small buffered channel to forward the responses for later inspection (e.g. to add a -trace flag to multi-publish symmetrical to multi-subscribe). That seemed out of scope for a bug fix. Happy to do it as a follow-up if useful.
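For reference, the buffered-channel follow-up mentioned in the last note could look roughly like the sketch below. It is not part of this PR: forward, receiver, and fakeStream are hypothetical names, Recv returns a string instead of the generated Response type, and dropping responses when the buffer is full is one possible policy, chosen here so that a slow consumer cannot stall the drain.

```go
package main

import (
	"errors"
	"fmt"
)

// receiver stands in for the receive side of the ListenCommands stream.
type receiver interface {
	Recv() (string, error)
}

// fakeStream is a hypothetical test double that yields a fixed number of
// responses and then fails, like a stream whose connection was closed.
type fakeStream struct{ remaining int }

func (f *fakeStream) Recv() (string, error) {
	if f.remaining == 0 {
		return "", errors.New("stream closed")
	}
	f.remaining--
	return "trace-event", nil
}

// forward drains s and offers each response to a buffered channel.
// When the buffer is full the response is dropped, so the drain never
// blocks on its consumer. The channel is closed once Recv fails.
func forward(s receiver, buf int) <-chan string {
	out := make(chan string, buf)
	go func() {
		defer close(out)
		for {
			msg, err := s.Recv()
			if err != nil {
				return
			}
			select {
			case out <- msg:
			default: // buffer full: drop rather than block the drain
			}
		}
	}()
	return out
}

func main() {
	kept := 0
	for range forward(&fakeStream{remaining: 10}, 4) {
		kept++
	}
	fmt.Println("responses kept:", kept)
}
```

The number of responses kept depends on scheduling: at least the buffer size, at most the number produced. A -trace flag that must not lose events would need a different policy, such as a consumer that is guaranteed to keep up.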

Test environment

  • Repo: this branch off main at bd8c0b8.
  • Stack: docker-compose-optimum.yml (4 p2pnodes + 2 proxies, defaults).
  • OS: Ubuntu 22.04 LTS x86_64.
  • Go: 1.26.3 (snap).
  • Docker: 29.3.0.

Summary by CodeRabbit

  • Bug Fixes
    • Fixed potential deadlocks occurring when publishing multiple messages with large payloads. Enhanced stream receive handling prevents flow-control window saturation that could block subsequent publish requests, improving reliability across publish operations.

The publisher CLIs (cmd/multi-publish, cmd/single in publish mode) call
client.ListenCommands(ctx) and only invoke stream.Send() in a loop. They
never call stream.Recv(). Because ListenCommands is a bidirectional gRPC
stream and the server emits trace-event Response messages for every
published Request, the per-stream HTTP/2 flow-control window fills up,
the server's response handler blocks, and stream.Send() on the client
eventually blocks too. Result: a publisher that completes 7-8 messages
at payloads >=100KB and then hangs indefinitely.

Fix: spawn a goroutine that drains stream.Recv() until the stream
closes. This unblocks the response side without changing the proto or
the response handling on the client (the trace events are discarded,
which matches existing behaviour at the user-visible level).

Verified on getoptimum/p2pnode:v0.0.1-rc16 with the docker-compose-optimum
stack:

  before fix: -count=40 -datasize=102400 hung after 7 publications.
  after fix:  -count=40 -datasize=102400 completes in ~8s.
              -count=20 -datasize=1048576 completes in ~8s.
              -count=10 -datasize=2516582 completes in ~6s.

coderabbitai Bot commented May 28, 2026

Actionable comments posted: 0

coderabbitai Bot commented May 28, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 1fd4268d-4d3d-464a-bd85-655b28895982

📥 Commits

Reviewing files that changed from the base of the PR and between bd8c0b8 and ab826f6.

📒 Files selected for processing (2)
  • grpc_p2p_client/cmd/multi-publish/main.go
  • grpc_p2p_client/cmd/single/main.go

Walkthrough

This PR fixes a bidirectional gRPC stream deadlock affecting two publisher CLI commands. The server emits trace-event Response messages for each publish operation, but the publishers never called stream.Recv() to read them. With default HTTP/2 settings, the per-stream flow-control window fills, blocking the server's outbound buffer and subsequently freezing the client's stream.Send() calls. Both multi-publish and single (publish mode) now spawn background goroutines that continuously drain the stream's receive side until error, keeping flow control unblocked. No public API or protocol changes.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

🚥 Pre-merge checks | ✅ 7 | ❌ 3

❌ Failed checks (3 warnings)

Check name | Status | Explanation | Resolution
Title check | ⚠️ Warning | Title lacks required domain/package specification in the format specified. | Reformat title as 'fix(grpc_p2p_client/cmd): drain bidi stream response side in publisher to prevent deadlock' to include the affected domain/package.
Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold.
Go Build And Test Rationale | ⚠️ Warning | Concurrency changes (drain goroutines on bidi streams) lack unit tests; project has zero test files, making critical deadlock fix untested. | Add unit tests with mocked gRPC streams to verify drain goroutine exits on error/cancellation, or document why these functions are excluded from testing.

✅ Passed checks (7 passed)

Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.
Linked Issues check | ✅ Passed | The code changes fully address the root cause and requirements of issue #74: drain goroutines prevent HTTP/2 flow-control deadlock in both affected files.
Out of Scope Changes check | ✅ Passed | All changes directly address the deadlock fix specified in #74; no unrelated modifications detected in the two targeted files.
Concurrency Safety | ✅ Passed | Drain goroutines have clear exit paths via stream.Recv() errors after defer conn.Close(); concurrent Send/Recv on gRPC is safe; context properly plumbed; no leaks, unbounded channels, or data races.
Public Api Changes | ✅ Passed | No changes to exported/public types or functions. PR only modifies private functions in CLI tools (sendMessages in multi-publish, publish in single) to add internal goroutine for stream draining.
Security Considerations | ✅ Passed | No security risks detected. Changes add stream-draining goroutines to fix deadlock; no SQL injection, command injection, path traversal, credential exposure, or weak crypto introduced.
Rust Best Practices | ✅ Passed | Custom check for Rust best practices is inapplicable; PR modifies only Go files in a Go-only repository with zero Rust code.

ColinkaMir pushed a commit to ColinkaMir/optimum-rlnc-bench that referenced this pull request May 28, 2026
Independent benchmark of Optimum mump2p RLNC propagation against plain
GossipSub, run on a local 8-node Docker mesh with MeshDegreeMax=4 so that
messages transit through intermediate nodes (i.e. so RLNC's recoding has
a place to do work).

32 datapoints total: optimum + gossipsub modes × {1KB, 100KB, 1MB, 2.4MB}
× {0, 5, 10, 20}% packet loss.

Headline finding (full details in results-n8/combined/report-n8.md):
  optimum delivers 100% in 11 of 16 (size, loss) conditions; gossipsub
  delivers 100% in 0 of 16 at the same topology. Clearest single point
  is 1 MB / 10% loss: 100% vs 14%.

Methodology:
- REST publish via the Optimum proxy (sidesteps the multi-publish bidi
  deadlock, patch in upstream PR getoptimum/optimum-dev-setup-guide#75)
- Subscribers via direct gRPC sidecar on all 8 nodes
- Packet loss scoped to peer-to-peer mesh traffic only (tc netem prio
  qdisc with u32 filter matching dst 172.28.0.12/30 and 172.28.0.16/29),
  proxy control plane untouched
- Latency per (msg_id, subscriber) from mump2p trace TSV
- Bandwidth from Prometheus process_network_transmit_bytes_total deltas

Development

Successfully merging this pull request may close these issues.

multi-publish (and single in publish mode) deadlocks on payloads >=100KB after ~7-8 messages