From fd91a44ce745dec2c863b90f4109febba9691051 Mon Sep 17 00:00:00 2001 From: Trent Houliston Date: Wed, 17 Jun 2026 11:59:29 +1000 Subject: [PATCH 1/9] Replace IOController Always loop with scheduler-driven poll on POSIX. Use a dedicated IOController pool with LOW self-resubmitting poll tasks and HIGH control handlers plus default-pool bump reactions for wake ordering. Co-authored-by: Cursor --- docs/explanation/scheduler.md | 4 + .../extensions/built-in-extensions.md | 7 +- src/extension/IOController.hpp | 23 +++ src/extension/IOController_Posix.ipp | 133 +++++++++--------- src/extension/IOController_Windows.ipp | 4 + 5 files changed, 103 insertions(+), 68 deletions(-) diff --git a/docs/explanation/scheduler.md b/docs/explanation/scheduler.md index 3d14a8e8..802601d7 100644 --- a/docs/explanation/scheduler.md +++ b/docs/explanation/scheduler.md @@ -275,6 +275,10 @@ When the main thread pool exits (after shutdown), pools are stopped in order — Persistent pools (`ThreadPoolDescriptor::persistent`) continue accepting tasks during a normal shutdown so networking or logging reactors can finish in-flight work. +### Blocking pools and priority ordering + +Some extensions use a single-consumer pool with a blocking wait in the worker thread (for example, IOController on POSIX blocks in `::poll()`). Priority does not preempt inside the blocking call; instead, cross-thread control work is enqueued at HIGH priority on the same pool, a notify pipe wakes the poll thread, the LOW poll task finishes its iteration and resubmits, and the worker dequeues the already-queued HIGH task before the resubmitted poll. Control handlers that mutate poll state from another thread pair with separate default-pool bump reactions registered after the HIGH handlers so the bump runs from a thread that is not blocked in `::poll()`. + ## Design tradeoffs | Choice | Rationale | diff --git a/docs/reference/extensions/built-in-extensions.md b/docs/reference/extensions/built-in-extensions.md index 9961b26e..6404e4f1 100644 --- a/docs/reference/extensions/built-in-extensions.md +++ b/docs/reference/extensions/built-in-extensions.md @@ -95,12 +95,13 @@ Monitors file descriptors for IO readiness events and triggers corresponding rea **Implementation:** +**Implementation:** + Uses platform-native polling mechanisms: -- **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays -- **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles +- **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays, driven by a dedicated `IOController` pool (`concurrency = 1`, MPSC). A LOW-priority poll task blocks in `::poll()` and resubmits itself after each iteration. Control handlers (`IOConfiguration`, `IOFinished`, `Unbind`, `Shutdown`) run at HIGH priority on the same pool; separate default-pool bump reactions wake the poll thread via a notify pipe once the HIGH task is queued. +- **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles, still using `on` (scheduler-driven parity deferred). -A dedicated thread blocks on the polling call. When events are detected on registered file descriptors, the controller creates tasks for the corresponding reactions, passing the event flags through `ThreadStore` so the `get` method can report which specific events occurred. **Key internals:** diff --git a/src/extension/IOController.hpp b/src/extension/IOController.hpp index 69c0d685..5ff9b8f7 100644 --- a/src/extension/IOController.hpp +++ b/src/extension/IOController.hpp @@ -32,6 +32,15 @@ namespace NUClear { namespace extension { + /// Dedicated single-consumer pool for IOController scheduler-driven polling. + namespace io_pool { + struct IO { + static constexpr const char* name = "IOController"; + static constexpr int concurrency = 1; + static constexpr bool counts_for_idle = false; + }; + } // namespace io_pool + class IOController : public Reactor { public: struct Task; @@ -124,6 +133,20 @@ namespace extension { // NOLINTNEXTLINE(readability-make-member-function-const) this changes states void bump(); + /** + * Runs one poll iteration on the IO pool and reschedules the poll task when running. + * + * The poll loop is a LOW-priority task on a dedicated single-consumer pool. After each + * iteration it resubmits at LOW; cross-thread control work is queued at HIGH on the same + * pool before a separate bump reaction wakes ::poll(), so the worker dequeues HIGH first. + */ + void poll_iteration(); + + /** + * Resubmit the poll reaction to the scheduler (same pool, LOW priority). + */ + void reschedule_poll(); + public: explicit IOController(std::unique_ptr environment); diff --git a/src/extension/IOController_Posix.ipp b/src/extension/IOController_Posix.ipp index b53910d0..574d6524 100644 --- a/src/extension/IOController_Posix.ipp +++ b/src/extension/IOController_Posix.ipp @@ -22,9 +22,16 @@ #include "IOController.hpp" +#include "../dsl/word/Pool.hpp" +#include "../dsl/word/Priority.hpp" +#include "../dsl/word/Startup.hpp" +#include "../threading/ReactionTask.hpp" + namespace NUClear { namespace extension { + using IOPool = dsl::word::Pool; + namespace { /** @@ -90,6 +97,52 @@ namespace extension { } // namespace + void IOController::reschedule_poll() { + if (!running.load(std::memory_order_acquire)) { + return; + } + if (const auto* current = threading::ReactionTask::get_current_task()) { + powerplant.submit(current->parent->get_task()); + } + } + + void IOController::poll_iteration() { + // Rebuild the list if something changed + if (dirty.load(std::memory_order_acquire)) { + rebuild_list(); + } + + // Wait for an event to happen on one of our file descriptors + bool polled = false; + /* mutex scope */ { + const NotifierPollScope poll(notifier); + if (!poll.wake_pending()) { + if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) { + throw std::system_error(network_errno, + std::system_category(), + "There was an IO error while attempting to poll the file descriptors"); + } + polled = true; + } + } + + if (polled) { + // Get the lock so we don't concurrently modify the list + const std::lock_guard lock(tasks_mutex); + for (auto& fd : watches) { + + // Collect the events that happened into the tasks list + // Something happened + if (fd.revents != 0) { + process_event(fd); + } + } + } + + // Yield to the scheduler. Any HIGH control tasks queued before the bump wake are dequeued first. + reschedule_poll(); + } + void IOController::rebuild_list() { // Get the lock so we don't concurrently modify the list const std::lock_guard lock(tasks_mutex); @@ -233,51 +286,40 @@ namespace extension { // Start by rebuilding the list rebuild_list(); - on>().then( + // Control handlers run on the IO pool at HIGH priority. Separate bump reactions (default pool, + // registered after the HIGH handlers) wake ::poll() once the HIGH task is already queued. + // TypeCallbackStore iteration follows registration order, so HIGH is enqueued before bump. + + on, IOPool, Priority::HIGH>().then( "Configure IO Reaction", [this](const dsl::word::IOConfiguration& config) { - // Lock our mutex to avoid concurrent modification const std::lock_guard lock(tasks_mutex); // NOLINTNEXTLINE(google-runtime-int) tasks.emplace_back(config.fd, event_t(config.events), config.reaction); - // Resort our list std::sort(tasks.begin(), tasks.end()); - - // Let the poll command know that stuff happened dirty.store(true, std::memory_order_release); - bump(); }); - on>().then("IO Finished", [this](const dsl::word::IOFinished& event) { - // Get the lock so we don't concurrently modify the list + on, IOPool, Priority::HIGH>().then("IO Finished", [this](const dsl::word::IOFinished& event) { const std::lock_guard lock(tasks_mutex); - // Find the reaction that finished processing auto task = std::find_if(tasks.begin(), tasks.end(), [&event](const Task& t) { return t.reaction->id == event.id; }); if (task != tasks.end()) { - // If the events we were processing included close remove it from the list if ((task->processing_events & IO::CLOSE) != 0) { dirty.store(true, std::memory_order_release); tasks.erase(task); } else { - // We are about to mutate `watches[].events`, which the poll thread reads - // from inside ::poll(). Write to the notify pipe to kick poll out, then - // hold notifier.mutex for the duration of the mutation so the poll thread - // cannot re-enter ::poll() against a half-updated entry. This is the same - // wake-then-lock pattern bump() uses, but we keep the lock held until the - // watches update (and the follow-up fire_event, which can also touch - // watches[].events) is finished. + // Mutate watches[] only while poll is out of ::poll() — wake-then-lock on the IO thread. NotifierWakeGuard wake(notifier); wake.signal(); const auto notifier_lock = wake.lock(); - // Unmask the events that were just processed auto it = std::lower_bound(watches.begin(), watches.end(), task->fd, @@ -286,22 +328,17 @@ namespace extension { it->events = event_t(it->events | task->processing_events); } - // No longer processing events task->processing_events = 0; - - // Try to fire again which will check if there are any waiting events fire_event(*task); } } }); - on>>().then( + on>, IOPool, Priority::HIGH>().then( "Unbind IO Reaction", [this](const dsl::operation::Unbind& unbind) { - // Lock our mutex to avoid concurrent modification const std::lock_guard lock(tasks_mutex); - // Find our reaction auto reaction = std::find_if(tasks.begin(), tasks.end(), [&unbind](const Task& t) { return t.reaction->id == unbind.id; }); @@ -310,54 +347,20 @@ namespace extension { tasks.erase(reaction); } - // Let the poll command know that stuff happened dirty.store(true, std::memory_order_release); - bump(); }); - on().then("Shutdown IO Controller", [this] { + on().then("Shutdown IO Controller", [this] { running.store(false, std::memory_order_release); - bump(); }); - on().then("IO Controller", [this] { - // Stay in this reaction to improve the performance without going back/fourth between reactions - if (running.load(std::memory_order_acquire)) { - // Rebuild the list if something changed - if (dirty.load(std::memory_order_acquire)) { - rebuild_list(); - } + on>().then("Configure IO bump", [this] { bump(); }); + on>().then("IO Finished bump", [this] { bump(); }); + on>>().then("Unbind IO bump", [this] { bump(); }); + on().then("Shutdown IO bump", [this] { bump(); }); - // Wait for an event to happen on one of our file descriptors - bool polled = false; - /* mutex scope */ { - const NotifierPollScope poll(notifier); - if (!poll.wake_pending()) { - if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) { - throw std::system_error(network_errno, - std::system_category(), - "There was an IO error while attempting to poll the file descriptors"); - } - polled = true; - } - } - - if (!polled) { - return; - } - - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - for (auto& fd : watches) { - - // Collect the events that happened into the tasks list - // Something happened - if (fd.revents != 0) { - process_event(fd); - } - } - } - }); + // Scheduler-driven poll: dedicated single-consumer IO pool, LOW priority, self-resubmitting. + on().then("IO Poll", [this] { poll_iteration(); }); } } // namespace extension diff --git a/src/extension/IOController_Windows.ipp b/src/extension/IOController_Windows.ipp index 5217891a..2823834c 100644 --- a/src/extension/IOController_Windows.ipp +++ b/src/extension/IOController_Windows.ipp @@ -20,6 +20,10 @@ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +// Windows IOController still uses on with WSAWaitForMultipleEvents. The scheduler-driven +// poll model (dedicated IOController pool, HIGH control handlers + default-pool bump reactions) +// is implemented on POSIX only; Windows parity is deferred. + #include "IOController.hpp" namespace NUClear { From 4f82a76a62726d99c0653f4c5b136e17e78fae99 Mon Sep 17 00:00:00 2001 From: Trent Houliston Date: Wed, 17 Jun 2026 13:25:34 +1000 Subject: [PATCH 2/9] Simplify IOController pool identity and address PR review feedback. Move pool constexprs onto IOController, raise poll to NORMAL priority, colocate inline bump reactions with their handlers, inline the poll loop, and bring Windows onto the same scheduler-driven model. Co-authored-by: Cursor --- docs/explanation/scheduler.md | 9 +- .../extensions/built-in-extensions.md | 11 +- src/extension/IOController.hpp | 30 ++--- src/extension/IOController_Posix.ipp | 104 +++++++----------- src/extension/IOController_Windows.ipp | 81 ++++++-------- 5 files changed, 95 insertions(+), 140 deletions(-) diff --git a/docs/explanation/scheduler.md b/docs/explanation/scheduler.md index 802601d7..36b44a88 100644 --- a/docs/explanation/scheduler.md +++ b/docs/explanation/scheduler.md @@ -277,7 +277,14 @@ Persistent pools (`ThreadPoolDescriptor::persistent`) continue accepting tasks d ### Blocking pools and priority ordering -Some extensions use a single-consumer pool with a blocking wait in the worker thread (for example, IOController on POSIX blocks in `::poll()`). Priority does not preempt inside the blocking call; instead, cross-thread control work is enqueued at HIGH priority on the same pool, a notify pipe wakes the poll thread, the LOW poll task finishes its iteration and resubmits, and the worker dequeues the already-queued HIGH task before the resubmitted poll. Control handlers that mutate poll state from another thread pair with separate default-pool bump reactions registered after the HIGH handlers so the bump runs from a thread that is not blocked in `::poll()`. +Some extensions use a single-consumer pool with a blocking wait in the worker thread. +For example, IOController blocks in `::poll()` or `WSAWaitForMultipleEvents()`. + +Priority does not preempt inside the blocking call. +Instead, cross-thread control work is enqueued at HIGH priority on the same pool. +An inline bump reaction writes the notify pipe or event from the emitting thread. +The poll task finishes its iteration and resubmits at NORMAL priority. +The worker then dequeues the already-queued HIGH task before the resubmitted poll. ## Design tradeoffs diff --git a/docs/reference/extensions/built-in-extensions.md b/docs/reference/extensions/built-in-extensions.md index 6404e4f1..82530dd1 100644 --- a/docs/reference/extensions/built-in-extensions.md +++ b/docs/reference/extensions/built-in-extensions.md @@ -95,12 +95,13 @@ Monitors file descriptors for IO readiness events and triggers corresponding rea **Implementation:** -**Implementation:** - -Uses platform-native polling mechanisms: +Uses platform-native polling mechanisms on a dedicated `IOController` pool (`concurrency = 1`, MPSC): -- **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays, driven by a dedicated `IOController` pool (`concurrency = 1`, MPSC). A LOW-priority poll task blocks in `::poll()` and resubmits itself after each iteration. Control handlers (`IOConfiguration`, `IOFinished`, `Unbind`, `Shutdown`) run at HIGH priority on the same pool; separate default-pool bump reactions wake the poll thread via a notify pipe once the HIGH task is queued. -- **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles, still using `on` (scheduler-driven parity deferred). +- **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays. + A NORMAL-priority poll task blocks in `::poll()` and resubmits itself after each iteration. + Control handlers (`IOConfiguration`, `IOFinished`, `Unbind`, `Shutdown`) run at HIGH priority on the same pool. + Each control handler is followed by an `Inline::ALWAYS` bump reaction that writes the notify pipe from the emitting thread. +- **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles, using the same scheduler-driven pool and priority model. When events are detected on registered file descriptors, the controller creates tasks for the corresponding reactions, passing the event flags through `ThreadStore` so the `get` method can report which specific events occurred. diff --git a/src/extension/IOController.hpp b/src/extension/IOController.hpp index 5ff9b8f7..d140afc4 100644 --- a/src/extension/IOController.hpp +++ b/src/extension/IOController.hpp @@ -32,15 +32,6 @@ namespace NUClear { namespace extension { - /// Dedicated single-consumer pool for IOController scheduler-driven polling. - namespace io_pool { - struct IO { - static constexpr const char* name = "IOController"; - static constexpr int concurrency = 1; - static constexpr bool counts_for_idle = false; - }; - } // namespace io_pool - class IOController : public Reactor { public: struct Task; @@ -104,6 +95,13 @@ namespace extension { }; private: + /// Dedicated single-consumer pool for scheduler-driven IO polling. + struct IOPool { + static constexpr const char* name = "IOController"; + static constexpr int concurrency = 1; + static constexpr bool counts_for_idle = false; + }; + /** * Rebuilds the list of file descriptors to poll. * @@ -133,20 +131,6 @@ namespace extension { // NOLINTNEXTLINE(readability-make-member-function-const) this changes states void bump(); - /** - * Runs one poll iteration on the IO pool and reschedules the poll task when running. - * - * The poll loop is a LOW-priority task on a dedicated single-consumer pool. After each - * iteration it resubmits at LOW; cross-thread control work is queued at HIGH on the same - * pool before a separate bump reaction wakes ::poll(), so the worker dequeues HIGH first. - */ - void poll_iteration(); - - /** - * Resubmit the poll reaction to the scheduler (same pool, LOW priority). - */ - void reschedule_poll(); - public: explicit IOController(std::unique_ptr environment); diff --git a/src/extension/IOController_Posix.ipp b/src/extension/IOController_Posix.ipp index 574d6524..8c367829 100644 --- a/src/extension/IOController_Posix.ipp +++ b/src/extension/IOController_Posix.ipp @@ -22,6 +22,7 @@ #include "IOController.hpp" +#include "../dsl/word/Inline.hpp" #include "../dsl/word/Pool.hpp" #include "../dsl/word/Priority.hpp" #include "../dsl/word/Startup.hpp" @@ -30,8 +31,6 @@ namespace NUClear { namespace extension { - using IOPool = dsl::word::Pool; - namespace { /** @@ -97,52 +96,6 @@ namespace extension { } // namespace - void IOController::reschedule_poll() { - if (!running.load(std::memory_order_acquire)) { - return; - } - if (const auto* current = threading::ReactionTask::get_current_task()) { - powerplant.submit(current->parent->get_task()); - } - } - - void IOController::poll_iteration() { - // Rebuild the list if something changed - if (dirty.load(std::memory_order_acquire)) { - rebuild_list(); - } - - // Wait for an event to happen on one of our file descriptors - bool polled = false; - /* mutex scope */ { - const NotifierPollScope poll(notifier); - if (!poll.wake_pending()) { - if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) { - throw std::system_error(network_errno, - std::system_category(), - "There was an IO error while attempting to poll the file descriptors"); - } - polled = true; - } - } - - if (polled) { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - for (auto& fd : watches) { - - // Collect the events that happened into the tasks list - // Something happened - if (fd.revents != 0) { - process_event(fd); - } - } - } - - // Yield to the scheduler. Any HIGH control tasks queued before the bump wake are dequeued first. - reschedule_poll(); - } - void IOController::rebuild_list() { // Get the lock so we don't concurrently modify the list const std::lock_guard lock(tasks_mutex); @@ -286,11 +239,7 @@ namespace extension { // Start by rebuilding the list rebuild_list(); - // Control handlers run on the IO pool at HIGH priority. Separate bump reactions (default pool, - // registered after the HIGH handlers) wake ::poll() once the HIGH task is already queued. - // TypeCallbackStore iteration follows registration order, so HIGH is enqueued before bump. - - on, IOPool, Priority::HIGH>().then( + on, Pool, Priority::HIGH>().then( "Configure IO Reaction", [this](const dsl::word::IOConfiguration& config) { const std::lock_guard lock(tasks_mutex); @@ -301,8 +250,9 @@ namespace extension { std::sort(tasks.begin(), tasks.end()); dirty.store(true, std::memory_order_release); }); + on, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); }); - on, IOPool, Priority::HIGH>().then("IO Finished", [this](const dsl::word::IOFinished& event) { + on, Pool, Priority::HIGH>().then("IO Finished", [this](const dsl::word::IOFinished& event) { const std::lock_guard lock(tasks_mutex); auto task = std::find_if(tasks.begin(), tasks.end(), [&event](const Task& t) { @@ -315,7 +265,6 @@ namespace extension { tasks.erase(task); } else { - // Mutate watches[] only while poll is out of ::poll() — wake-then-lock on the IO thread. NotifierWakeGuard wake(notifier); wake.signal(); const auto notifier_lock = wake.lock(); @@ -333,8 +282,9 @@ namespace extension { } } }); + on, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); }); - on>, IOPool, Priority::HIGH>().then( + on>, Pool, Priority::HIGH>().then( "Unbind IO Reaction", [this](const dsl::operation::Unbind& unbind) { const std::lock_guard lock(tasks_mutex); @@ -349,18 +299,46 @@ namespace extension { dirty.store(true, std::memory_order_release); }); + on>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); }); - on().then("Shutdown IO Controller", [this] { + on, Priority::HIGH>().then("Shutdown IO Controller", [this] { running.store(false, std::memory_order_release); }); + on().then("Shutdown IO bump", [this] { bump(); }); - on>().then("Configure IO bump", [this] { bump(); }); - on>().then("IO Finished bump", [this] { bump(); }); - on>>().then("Unbind IO bump", [this] { bump(); }); - on().then("Shutdown IO bump", [this] { bump(); }); + on, Priority::NORMAL>().then("IO Poll", [this] { + if (!running.load(std::memory_order_acquire)) { + return; + } - // Scheduler-driven poll: dedicated single-consumer IO pool, LOW priority, self-resubmitting. - on().then("IO Poll", [this] { poll_iteration(); }); + if (dirty.load(std::memory_order_acquire)) { + rebuild_list(); + } + + bool polled = false; + { + const NotifierPollScope poll(notifier); + if (!poll.wake_pending()) { + if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) { + throw std::system_error(network_errno, + std::system_category(), + "There was an IO error while attempting to poll the file descriptors"); + } + polled = true; + } + } + + if (polled) { + const std::lock_guard lock(tasks_mutex); + for (auto& fd : watches) { + if (fd.revents != 0) { + process_event(fd); + } + } + } + + powerplant.submit(threading::ReactionTask::get_current_task()->parent->get_task()); + }); } } // namespace extension diff --git a/src/extension/IOController_Windows.ipp b/src/extension/IOController_Windows.ipp index 2823834c..34689ffc 100644 --- a/src/extension/IOController_Windows.ipp +++ b/src/extension/IOController_Windows.ipp @@ -20,12 +20,14 @@ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Windows IOController still uses on with WSAWaitForMultipleEvents. The scheduler-driven -// poll model (dedicated IOController pool, HIGH control handlers + default-pool bump reactions) -// is implemented on POSIX only; Windows parity is deferred. - #include "IOController.hpp" +#include "../dsl/word/Inline.hpp" +#include "../dsl/word/Pool.hpp" +#include "../dsl/word/Priority.hpp" +#include "../dsl/word/Startup.hpp" +#include "../threading/ReactionTask.hpp" + namespace NUClear { namespace extension { @@ -155,11 +157,10 @@ namespace extension { // Start by rebuilding the list rebuild_list(); - on>().then( + on, Pool, Priority::HIGH>().then( "Configure IO Reaction", [this](const dsl::word::IOConfiguration& config) { - // Lock our mutex - std::lock_guard lock(tasks_mutex); + const std::lock_guard lock(tasks_mutex); // Make an event for this SOCKET auto event = WSACreateEvent(); @@ -174,47 +175,37 @@ namespace extension { throw std::system_error(WSAGetLastError(), std::system_category(), "WSAEventSelect() failed"); } - // Add all the information to the list and mark the list as dirty, to sync with the list of events tasks.insert(std::make_pair(event, Task{config.fd, config.events, config.reaction})); dirty.store(true, std::memory_order_release); - - bump(); }); + on, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); }); - on>().then("IO Finished", [this](const dsl::word::IOFinished& event) { - // Get the lock so we don't concurrently modify the list + on, Pool, Priority::HIGH>().then("IO Finished", [this](const dsl::word::IOFinished& event) { const std::lock_guard lock(tasks_mutex); - // Find the reaction that finished processing auto it = std::find_if(tasks.begin(), tasks.end(), [&event](const std::pair& t) { return t.second.reaction->id == event.id; }); - // If we found it then clear the waiting events if (it != tasks.end()) { auto& task = it->second; - // If the events we were processing included close remove it from the list if (task.processing_events & IO::CLOSE) { dirty.store(true, std::memory_order_release); remove_task(tasks, it); } else { - // We have finished processing events task.processing_events = 0; - - // Try to fire again which will check if there are any waiting events fire_event(task); } } }); + on, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); }); - on>>().then( + on>, Pool, Priority::HIGH>().then( "Unbind IO Reaction", [this](const dsl::operation::Unbind& unbind) { - // Lock our mutex to avoid concurrent modification const std::lock_guard lock(tasks_mutex); - // Find our reaction auto it = std::find_if(tasks.begin(), tasks.end(), [&unbind](const std::pair& t) { return t.second.reaction->id == unbind.id; }); @@ -223,43 +214,37 @@ namespace extension { remove_task(tasks, it); } - // Let the poll command know that stuff happened dirty.store(true, std::memory_order_release); - bump(); }); + on>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); }); - on().then("Shutdown IO Controller", [this] { + on, Priority::HIGH>().then("Shutdown IO Controller", [this] { running.store(false, std::memory_order_release); - bump(); }); + on().then("Shutdown IO bump", [this] { bump(); }); - on().then("IO Controller", [this] { - while (running.load(std::memory_order_acquire)) { - // Rebuild the list if something changed - if (dirty.load(std::memory_order_acquire)) { - rebuild_list(); - } + on, Priority::NORMAL>().then("IO Poll", [this] { + if (!running.load(std::memory_order_acquire)) { + return; + } - // Wait for events - DWORD event_index = 0; - /*mutex scope*/ { - const std::lock_guard lock(notifier.mutex); - event_index = WSAWaitForMultipleEvents(static_cast(watches.size()), - watches.data(), - false, - WSA_INFINITE, - false); - } + if (dirty.load(std::memory_order_acquire)) { + rebuild_list(); + } - // Check if the return value is an event in our list - if (event_index >= WSA_WAIT_EVENT_0 && event_index < WSA_WAIT_EVENT_0 + watches.size()) { - // Get the signalled event - auto& event = watches[event_index - WSA_WAIT_EVENT_0]; + const std::lock_guard lock(notifier.mutex); + const DWORD event_index = WSAWaitForMultipleEvents(static_cast(watches.size()), + watches.data(), + false, + WSA_INFINITE, + false); - // Collect the events that happened into the tasks list - process_event(event); - } + if (event_index >= WSA_WAIT_EVENT_0 && event_index < WSA_WAIT_EVENT_0 + watches.size()) { + auto& event = watches[event_index - WSA_WAIT_EVENT_0]; + process_event(event); } + + powerplant.submit(threading::ReactionTask::get_current_task()->parent->get_task()); }); } From b531c6e8a2fc0aadb8f40a48f2bd8c1f184d3638 Mon Sep 17 00:00:00 2001 From: Trent Houliston Date: Wed, 17 Jun 2026 13:27:02 +1000 Subject: [PATCH 3/9] Fix markdown formatting Co-authored-by: Cursor --- docs/reference/extensions/built-in-extensions.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/reference/extensions/built-in-extensions.md b/docs/reference/extensions/built-in-extensions.md index 82530dd1..d72a5b74 100644 --- a/docs/reference/extensions/built-in-extensions.md +++ b/docs/reference/extensions/built-in-extensions.md @@ -98,9 +98,9 @@ Monitors file descriptors for IO readiness events and triggers corresponding rea Uses platform-native polling mechanisms on a dedicated `IOController` pool (`concurrency = 1`, MPSC): - **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays. - A NORMAL-priority poll task blocks in `::poll()` and resubmits itself after each iteration. - Control handlers (`IOConfiguration`, `IOFinished`, `Unbind`, `Shutdown`) run at HIGH priority on the same pool. - Each control handler is followed by an `Inline::ALWAYS` bump reaction that writes the notify pipe from the emitting thread. + A NORMAL-priority poll task blocks in `::poll()` and resubmits itself after each iteration. + Control handlers (`IOConfiguration`, `IOFinished`, `Unbind`, `Shutdown`) run at HIGH priority on the same pool. + Each control handler is followed by an `Inline::ALWAYS` bump reaction that writes the notify pipe from the emitting thread. - **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles, using the same scheduler-driven pool and priority model. When events are detected on registered file descriptors, the controller creates tasks for the corresponding reactions, passing the event flags through `ThreadStore` so the `get` method can report which specific events occurred. From f62b7dc9d0ddabd54480e10a3c462d8904727db9 Mon Sep 17 00:00:00 2001 From: Trent Houliston Date: Wed, 17 Jun 2026 13:42:12 +1000 Subject: [PATCH 4/9] Fix IO pool single-thread model by forbidding inline control handlers. emit was running Pool HIGH handlers on the default pool while ::poll() held watches[], which required notifier mutex machinery. Inline::NEVER routes all control mutations through the IO pool thread; bump reactions only wake the notify pipe. Removes NotifierWakeGuard, wake_requested, tasks_mutex, and unnecessary DSL includes. Co-authored-by: Cursor --- src/extension/IOController.hpp | 10 +- src/extension/IOController_Posix.ipp | 129 ++++--------------------- src/extension/IOController_Windows.ipp | 30 +----- 3 files changed, 28 insertions(+), 141 deletions(-) diff --git a/src/extension/IOController.hpp b/src/extension/IOController.hpp index d140afc4..911517dc 100644 --- a/src/extension/IOController.hpp +++ b/src/extension/IOController.hpp @@ -43,18 +43,14 @@ namespace extension { using tasks_t = std::map; struct notifier_t { WSAEVENT notifier{WSA_INVALID_EVENT}; ///< This is the event that is waited on by WSAWaitForMultipleEvents - std::mutex mutex; ///< This mutex is used to ensure that wait has woken up }; #else using event_t = decltype(pollfd::events); using watcher_t = pollfd; using tasks_t = std::vector; struct notifier_t { - fd_t recv{-1}; ///< This is the file descriptor that is waited on by poll - fd_t send{-1}; ///< This is the file descriptor that is written to to wake up the poll command - std::mutex mutex; ///< This mutex is used to ensure that a write to poll has worked - /// Armed by NotifierWakeGuard during the wake-then-lock handoff; checked under mutex before ::poll(). - std::atomic wake_requested{false}; + fd_t recv{-1}; ///< This is the file descriptor that is waited on by poll + fd_t send{-1}; ///< This is the file descriptor that is written to to wake up the poll command }; #endif @@ -140,8 +136,6 @@ namespace extension { /// If the IOController should continue running std::atomic running{true}; - /// The mutex that protects the tasks list - std::mutex tasks_mutex; /// Whether or not the list of file descriptors is dirty compared to tasks std::atomic dirty{true}; /// The list of events that are being watched diff --git a/src/extension/IOController_Posix.ipp b/src/extension/IOController_Posix.ipp index 8c367829..87ac322f 100644 --- a/src/extension/IOController_Posix.ipp +++ b/src/extension/IOController_Posix.ipp @@ -22,84 +22,12 @@ #include "IOController.hpp" -#include "../dsl/word/Inline.hpp" -#include "../dsl/word/Pool.hpp" -#include "../dsl/word/Priority.hpp" -#include "../dsl/word/Startup.hpp" #include "../threading/ReactionTask.hpp" namespace NUClear { namespace extension { - namespace { - - /** - * RAII wake-then-lock handoff for the poll notifier pipe. - * - * Arms wake_requested, writes the notify pipe, then (via lock()) acquires notifier.mutex - * and clears wake_requested so the poll thread cannot re-enter ::poll() mid-handoff. - */ - class NotifierWakeGuard { - public: - explicit NotifierWakeGuard(IOController::notifier_t& notifier) : notifier_(notifier) { - notifier_.wake_requested.store(true, std::memory_order_release); - } - - NotifierWakeGuard(const NotifierWakeGuard&) = delete; - NotifierWakeGuard& operator=(const NotifierWakeGuard&) = delete; - NotifierWakeGuard(NotifierWakeGuard&&) = delete; - NotifierWakeGuard& operator=(NotifierWakeGuard&&) = delete; - - void signal() const { - uint8_t val = 1; - if (::write(notifier_.send, &val, sizeof(val)) < 0) { - throw std::system_error(network_errno, - std::system_category(), - "There was an error while writing to the notification pipe"); - } - } - - /// Acquire notifier mutex and clear wake_requested for the handoff to poll. - std::unique_lock lock() { - cleared_ = true; - std::unique_lock l(notifier_.mutex); - notifier_.wake_requested.store(false, std::memory_order_release); - return l; - } - - ~NotifierWakeGuard() { - if (!cleared_) { - notifier_.wake_requested.store(false, std::memory_order_release); - } - } - - private: - IOController::notifier_t& notifier_; - bool cleared_{false}; - }; - - /// Holds notifier.mutex while the poll thread decides whether to enter ::poll(). - class NotifierPollScope { - public: - explicit NotifierPollScope(IOController::notifier_t& notifier) - : lock_(notifier.mutex) - , notifier_(notifier) {} - - bool wake_pending() const { - return notifier_.wake_requested.load(std::memory_order_acquire); - } - - private: - std::lock_guard lock_; - IOController::notifier_t& notifier_; - }; - - } // namespace - void IOController::rebuild_list() { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - // Clear our fds to be rebuilt watches.resize(0); @@ -218,10 +146,12 @@ namespace extension { } void IOController::bump() { - NotifierWakeGuard wake(notifier); - wake.signal(); - // Locking here will ensure we won't return until poll is not running - const auto lock = wake.lock(); + uint8_t val = 1; + if (::write(notifier.send, &val, sizeof(val)) < 0) { + throw std::system_error(network_errno, + std::system_category(), + "There was an error while writing to the notification pipe"); + } } IOController::IOController(std::unique_ptr environment) : Reactor(std::move(environment)) { @@ -239,11 +169,12 @@ namespace extension { // Start by rebuilding the list rebuild_list(); - on, Pool, Priority::HIGH>().then( + // Control handlers run on the IO pool at HIGH priority with Inline::NEVER so emit + // cannot execute them on the default pool while ::poll() holds watches[]. Inline wake + // reactions registered after them only write to the notify pipe so ::poll() unblocks. + on, Pool, Priority::HIGH, Inline::NEVER>().then( "Configure IO Reaction", [this](const dsl::word::IOConfiguration& config) { - const std::lock_guard lock(tasks_mutex); - // NOLINTNEXTLINE(google-runtime-int) tasks.emplace_back(config.fd, event_t(config.events), config.reaction); @@ -252,9 +183,7 @@ namespace extension { }); on, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); }); - on, Pool, Priority::HIGH>().then("IO Finished", [this](const dsl::word::IOFinished& event) { - const std::lock_guard lock(tasks_mutex); - + on, Pool, Priority::HIGH, Inline::NEVER>().then("IO Finished", [this](const dsl::word::IOFinished& event) { auto task = std::find_if(tasks.begin(), tasks.end(), [&event](const Task& t) { return t.reaction->id == event.id; }); @@ -265,10 +194,6 @@ namespace extension { tasks.erase(task); } else { - NotifierWakeGuard wake(notifier); - wake.signal(); - const auto notifier_lock = wake.lock(); - auto it = std::lower_bound(watches.begin(), watches.end(), task->fd, @@ -284,11 +209,9 @@ namespace extension { }); on, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); }); - on>, Pool, Priority::HIGH>().then( + on>, Pool, Priority::HIGH, Inline::NEVER>().then( "Unbind IO Reaction", [this](const dsl::operation::Unbind& unbind) { - const std::lock_guard lock(tasks_mutex); - auto reaction = std::find_if(tasks.begin(), tasks.end(), [&unbind](const Task& t) { return t.reaction->id == unbind.id; }); @@ -301,12 +224,12 @@ namespace extension { }); on>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); }); - on, Priority::HIGH>().then("Shutdown IO Controller", [this] { + on, Priority::HIGH, Inline::NEVER>().then("Shutdown IO Controller", [this] { running.store(false, std::memory_order_release); }); on().then("Shutdown IO bump", [this] { bump(); }); - on, Priority::NORMAL>().then("IO Poll", [this] { + on, Priority::NORMAL, Inline::NEVER>().then("IO Poll", [this] { if (!running.load(std::memory_order_acquire)) { return; } @@ -315,25 +238,15 @@ namespace extension { rebuild_list(); } - bool polled = false; - { - const NotifierPollScope poll(notifier); - if (!poll.wake_pending()) { - if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) { - throw std::system_error(network_errno, - std::system_category(), - "There was an IO error while attempting to poll the file descriptors"); - } - polled = true; - } + if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) { + throw std::system_error(network_errno, + std::system_category(), + "There was an IO error while attempting to poll the file descriptors"); } - if (polled) { - const std::lock_guard lock(tasks_mutex); - for (auto& fd : watches) { - if (fd.revents != 0) { - process_event(fd); - } + for (auto& fd : watches) { + if (fd.revents != 0) { + process_event(fd); } } diff --git a/src/extension/IOController_Windows.ipp b/src/extension/IOController_Windows.ipp index 34689ffc..34f1d995 100644 --- a/src/extension/IOController_Windows.ipp +++ b/src/extension/IOController_Windows.ipp @@ -22,10 +22,6 @@ #include "IOController.hpp" -#include "../dsl/word/Inline.hpp" -#include "../dsl/word/Pool.hpp" -#include "../dsl/word/Priority.hpp" -#include "../dsl/word/Startup.hpp" #include "../threading/ReactionTask.hpp" namespace NUClear { @@ -55,9 +51,6 @@ namespace extension { } void IOController::rebuild_list() { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - // Clear our fds to be rebuilt watches.resize(0); @@ -102,9 +95,6 @@ namespace extension { void IOController::process_event(WSAEVENT& event) { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - if (event == notifier.notifier) { // Reset the notifier signal if (!WSAResetEvent(event)) { @@ -141,9 +131,6 @@ namespace extension { std::system_category(), "WSASetEvent() for configure io reaction failed"); } - - // Locking here will ensure we won't return until poll is not running - const std::lock_guard lock(notifier.mutex); } IOController::IOController(std::unique_ptr environment) : Reactor(std::move(environment)) { @@ -157,11 +144,9 @@ namespace extension { // Start by rebuilding the list rebuild_list(); - on, Pool, Priority::HIGH>().then( + on, Pool, Priority::HIGH, Inline::NEVER>().then( "Configure IO Reaction", [this](const dsl::word::IOConfiguration& config) { - const std::lock_guard lock(tasks_mutex); - // Make an event for this SOCKET auto event = WSACreateEvent(); if (event == WSA_INVALID_EVENT) { @@ -180,9 +165,7 @@ namespace extension { }); on, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); }); - on, Pool, Priority::HIGH>().then("IO Finished", [this](const dsl::word::IOFinished& event) { - const std::lock_guard lock(tasks_mutex); - + on, Pool, Priority::HIGH, Inline::NEVER>().then("IO Finished", [this](const dsl::word::IOFinished& event) { auto it = std::find_if(tasks.begin(), tasks.end(), [&event](const std::pair& t) { return t.second.reaction->id == event.id; }); @@ -201,11 +184,9 @@ namespace extension { }); on, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); }); - on>, Pool, Priority::HIGH>().then( + on>, Pool, Priority::HIGH, Inline::NEVER>().then( "Unbind IO Reaction", [this](const dsl::operation::Unbind& unbind) { - const std::lock_guard lock(tasks_mutex); - auto it = std::find_if(tasks.begin(), tasks.end(), [&unbind](const std::pair& t) { return t.second.reaction->id == unbind.id; }); @@ -218,12 +199,12 @@ namespace extension { }); on>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); }); - on, Priority::HIGH>().then("Shutdown IO Controller", [this] { + on, Priority::HIGH, Inline::NEVER>().then("Shutdown IO Controller", [this] { running.store(false, std::memory_order_release); }); on().then("Shutdown IO bump", [this] { bump(); }); - on, Priority::NORMAL>().then("IO Poll", [this] { + on, Priority::NORMAL, Inline::NEVER>().then("IO Poll", [this] { if (!running.load(std::memory_order_acquire)) { return; } @@ -232,7 +213,6 @@ namespace extension { rebuild_list(); } - const std::lock_guard lock(notifier.mutex); const DWORD event_index = WSAWaitForMultipleEvents(static_cast(watches.size()), watches.data(), false, From 1a25c3806c8db7b6c220bc154b0d63f367e359c0 Mon Sep 17 00:00:00 2001 From: Trent Houliston Date: Wed, 17 Jun 2026 14:23:38 +1000 Subject: [PATCH 5/9] Stop inline emits for IO control messages. IO bind/unbind/finished events now use default Local emit so Pool handlers are queued to the IO pool instead of running on the emitter thread. Inline::NEVER on control handlers and Inline::ALWAYS bump reactions are kept. Co-authored-by: Cursor --- src/dsl/word/IO.hpp | 7 +++---- src/extension/IOController_Posix.ipp | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/dsl/word/IO.hpp b/src/dsl/word/IO.hpp index a3644988..174f434e 100644 --- a/src/dsl/word/IO.hpp +++ b/src/dsl/word/IO.hpp @@ -30,7 +30,6 @@ #include "../store/ThreadStore.hpp" #include "../trait/is_transient.hpp" #include "Single.hpp" -#include "emit/Inline.hpp" namespace NUClear { namespace dsl { @@ -132,13 +131,13 @@ namespace dsl { static void bind(const std::shared_ptr& reaction, fd_t fd, event_t watch_set) { reaction->unbinders.emplace_back([](const threading::Reaction& r) { - r.reactor.emit(std::make_unique>(r.id)); + r.reactor.emit(std::make_unique>(r.id)); }); auto io_config = std::make_unique(fd, watch_set, reaction); // Send our configuration out - reaction->reactor.emit(io_config); + reaction->reactor.emit(io_config); } template @@ -155,7 +154,7 @@ namespace dsl { template static void post_run(threading::ReactionTask& task) { - task.parent->reactor.emit(std::make_unique(task.parent->id)); + task.parent->reactor.emit(std::make_unique(task.parent->id)); } }; diff --git a/src/extension/IOController_Posix.ipp b/src/extension/IOController_Posix.ipp index 87ac322f..6e9b068a 100644 --- a/src/extension/IOController_Posix.ipp +++ b/src/extension/IOController_Posix.ipp @@ -169,8 +169,8 @@ namespace extension { // Start by rebuilding the list rebuild_list(); - // Control handlers run on the IO pool at HIGH priority with Inline::NEVER so emit - // cannot execute them on the default pool while ::poll() holds watches[]. Inline wake + // Control handlers run on the IO pool at HIGH priority with Inline::NEVER so they are + // always queued to the IO pool rather than running on the emitting thread. Inline wake // reactions registered after them only write to the notify pipe so ::poll() unblocks. on, Pool, Priority::HIGH, Inline::NEVER>().then( "Configure IO Reaction", From c7624477dce3ccee831e6d7c5cf43262ccfac032 Mon Sep 17 00:00:00 2001 From: Trent Houliston Date: Wed, 17 Jun 2026 15:42:49 +1000 Subject: [PATCH 6/9] Mark IOController::bump as const for clang-tidy. Only wakes the poll thread via the notifier; it does not mutate controller state. --- src/extension/IOController.hpp | 3 +-- src/extension/IOController_Posix.ipp | 2 +- src/extension/IOController_Windows.ipp | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/extension/IOController.hpp b/src/extension/IOController.hpp index 911517dc..2aed5953 100644 --- a/src/extension/IOController.hpp +++ b/src/extension/IOController.hpp @@ -124,8 +124,7 @@ namespace extension { * If the poll command is waiting it will wait forever if something doesn't happen. * When trying to update what to poll or shut down we need to wake it up so it can. */ - // NOLINTNEXTLINE(readability-make-member-function-const) this changes states - void bump(); + void bump() const; public: explicit IOController(std::unique_ptr environment); diff --git a/src/extension/IOController_Posix.ipp b/src/extension/IOController_Posix.ipp index 6e9b068a..d29cb6bc 100644 --- a/src/extension/IOController_Posix.ipp +++ b/src/extension/IOController_Posix.ipp @@ -145,7 +145,7 @@ namespace extension { event.revents = 0; } - void IOController::bump() { + void IOController::bump() const { uint8_t val = 1; if (::write(notifier.send, &val, sizeof(val)) < 0) { throw std::system_error(network_errno, diff --git a/src/extension/IOController_Windows.ipp b/src/extension/IOController_Windows.ipp index 34f1d995..45aaf944 100644 --- a/src/extension/IOController_Windows.ipp +++ b/src/extension/IOController_Windows.ipp @@ -125,7 +125,7 @@ namespace extension { } } - void IOController::bump() { + void IOController::bump() const { if (!WSASetEvent(notifier.notifier)) { throw std::system_error(WSAGetLastError(), std::system_category(), From 62617d2979cae71aaf76cd2144d5c016999ff559 Mon Sep 17 00:00:00 2001 From: Trent Houliston Date: Thu, 18 Jun 2026 17:00:32 +1000 Subject: [PATCH 7/9] Extract shared IOController scheduler wiring to reduce duplication. Move bump reactions, shutdown control, and poll resubmit logic into IOController_Common.ipp so POSIX and Windows paths share one implementation and satisfy SonarCloud duplicated-lines quality gate. Co-authored-by: Cursor --- src/extension/IOController.cpp | 2 + src/extension/IOController.hpp | 20 ++++++++ src/extension/IOController_Common.ipp | 71 ++++++++++++++++++++++++++ src/extension/IOController_Posix.ipp | 27 ++-------- src/extension/IOController_Windows.ipp | 24 ++------- 5 files changed, 99 insertions(+), 45 deletions(-) create mode 100644 src/extension/IOController_Common.ipp diff --git a/src/extension/IOController.cpp b/src/extension/IOController.cpp index 4d1542bc..3330f69a 100644 --- a/src/extension/IOController.cpp +++ b/src/extension/IOController.cpp @@ -25,3 +25,5 @@ #else #include "IOController_Posix.ipp" #endif // _WIN32 + +#include "IOController_Common.ipp" diff --git a/src/extension/IOController.hpp b/src/extension/IOController.hpp index 2aed5953..ecdc1f67 100644 --- a/src/extension/IOController.hpp +++ b/src/extension/IOController.hpp @@ -28,6 +28,7 @@ #include "../util/platform.hpp" #include +#include namespace NUClear { namespace extension { @@ -126,6 +127,25 @@ namespace extension { */ void bump() const; + /// Inline bump reactions that wake the poll task from the emitting thread. + void register_inline_bump_reactions(); + + /// HIGH-priority shutdown handler on the IO pool. + void register_shutdown_control(); + + /** + * Registers the self-resubmitting poll task. + * + * @param wait_and_process platform-specific blocking wait and event dispatch + */ + void register_poll_loop(std::function wait_and_process); + + /// Returns false when the poll loop should exit without blocking. + bool prepare_poll_iteration(); + + /// Resubmits the poll reaction after one blocking iteration. + void resubmit_poll_task(); + public: explicit IOController(std::unique_ptr environment); diff --git a/src/extension/IOController_Common.ipp b/src/extension/IOController_Common.ipp new file mode 100644 index 00000000..cf37f44b --- /dev/null +++ b/src/extension/IOController_Common.ipp @@ -0,0 +1,71 @@ +/* + * MIT License + * + * Copyright (c) 2026 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include "IOController.hpp" + +#include "../threading/ReactionTask.hpp" + +namespace NUClear { +namespace extension { + + void IOController::register_inline_bump_reactions() { + on, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); }); + on, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); }); + on>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); }); + on().then("Shutdown IO bump", [this] { bump(); }); + } + + void IOController::register_shutdown_control() { + on, Priority::HIGH, Inline::NEVER>().then("Shutdown IO Controller", [this] { + running.store(false, std::memory_order_release); + }); + } + + bool IOController::prepare_poll_iteration() { + if (!running.load(std::memory_order_acquire)) { + return false; + } + + if (dirty.load(std::memory_order_acquire)) { + rebuild_list(); + } + + return true; + } + + void IOController::resubmit_poll_task() { + powerplant.submit(threading::ReactionTask::get_current_task()->parent->get_task()); + } + + void IOController::register_poll_loop(std::function wait_and_process) { + on, Priority::NORMAL, Inline::NEVER>().then("IO Poll", [this, wait = std::move(wait_and_process)] { + if (!prepare_poll_iteration()) { + return; + } + + wait(); + resubmit_poll_task(); + }); + } + +} // namespace extension +} // namespace NUClear diff --git a/src/extension/IOController_Posix.ipp b/src/extension/IOController_Posix.ipp index d29cb6bc..7ccbc269 100644 --- a/src/extension/IOController_Posix.ipp +++ b/src/extension/IOController_Posix.ipp @@ -22,8 +22,6 @@ #include "IOController.hpp" -#include "../threading/ReactionTask.hpp" - namespace NUClear { namespace extension { @@ -169,9 +167,6 @@ namespace extension { // Start by rebuilding the list rebuild_list(); - // Control handlers run on the IO pool at HIGH priority with Inline::NEVER so they are - // always queued to the IO pool rather than running on the emitting thread. Inline wake - // reactions registered after them only write to the notify pipe so ::poll() unblocks. on, Pool, Priority::HIGH, Inline::NEVER>().then( "Configure IO Reaction", [this](const dsl::word::IOConfiguration& config) { @@ -181,7 +176,6 @@ namespace extension { std::sort(tasks.begin(), tasks.end()); dirty.store(true, std::memory_order_release); }); - on, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); }); on, Pool, Priority::HIGH, Inline::NEVER>().then("IO Finished", [this](const dsl::word::IOFinished& event) { auto task = std::find_if(tasks.begin(), tasks.end(), [&event](const Task& t) { @@ -207,7 +201,6 @@ namespace extension { } } }); - on, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); }); on>, Pool, Priority::HIGH, Inline::NEVER>().then( "Unbind IO Reaction", @@ -222,22 +215,10 @@ namespace extension { dirty.store(true, std::memory_order_release); }); - on>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); }); - - on, Priority::HIGH, Inline::NEVER>().then("Shutdown IO Controller", [this] { - running.store(false, std::memory_order_release); - }); - on().then("Shutdown IO bump", [this] { bump(); }); - - on, Priority::NORMAL, Inline::NEVER>().then("IO Poll", [this] { - if (!running.load(std::memory_order_acquire)) { - return; - } - - if (dirty.load(std::memory_order_acquire)) { - rebuild_list(); - } + register_shutdown_control(); + register_inline_bump_reactions(); + register_poll_loop([this] { if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) { throw std::system_error(network_errno, std::system_category(), @@ -249,8 +230,6 @@ namespace extension { process_event(fd); } } - - powerplant.submit(threading::ReactionTask::get_current_task()->parent->get_task()); }); } diff --git a/src/extension/IOController_Windows.ipp b/src/extension/IOController_Windows.ipp index 45aaf944..db7426e9 100644 --- a/src/extension/IOController_Windows.ipp +++ b/src/extension/IOController_Windows.ipp @@ -22,8 +22,6 @@ #include "IOController.hpp" -#include "../threading/ReactionTask.hpp" - namespace NUClear { namespace extension { @@ -163,7 +161,6 @@ namespace extension { tasks.insert(std::make_pair(event, Task{config.fd, config.events, config.reaction})); dirty.store(true, std::memory_order_release); }); - on, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); }); on, Pool, Priority::HIGH, Inline::NEVER>().then("IO Finished", [this](const dsl::word::IOFinished& event) { auto it = std::find_if(tasks.begin(), tasks.end(), [&event](const std::pair& t) { @@ -182,7 +179,6 @@ namespace extension { } } }); - on, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); }); on>, Pool, Priority::HIGH, Inline::NEVER>().then( "Unbind IO Reaction", @@ -197,22 +193,10 @@ namespace extension { dirty.store(true, std::memory_order_release); }); - on>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); }); - - on, Priority::HIGH, Inline::NEVER>().then("Shutdown IO Controller", [this] { - running.store(false, std::memory_order_release); - }); - on().then("Shutdown IO bump", [this] { bump(); }); - - on, Priority::NORMAL, Inline::NEVER>().then("IO Poll", [this] { - if (!running.load(std::memory_order_acquire)) { - return; - } - - if (dirty.load(std::memory_order_acquire)) { - rebuild_list(); - } + register_shutdown_control(); + register_inline_bump_reactions(); + register_poll_loop([this] { const DWORD event_index = WSAWaitForMultipleEvents(static_cast(watches.size()), watches.data(), false, @@ -223,8 +207,6 @@ namespace extension { auto& event = watches[event_index - WSA_WAIT_EVENT_0]; process_event(event); } - - powerplant.submit(threading::ReactionTask::get_current_task()->parent->get_task()); }); } From b8df3da4078b820e958e022ca902ea8dacb7e977 Mon Sep 17 00:00:00 2001 From: Trent Houliston Date: Thu, 18 Jun 2026 19:13:53 +1000 Subject: [PATCH 8/9] Re-inline IOController scheduler wiring. Undo SonarCloud duplication refactor: delete IOController_Common.ipp and restore inline bump reactions, shutdown control, and poll loop registration in the platform .ipp files. Co-authored-by: Cursor --- src/extension/IOController.cpp | 2 - src/extension/IOController.hpp | 20 -------- src/extension/IOController_Common.ipp | 71 -------------------------- src/extension/IOController_Posix.ipp | 24 +++++++-- src/extension/IOController_Windows.ipp | 24 +++++++-- 5 files changed, 42 insertions(+), 99 deletions(-) delete mode 100644 src/extension/IOController_Common.ipp diff --git a/src/extension/IOController.cpp b/src/extension/IOController.cpp index 3330f69a..4d1542bc 100644 --- a/src/extension/IOController.cpp +++ b/src/extension/IOController.cpp @@ -25,5 +25,3 @@ #else #include "IOController_Posix.ipp" #endif // _WIN32 - -#include "IOController_Common.ipp" diff --git a/src/extension/IOController.hpp b/src/extension/IOController.hpp index ecdc1f67..2aed5953 100644 --- a/src/extension/IOController.hpp +++ b/src/extension/IOController.hpp @@ -28,7 +28,6 @@ #include "../util/platform.hpp" #include -#include namespace NUClear { namespace extension { @@ -127,25 +126,6 @@ namespace extension { */ void bump() const; - /// Inline bump reactions that wake the poll task from the emitting thread. - void register_inline_bump_reactions(); - - /// HIGH-priority shutdown handler on the IO pool. - void register_shutdown_control(); - - /** - * Registers the self-resubmitting poll task. - * - * @param wait_and_process platform-specific blocking wait and event dispatch - */ - void register_poll_loop(std::function wait_and_process); - - /// Returns false when the poll loop should exit without blocking. - bool prepare_poll_iteration(); - - /// Resubmits the poll reaction after one blocking iteration. - void resubmit_poll_task(); - public: explicit IOController(std::unique_ptr environment); diff --git a/src/extension/IOController_Common.ipp b/src/extension/IOController_Common.ipp deleted file mode 100644 index cf37f44b..00000000 --- a/src/extension/IOController_Common.ipp +++ /dev/null @@ -1,71 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2026 NUClear Contributors - * - * This file is part of the NUClear codebase. - * See https://github.com/Fastcode/NUClear for further info. - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated - * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the - * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to - * permit persons to whom the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the - * Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE - * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR - * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -#include "IOController.hpp" - -#include "../threading/ReactionTask.hpp" - -namespace NUClear { -namespace extension { - - void IOController::register_inline_bump_reactions() { - on, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); }); - on, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); }); - on>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); }); - on().then("Shutdown IO bump", [this] { bump(); }); - } - - void IOController::register_shutdown_control() { - on, Priority::HIGH, Inline::NEVER>().then("Shutdown IO Controller", [this] { - running.store(false, std::memory_order_release); - }); - } - - bool IOController::prepare_poll_iteration() { - if (!running.load(std::memory_order_acquire)) { - return false; - } - - if (dirty.load(std::memory_order_acquire)) { - rebuild_list(); - } - - return true; - } - - void IOController::resubmit_poll_task() { - powerplant.submit(threading::ReactionTask::get_current_task()->parent->get_task()); - } - - void IOController::register_poll_loop(std::function wait_and_process) { - on, Priority::NORMAL, Inline::NEVER>().then("IO Poll", [this, wait = std::move(wait_and_process)] { - if (!prepare_poll_iteration()) { - return; - } - - wait(); - resubmit_poll_task(); - }); - } - -} // namespace extension -} // namespace NUClear diff --git a/src/extension/IOController_Posix.ipp b/src/extension/IOController_Posix.ipp index 7ccbc269..2f296f16 100644 --- a/src/extension/IOController_Posix.ipp +++ b/src/extension/IOController_Posix.ipp @@ -22,6 +22,8 @@ #include "IOController.hpp" +#include "../threading/ReactionTask.hpp" + namespace NUClear { namespace extension { @@ -176,6 +178,7 @@ namespace extension { std::sort(tasks.begin(), tasks.end()); dirty.store(true, std::memory_order_release); }); + on, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); }); on, Pool, Priority::HIGH, Inline::NEVER>().then("IO Finished", [this](const dsl::word::IOFinished& event) { auto task = std::find_if(tasks.begin(), tasks.end(), [&event](const Task& t) { @@ -201,6 +204,7 @@ namespace extension { } } }); + on, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); }); on>, Pool, Priority::HIGH, Inline::NEVER>().then( "Unbind IO Reaction", @@ -215,10 +219,22 @@ namespace extension { dirty.store(true, std::memory_order_release); }); + on>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); }); + + on, Priority::HIGH, Inline::NEVER>().then("Shutdown IO Controller", [this] { + running.store(false, std::memory_order_release); + }); + on().then("Shutdown IO bump", [this] { bump(); }); + + on, Priority::NORMAL, Inline::NEVER>().then("IO Poll", [this] { + if (!running.load(std::memory_order_acquire)) { + return; + } + + if (dirty.load(std::memory_order_acquire)) { + rebuild_list(); + } - register_shutdown_control(); - register_inline_bump_reactions(); - register_poll_loop([this] { if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) { throw std::system_error(network_errno, std::system_category(), @@ -230,6 +246,8 @@ namespace extension { process_event(fd); } } + + powerplant.submit(threading::ReactionTask::get_current_task()->parent->get_task()); }); } diff --git a/src/extension/IOController_Windows.ipp b/src/extension/IOController_Windows.ipp index db7426e9..45aaf944 100644 --- a/src/extension/IOController_Windows.ipp +++ b/src/extension/IOController_Windows.ipp @@ -22,6 +22,8 @@ #include "IOController.hpp" +#include "../threading/ReactionTask.hpp" + namespace NUClear { namespace extension { @@ -161,6 +163,7 @@ namespace extension { tasks.insert(std::make_pair(event, Task{config.fd, config.events, config.reaction})); dirty.store(true, std::memory_order_release); }); + on, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); }); on, Pool, Priority::HIGH, Inline::NEVER>().then("IO Finished", [this](const dsl::word::IOFinished& event) { auto it = std::find_if(tasks.begin(), tasks.end(), [&event](const std::pair& t) { @@ -179,6 +182,7 @@ namespace extension { } } }); + on, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); }); on>, Pool, Priority::HIGH, Inline::NEVER>().then( "Unbind IO Reaction", @@ -193,10 +197,22 @@ namespace extension { dirty.store(true, std::memory_order_release); }); + on>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); }); + + on, Priority::HIGH, Inline::NEVER>().then("Shutdown IO Controller", [this] { + running.store(false, std::memory_order_release); + }); + on().then("Shutdown IO bump", [this] { bump(); }); + + on, Priority::NORMAL, Inline::NEVER>().then("IO Poll", [this] { + if (!running.load(std::memory_order_acquire)) { + return; + } + + if (dirty.load(std::memory_order_acquire)) { + rebuild_list(); + } - register_shutdown_control(); - register_inline_bump_reactions(); - register_poll_loop([this] { const DWORD event_index = WSAWaitForMultipleEvents(static_cast(watches.size()), watches.data(), false, @@ -207,6 +223,8 @@ namespace extension { auto& event = watches[event_index - WSA_WAIT_EVENT_0]; process_event(event); } + + powerplant.submit(threading::ReactionTask::get_current_task()->parent->get_task()); }); } From 941f09328af5952d4cbbff46f7fc9c588584de61 Mon Sep 17 00:00:00 2001 From: Trent Houliston Date: Fri, 19 Jun 2026 15:27:30 +1000 Subject: [PATCH 9/9] Avoid blocking on redundant POSIX bump writes. Skip the notify-pipe write when a wake byte is already pending, retry on EINTR, and generalise the Windows WSASetEvent error message. Co-authored-by: Cursor --- src/extension/IOController_Posix.ipp | 12 +++++++++++- src/extension/IOController_Windows.ipp | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/extension/IOController_Posix.ipp b/src/extension/IOController_Posix.ipp index 2f296f16..4b3ae2f7 100644 --- a/src/extension/IOController_Posix.ipp +++ b/src/extension/IOController_Posix.ipp @@ -146,8 +146,18 @@ namespace extension { } void IOController::bump() const { + pollfd pfd{notifier.recv, POLLIN, 0}; + if (::poll(&pfd, 1, 0) > 0 && (pfd.revents & POLLIN) != 0) { + return; + } + uint8_t val = 1; - if (::write(notifier.send, &val, sizeof(val)) < 0) { + ssize_t written = 0; + do { + written = ::write(notifier.send, &val, sizeof(val)); + } while (written < 0 && network_errno == EINTR); + + if (written < 0) { throw std::system_error(network_errno, std::system_category(), "There was an error while writing to the notification pipe"); diff --git a/src/extension/IOController_Windows.ipp b/src/extension/IOController_Windows.ipp index 45aaf944..4270b80e 100644 --- a/src/extension/IOController_Windows.ipp +++ b/src/extension/IOController_Windows.ipp @@ -129,7 +129,7 @@ namespace extension { if (!WSASetEvent(notifier.notifier)) { throw std::system_error(WSAGetLastError(), std::system_category(), - "WSASetEvent() for configure io reaction failed"); + "WSASetEvent() for IOController notifier failed"); } }