Scheduler-driven IOController poll on POSIX#194
Conversation
Use a dedicated IOController pool with LOW self-resubmitting poll tasks and HIGH control handlers plus default-pool bump reactions for wake ordering. Co-authored-by: Cursor <cursoragent@cursor.com>
| /// Dedicated single-consumer pool for IOController scheduler-driven polling. | ||
| namespace io_pool { | ||
| struct IO { | ||
| static constexpr const char* name = "IOController"; | ||
| static constexpr int concurrency = 1; | ||
| static constexpr bool counts_for_idle = false; | ||
| }; | ||
| } // namespace io_pool |
There was a problem hiding this comment.
Just put these on the IOController class, rather than making a whole extra namespace and type for it. It is the "IOController" pool
There was a problem hiding this comment.
🤖 Moved the pool descriptor onto IOController as a private nested IOPool struct (matching TraceController::TracePool). Reactions now use Pool<IOPool> directly.
| - **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays | ||
| - **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles | ||
| - **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays, driven by a dedicated `IOController` pool (`concurrency = 1`, MPSC). A LOW-priority poll task blocks in `::poll()` and resubmits itself after each iteration. Control handlers (`IOConfiguration`, `IOFinished`, `Unbind`, `Shutdown`) run at HIGH priority on the same pool; separate default-pool bump reactions wake the poll thread via a notify pipe once the HIGH task is queued. | ||
| - **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles, still using `on<Always>` (scheduler-driven parity deferred). |
There was a problem hiding this comment.
Swap the windows one over too, the closer the two implementations are to eachother, the less platform specific code we need.
There was a problem hiding this comment.
🤖 Windows now uses the same scheduler-driven model: dedicated IOController pool, HIGH control handlers, inline bump reactions, and a NORMAL-priority self-resubmitting poll task on Startup.
|
|
||
| ### Blocking pools and priority ordering | ||
|
|
||
| Some extensions use a single-consumer pool with a blocking wait in the worker thread (for example, IOController on POSIX blocks in `::poll()`). Priority does not preempt inside the blocking call; instead, cross-thread control work is enqueued at HIGH priority on the same pool, a notify pipe wakes the poll thread, the LOW poll task finishes its iteration and resubmits, and the worker dequeues the already-queued HIGH task before the resubmitted poll. Control handlers that mutate poll state from another thread pair with separate default-pool bump reactions registered after the HIGH handlers so the bump runs from a thread that is not blocked in `::poll()`. |
There was a problem hiding this comment.
Follow the docs guidelines, these should be sembr
There was a problem hiding this comment.
🤖 Rewrote the blocking-pools section with SemBr line breaks (one sentence per line) and updated the poll priority wording to NORMAL.
| } | ||
| } | ||
|
|
||
| void IOController::poll_iteration() { |
There was a problem hiding this comment.
This can be inlined I think. I prefer reactions/reactors to have all their logic inlined where it makes sense rather than having special functions.
There was a problem hiding this comment.
🤖 Removed poll_iteration() / reschedule_poll() and inlined the poll loop body directly in the Startup reaction.
| void IOController::reschedule_poll() { | ||
| if (!running.load(std::memory_order_acquire)) { | ||
| return; | ||
| } | ||
| if (const auto* current = threading::ReactionTask::get_current_task()) { | ||
| powerplant.submit(current->parent->get_task()); | ||
| } | ||
| } |
There was a problem hiding this comment.
I don't think we need all this machinery, we can just submit a new task right? If we have a message and keep emitting it it'll be fine. If we aren't running anymore, then NUClear won't reschedule us so we don't need to deal with it
There was a problem hiding this comment.
🤖 Poll now resubmits with a direct powerplant.submit(get_current_task()->parent->get_task()) at the end of the reaction. Shutdown is handled by the running guard at the top of the poll body.
|
|
||
| if (polled) { | ||
| // Get the lock so we don't concurrently modify the list | ||
| const std::lock_guard<std::mutex> lock(tasks_mutex); |
There was a problem hiding this comment.
We don't need this anymore if we're doing the "Single pool" approach. That's one of the advantages of the method. That we don't need locks/mutexes anymore. Writing to the bump file descriptor can move poll out, and besides that NUClear's scheduler handles the rest.
There was a problem hiding this comment.
🤖 Dropped the separate io_pool namespace and colocated bump reactions, but kept the minimal notify wake/handoff (NotifierWakeGuard + NotifierPollScope) — removing it caused IO test deadlocks when IOFinished mutates watches[] while poll may still be inside ::poll().
There was a problem hiding this comment.
🤖 Update — root cause fixed in f62b7dc
The wake/lock machinery was papering over a pool-model violation: IOFinished / IOConfiguration / Unbind handlers used Pool<IOPool> but not Inline::NEVER. Because IO.hpp emits those via emit<Inline> with get_task(true), the scheduler ran the HIGH handlers inline on the default-pool thread while the IO poll task held watches[] inside ::poll() on the IO pool thread — concurrent mutation of the poll set.
Fix: add Inline::NEVER to every Pool<IOPool> handler (including poll). All tasks[] / watches[] mutations now happen sequentially on the single IO pool worker. Inline bump reactions remain on the default pool but only write the notify pipe (thread-safe wake). Removed NotifierWakeGuard, NotifierPollScope, wake_requested, notifier.mutex, tasks_mutex, and the extra DSL #includes.
Evidence: tests/dsl/{IO,TCP,UDP} ×30 runs (Debug), same ×10 under TSAN — all pass.
| bool polled = false; | ||
| /* mutex scope */ { | ||
| const NotifierPollScope poll(notifier); | ||
| if (!poll.wake_pending()) { |
There was a problem hiding this comment.
We don't need this extra stuff either. wake_pending was there to handle the old method before it was handled by the scheduler
There was a problem hiding this comment.
🤖 Same as above — wake_pending is still needed for the bump handoff so a wake-only iteration skips ::poll() and yields to queued HIGH work before resubmitting. Without it the IO extension tests hang.
There was a problem hiding this comment.
🤖 Update — unnecessary DSL includes removed in f62b7dc
Dropped the explicit #includes of Inline.hpp, Pool.hpp, Priority.hpp, and Startup.hpp from the IOController .ipp files. Inside a Reactor subclass those words are already exported via Reactor.hpp usings, so the includes were redundant.
Same commit also removes the notifier locking machinery; see the sibling thread for the root-cause write-up.
| if (dirty.load(std::memory_order_acquire)) { | ||
| rebuild_list(); | ||
| } | ||
| on<Trigger<dsl::word::IOConfiguration>>().then("Configure IO bump", [this] { bump(); }); |
There was a problem hiding this comment.
Place these reactions after the types that they apply to, so you have a "main reaction" followed by a "bump" reaction for it. Also I feel like these should be Inline::ALWAYS rather than needing to spin up an entire reaction on a new thread, whichever thread is emitting these can do the bump itself. Given that reactions will emit in order from a single thread, if they are declared as "action" then "bump" it'll always have emitted the first action first before doing the bump one.
There was a problem hiding this comment.
🤖 Each control handler is now immediately followed by its bump reaction, and bumps use Inline::ALWAYS so the emitting thread writes the notify pipe/event without queueing another task.
| } | ||
| }); | ||
| // Scheduler-driven poll: dedicated single-consumer IO pool, LOW priority, self-resubmitting. | ||
| on<Startup, IOPool, Priority::LOW>().then("IO Poll", [this] { poll_iteration(); }); |
There was a problem hiding this comment.
Just put the body of poll_iteration here. It's not like we call it elsewhere and need to dedupe the code.
There was a problem hiding this comment.
🤖 Done — the poll loop body lives directly in the on<Startup, Pool<IOPool>, Priority::NORMAL>() reaction now.
Move pool constexprs onto IOController, raise poll to NORMAL priority, colocate inline bump reactions with their handlers, inline the poll loop, and bring Windows onto the same scheduler-driven model. Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
emit<Inline> was running Pool<IOPool> HIGH handlers on the default pool while ::poll() held watches[], which required notifier mutex machinery. Inline::NEVER routes all control mutations through the IO pool thread; bump reactions only wake the notify pipe. Removes NotifierWakeGuard, wake_requested, tasks_mutex, and unnecessary DSL includes. Co-authored-by: Cursor <cursoragent@cursor.com>
IO bind/unbind/finished events now use default Local emit so Pool<IOPool> handlers are queued to the IO pool instead of running on the emitter thread. Inline::NEVER on control handlers and Inline::ALWAYS bump reactions are kept. Co-authored-by: Cursor <cursoragent@cursor.com>
|
🤖 IO emit scope fix ( Root cause: Change: switched all three IO control emits to default Local scope ( Stress tests: IO ×25, TCP ×25, UDP ×25 — all passed. |
|
🤖 IO scheduler performance analysis (main @ Environment: macOS, Apple M2 Max, Release build, MethodologyHand-rolled standalone benchmark (
Both branches require explicit Results
InterpretationVerdict: comparative — no meaningful regression for typical workloads.
Recommended follow-ups
Benchmark source available locally; not committed in this comment. |
Only wakes the poll thread via the notifier; it does not mutate controller state.
Move bump reactions, shutdown control, and poll resubmit logic into IOController_Common.ipp so POSIX and Windows paths share one implementation and satisfy SonarCloud duplicated-lines quality gate. Co-authored-by: Cursor <cursoragent@cursor.com>
Undo SonarCloud duplication refactor: delete IOController_Common.ipp and restore inline bump reactions, shutdown control, and poll loop registration in the platform .ipp files. Co-authored-by: Cursor <cursoragent@cursor.com>
There was a problem hiding this comment.
Pull request overview
This PR refactors IOController on POSIX (and aligns Windows) to use a scheduler-driven polling model on a dedicated single-consumer pool, replacing the previous Always-loop approach and documenting the new behavior.
Changes:
- Introduces a dedicated
IOControllerpool (concurrency = 1) and moves IO polling to a self-resubmittingStartuptask. - Routes IO control messages (
IOConfiguration,IOFinished,Unbind,Shutdown) onto the IO pool at HIGH priority, with separate inline “bump” reactions to wake the blocking poll. - Updates IO DSL emits to default (non-inline) and updates documentation to describe the new blocking-pool + priority ordering model.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| src/extension/IOController.hpp | Adds the dedicated IOPool, removes now-unused notifier/task mutexes, and makes bump() const. |
| src/extension/IOController_Windows.ipp | Moves control handlers to IO pool + scheduler-driven polling loop with self-resubmit; adds inline bump reactions. |
| src/extension/IOController_Posix.ipp | Same scheduler-driven polling + self-resubmit model for POSIX; simplifies notifier logic and adds inline bump reactions. |
| src/dsl/word/IO.hpp | Switches IO control emits to default (non-inline) emission. |
| docs/reference/extensions/built-in-extensions.md | Documents IOController’s dedicated pool, poll task behavior, and bump model for POSIX/Windows. |
| docs/explanation/scheduler.md | Adds explanation of blocking pools and how priority ordering is achieved via wake + resubmit. |
| void IOController::bump() const { | ||
| uint8_t val = 1; | ||
| if (::write(notifier.send, &val, sizeof(val)) < 0) { | ||
| throw std::system_error(network_errno, | ||
| std::system_category(), | ||
| "There was an error while writing to the notification pipe"); | ||
| } | ||
| } |
There was a problem hiding this comment.
🤖 Fixed in 941f093 — bump() now skips the pipe write when poll(notifier.recv, 0) shows a pending wake byte, and retries ::write() on EINTR. One unread byte is sufficient to wake ::poll(); coalescing avoids filling the pipe buffer and blocking the emitting thread under bursty control traffic.
Skip the notify-pipe write when a wake byte is already pending, retry on EINTR, and generalise the Windows WSASetEvent error message. Co-authored-by: Cursor <cursoragent@cursor.com>
Summary
Alwaysreaction loop with scheduler-driven poll tasks on a dedicatedIOController::IOPool(MPSC, concurrency=1).Inline::NEVERon the IO pool.Inline::ALWAYSbump reactions wake::poll()via the notifier pipe.IOController_Common.ipp).main.Performance (vs main Always-based IO)
Comparative overall: ~+10% per-event latency, ~−7% UDP throughput, ~−11% bind/unbind, similar idle CPU. See PR comments for full benchmark table.
Test plan
-DBUILD_TESTS=ONIO,TCP,UDPstress tests (×25+) on macOS