diff --git a/docs/explanation/scheduler.md b/docs/explanation/scheduler.md index 3d14a8e8..36b44a88 100644 --- a/docs/explanation/scheduler.md +++ b/docs/explanation/scheduler.md @@ -275,6 +275,17 @@ When the main thread pool exits (after shutdown), pools are stopped in order — Persistent pools (`ThreadPoolDescriptor::persistent`) continue accepting tasks during a normal shutdown so networking or logging reactors can finish in-flight work. +### Blocking pools and priority ordering + +Some extensions use a single-consumer pool with a blocking wait in the worker thread. +For example, IOController blocks in `::poll()` or `WSAWaitForMultipleEvents()`. + +Priority does not preempt inside the blocking call. +Instead, cross-thread control work is enqueued at HIGH priority on the same pool. +An inline bump reaction writes the notify pipe or event from the emitting thread. +The poll task finishes its iteration and resubmits at NORMAL priority. +The worker then dequeues the already-queued HIGH task before the resubmitted poll. + ## Design tradeoffs | Choice | Rationale | diff --git a/docs/reference/extensions/built-in-extensions.md b/docs/reference/extensions/built-in-extensions.md index 9961b26e..d72a5b74 100644 --- a/docs/reference/extensions/built-in-extensions.md +++ b/docs/reference/extensions/built-in-extensions.md @@ -95,12 +95,14 @@ Monitors file descriptors for IO readiness events and triggers corresponding rea **Implementation:** -Uses platform-native polling mechanisms: +Uses platform-native polling mechanisms on a dedicated `IOController` pool (`concurrency = 1`, MPSC): -- **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays -- **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles +- **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays. + A NORMAL-priority poll task blocks in `::poll()` and resubmits itself after each iteration. + Control handlers (`IOConfiguration`, `IOFinished`, `Unbind`, `Shutdown`) run at HIGH priority on the same pool. + Each control handler is followed by an `Inline::ALWAYS` bump reaction that writes the notify pipe from the emitting thread. +- **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles, using the same scheduler-driven pool and priority model. -A dedicated thread blocks on the polling call. When events are detected on registered file descriptors, the controller creates tasks for the corresponding reactions, passing the event flags through `ThreadStore` so the `get` method can report which specific events occurred. **Key internals:** diff --git a/src/dsl/word/IO.hpp b/src/dsl/word/IO.hpp index a3644988..174f434e 100644 --- a/src/dsl/word/IO.hpp +++ b/src/dsl/word/IO.hpp @@ -30,7 +30,6 @@ #include "../store/ThreadStore.hpp" #include "../trait/is_transient.hpp" #include "Single.hpp" -#include "emit/Inline.hpp" namespace NUClear { namespace dsl { @@ -132,13 +131,13 @@ namespace dsl { static void bind(const std::shared_ptr& reaction, fd_t fd, event_t watch_set) { reaction->unbinders.emplace_back([](const threading::Reaction& r) { - r.reactor.emit(std::make_unique>(r.id)); + r.reactor.emit(std::make_unique>(r.id)); }); auto io_config = std::make_unique(fd, watch_set, reaction); // Send our configuration out - reaction->reactor.emit(io_config); + reaction->reactor.emit(io_config); } template @@ -155,7 +154,7 @@ namespace dsl { template static void post_run(threading::ReactionTask& task) { - task.parent->reactor.emit(std::make_unique(task.parent->id)); + task.parent->reactor.emit(std::make_unique(task.parent->id)); } }; diff --git a/src/extension/IOController.hpp b/src/extension/IOController.hpp index 69c0d685..2aed5953 100644 --- a/src/extension/IOController.hpp +++ b/src/extension/IOController.hpp @@ -43,18 +43,14 @@ namespace extension { using tasks_t = std::map; struct notifier_t { WSAEVENT notifier{WSA_INVALID_EVENT}; ///< This is the event that is waited on by WSAWaitForMultipleEvents - std::mutex mutex; ///< This mutex is used to ensure that wait has woken up }; #else using event_t = decltype(pollfd::events); using watcher_t = pollfd; using tasks_t = std::vector; struct notifier_t { - fd_t recv{-1}; ///< This is the file descriptor that is waited on by poll - fd_t send{-1}; ///< This is the file descriptor that is written to to wake up the poll command - std::mutex mutex; ///< This mutex is used to ensure that a write to poll has worked - /// Armed by NotifierWakeGuard during the wake-then-lock handoff; checked under mutex before ::poll(). - std::atomic wake_requested{false}; + fd_t recv{-1}; ///< This is the file descriptor that is waited on by poll + fd_t send{-1}; ///< This is the file descriptor that is written to to wake up the poll command }; #endif @@ -95,6 +91,13 @@ namespace extension { }; private: + /// Dedicated single-consumer pool for scheduler-driven IO polling. + struct IOPool { + static constexpr const char* name = "IOController"; + static constexpr int concurrency = 1; + static constexpr bool counts_for_idle = false; + }; + /** * Rebuilds the list of file descriptors to poll. * @@ -121,8 +124,7 @@ namespace extension { * If the poll command is waiting it will wait forever if something doesn't happen. * When trying to update what to poll or shut down we need to wake it up so it can. */ - // NOLINTNEXTLINE(readability-make-member-function-const) this changes states - void bump(); + void bump() const; public: explicit IOController(std::unique_ptr environment); @@ -133,8 +135,6 @@ namespace extension { /// If the IOController should continue running std::atomic running{true}; - /// The mutex that protects the tasks list - std::mutex tasks_mutex; /// Whether or not the list of file descriptors is dirty compared to tasks std::atomic dirty{true}; /// The list of events that are being watched diff --git a/src/extension/IOController_Posix.ipp b/src/extension/IOController_Posix.ipp index b53910d0..4b3ae2f7 100644 --- a/src/extension/IOController_Posix.ipp +++ b/src/extension/IOController_Posix.ipp @@ -22,78 +22,12 @@ #include "IOController.hpp" +#include "../threading/ReactionTask.hpp" + namespace NUClear { namespace extension { - namespace { - - /** - * RAII wake-then-lock handoff for the poll notifier pipe. - * - * Arms wake_requested, writes the notify pipe, then (via lock()) acquires notifier.mutex - * and clears wake_requested so the poll thread cannot re-enter ::poll() mid-handoff. - */ - class NotifierWakeGuard { - public: - explicit NotifierWakeGuard(IOController::notifier_t& notifier) : notifier_(notifier) { - notifier_.wake_requested.store(true, std::memory_order_release); - } - - NotifierWakeGuard(const NotifierWakeGuard&) = delete; - NotifierWakeGuard& operator=(const NotifierWakeGuard&) = delete; - NotifierWakeGuard(NotifierWakeGuard&&) = delete; - NotifierWakeGuard& operator=(NotifierWakeGuard&&) = delete; - - void signal() const { - uint8_t val = 1; - if (::write(notifier_.send, &val, sizeof(val)) < 0) { - throw std::system_error(network_errno, - std::system_category(), - "There was an error while writing to the notification pipe"); - } - } - - /// Acquire notifier mutex and clear wake_requested for the handoff to poll. - std::unique_lock lock() { - cleared_ = true; - std::unique_lock l(notifier_.mutex); - notifier_.wake_requested.store(false, std::memory_order_release); - return l; - } - - ~NotifierWakeGuard() { - if (!cleared_) { - notifier_.wake_requested.store(false, std::memory_order_release); - } - } - - private: - IOController::notifier_t& notifier_; - bool cleared_{false}; - }; - - /// Holds notifier.mutex while the poll thread decides whether to enter ::poll(). - class NotifierPollScope { - public: - explicit NotifierPollScope(IOController::notifier_t& notifier) - : lock_(notifier.mutex) - , notifier_(notifier) {} - - bool wake_pending() const { - return notifier_.wake_requested.load(std::memory_order_acquire); - } - - private: - std::lock_guard lock_; - IOController::notifier_t& notifier_; - }; - - } // namespace - void IOController::rebuild_list() { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - // Clear our fds to be rebuilt watches.resize(0); @@ -211,11 +145,23 @@ namespace extension { event.revents = 0; } - void IOController::bump() { - NotifierWakeGuard wake(notifier); - wake.signal(); - // Locking here will ensure we won't return until poll is not running - const auto lock = wake.lock(); + void IOController::bump() const { + pollfd pfd{notifier.recv, POLLIN, 0}; + if (::poll(&pfd, 1, 0) > 0 && (pfd.revents & POLLIN) != 0) { + return; + } + + uint8_t val = 1; + ssize_t written = 0; + do { + written = ::write(notifier.send, &val, sizeof(val)); + } while (written < 0 && network_errno == EINTR); + + if (written < 0) { + throw std::system_error(network_errno, + std::system_category(), + "There was an error while writing to the notification pipe"); + } } IOController::IOController(std::unique_ptr environment) : Reactor(std::move(environment)) { @@ -233,51 +179,28 @@ namespace extension { // Start by rebuilding the list rebuild_list(); - on>().then( + on, Pool, Priority::HIGH, Inline::NEVER>().then( "Configure IO Reaction", [this](const dsl::word::IOConfiguration& config) { - // Lock our mutex to avoid concurrent modification - const std::lock_guard lock(tasks_mutex); - // NOLINTNEXTLINE(google-runtime-int) tasks.emplace_back(config.fd, event_t(config.events), config.reaction); - // Resort our list std::sort(tasks.begin(), tasks.end()); - - // Let the poll command know that stuff happened dirty.store(true, std::memory_order_release); - bump(); }); + on, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); }); - on>().then("IO Finished", [this](const dsl::word::IOFinished& event) { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - - // Find the reaction that finished processing + on, Pool, Priority::HIGH, Inline::NEVER>().then("IO Finished", [this](const dsl::word::IOFinished& event) { auto task = std::find_if(tasks.begin(), tasks.end(), [&event](const Task& t) { return t.reaction->id == event.id; }); if (task != tasks.end()) { - // If the events we were processing included close remove it from the list if ((task->processing_events & IO::CLOSE) != 0) { dirty.store(true, std::memory_order_release); tasks.erase(task); } else { - // We are about to mutate `watches[].events`, which the poll thread reads - // from inside ::poll(). Write to the notify pipe to kick poll out, then - // hold notifier.mutex for the duration of the mutation so the poll thread - // cannot re-enter ::poll() against a half-updated entry. This is the same - // wake-then-lock pattern bump() uses, but we keep the lock held until the - // watches update (and the follow-up fire_event, which can also touch - // watches[].events) is finished. - NotifierWakeGuard wake(notifier); - wake.signal(); - const auto notifier_lock = wake.lock(); - - // Unmask the events that were just processed auto it = std::lower_bound(watches.begin(), watches.end(), task->fd, @@ -286,22 +209,16 @@ namespace extension { it->events = event_t(it->events | task->processing_events); } - // No longer processing events task->processing_events = 0; - - // Try to fire again which will check if there are any waiting events fire_event(*task); } } }); + on, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); }); - on>>().then( + on>, Pool, Priority::HIGH, Inline::NEVER>().then( "Unbind IO Reaction", [this](const dsl::operation::Unbind& unbind) { - // Lock our mutex to avoid concurrent modification - const std::lock_guard lock(tasks_mutex); - - // Find our reaction auto reaction = std::find_if(tasks.begin(), tasks.end(), [&unbind](const Task& t) { return t.reaction->id == unbind.id; }); @@ -310,53 +227,37 @@ namespace extension { tasks.erase(reaction); } - // Let the poll command know that stuff happened dirty.store(true, std::memory_order_release); - bump(); }); + on>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); }); - on().then("Shutdown IO Controller", [this] { + on, Priority::HIGH, Inline::NEVER>().then("Shutdown IO Controller", [this] { running.store(false, std::memory_order_release); - bump(); }); + on().then("Shutdown IO bump", [this] { bump(); }); - on().then("IO Controller", [this] { - // Stay in this reaction to improve the performance without going back/fourth between reactions - if (running.load(std::memory_order_acquire)) { - // Rebuild the list if something changed - if (dirty.load(std::memory_order_acquire)) { - rebuild_list(); - } - - // Wait for an event to happen on one of our file descriptors - bool polled = false; - /* mutex scope */ { - const NotifierPollScope poll(notifier); - if (!poll.wake_pending()) { - if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) { - throw std::system_error(network_errno, - std::system_category(), - "There was an IO error while attempting to poll the file descriptors"); - } - polled = true; - } - } + on, Priority::NORMAL, Inline::NEVER>().then("IO Poll", [this] { + if (!running.load(std::memory_order_acquire)) { + return; + } - if (!polled) { - return; - } + if (dirty.load(std::memory_order_acquire)) { + rebuild_list(); + } - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - for (auto& fd : watches) { + if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) { + throw std::system_error(network_errno, + std::system_category(), + "There was an IO error while attempting to poll the file descriptors"); + } - // Collect the events that happened into the tasks list - // Something happened - if (fd.revents != 0) { - process_event(fd); - } + for (auto& fd : watches) { + if (fd.revents != 0) { + process_event(fd); } } + + powerplant.submit(threading::ReactionTask::get_current_task()->parent->get_task()); }); } diff --git a/src/extension/IOController_Windows.ipp b/src/extension/IOController_Windows.ipp index 5217891a..4270b80e 100644 --- a/src/extension/IOController_Windows.ipp +++ b/src/extension/IOController_Windows.ipp @@ -22,6 +22,8 @@ #include "IOController.hpp" +#include "../threading/ReactionTask.hpp" + namespace NUClear { namespace extension { @@ -49,9 +51,6 @@ namespace extension { } void IOController::rebuild_list() { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - // Clear our fds to be rebuilt watches.resize(0); @@ -96,9 +95,6 @@ namespace extension { void IOController::process_event(WSAEVENT& event) { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - if (event == notifier.notifier) { // Reset the notifier signal if (!WSAResetEvent(event)) { @@ -129,15 +125,12 @@ namespace extension { } } - void IOController::bump() { + void IOController::bump() const { if (!WSASetEvent(notifier.notifier)) { throw std::system_error(WSAGetLastError(), std::system_category(), - "WSASetEvent() for configure io reaction failed"); + "WSASetEvent() for IOController notifier failed"); } - - // Locking here will ensure we won't return until poll is not running - const std::lock_guard lock(notifier.mutex); } IOController::IOController(std::unique_ptr environment) : Reactor(std::move(environment)) { @@ -151,12 +144,9 @@ namespace extension { // Start by rebuilding the list rebuild_list(); - on>().then( + on, Pool, Priority::HIGH, Inline::NEVER>().then( "Configure IO Reaction", [this](const dsl::word::IOConfiguration& config) { - // Lock our mutex - std::lock_guard lock(tasks_mutex); - // Make an event for this SOCKET auto event = WSACreateEvent(); if (event == WSA_INVALID_EVENT) { @@ -170,47 +160,33 @@ namespace extension { throw std::system_error(WSAGetLastError(), std::system_category(), "WSAEventSelect() failed"); } - // Add all the information to the list and mark the list as dirty, to sync with the list of events tasks.insert(std::make_pair(event, Task{config.fd, config.events, config.reaction})); dirty.store(true, std::memory_order_release); - - bump(); }); + on, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); }); - on>().then("IO Finished", [this](const dsl::word::IOFinished& event) { - // Get the lock so we don't concurrently modify the list - const std::lock_guard lock(tasks_mutex); - - // Find the reaction that finished processing + on, Pool, Priority::HIGH, Inline::NEVER>().then("IO Finished", [this](const dsl::word::IOFinished& event) { auto it = std::find_if(tasks.begin(), tasks.end(), [&event](const std::pair& t) { return t.second.reaction->id == event.id; }); - // If we found it then clear the waiting events if (it != tasks.end()) { auto& task = it->second; - // If the events we were processing included close remove it from the list if (task.processing_events & IO::CLOSE) { dirty.store(true, std::memory_order_release); remove_task(tasks, it); } else { - // We have finished processing events task.processing_events = 0; - - // Try to fire again which will check if there are any waiting events fire_event(task); } } }); + on, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); }); - on>>().then( + on>, Pool, Priority::HIGH, Inline::NEVER>().then( "Unbind IO Reaction", [this](const dsl::operation::Unbind& unbind) { - // Lock our mutex to avoid concurrent modification - const std::lock_guard lock(tasks_mutex); - - // Find our reaction auto it = std::find_if(tasks.begin(), tasks.end(), [&unbind](const std::pair& t) { return t.second.reaction->id == unbind.id; }); @@ -219,43 +195,36 @@ namespace extension { remove_task(tasks, it); } - // Let the poll command know that stuff happened dirty.store(true, std::memory_order_release); - bump(); }); + on>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); }); - on().then("Shutdown IO Controller", [this] { + on, Priority::HIGH, Inline::NEVER>().then("Shutdown IO Controller", [this] { running.store(false, std::memory_order_release); - bump(); }); + on().then("Shutdown IO bump", [this] { bump(); }); - on().then("IO Controller", [this] { - while (running.load(std::memory_order_acquire)) { - // Rebuild the list if something changed - if (dirty.load(std::memory_order_acquire)) { - rebuild_list(); - } + on, Priority::NORMAL, Inline::NEVER>().then("IO Poll", [this] { + if (!running.load(std::memory_order_acquire)) { + return; + } - // Wait for events - DWORD event_index = 0; - /*mutex scope*/ { - const std::lock_guard lock(notifier.mutex); - event_index = WSAWaitForMultipleEvents(static_cast(watches.size()), - watches.data(), - false, - WSA_INFINITE, - false); - } + if (dirty.load(std::memory_order_acquire)) { + rebuild_list(); + } - // Check if the return value is an event in our list - if (event_index >= WSA_WAIT_EVENT_0 && event_index < WSA_WAIT_EVENT_0 + watches.size()) { - // Get the signalled event - auto& event = watches[event_index - WSA_WAIT_EVENT_0]; + const DWORD event_index = WSAWaitForMultipleEvents(static_cast(watches.size()), + watches.data(), + false, + WSA_INFINITE, + false); - // Collect the events that happened into the tasks list - process_event(event); - } + if (event_index >= WSA_WAIT_EVENT_0 && event_index < WSA_WAIT_EVENT_0 + watches.size()) { + auto& event = watches[event_index - WSA_WAIT_EVENT_0]; + process_event(event); } + + powerplant.submit(threading::ReactionTask::get_current_task()->parent->get_task()); }); }