Open
11 changes: 11 additions & 0 deletions docs/explanation/scheduler.md
@@ -275,6 +275,17 @@ When the main thread pool exits (after shutdown), pools are stopped in order —

Persistent pools (`ThreadPoolDescriptor::persistent`) continue accepting tasks during a normal shutdown so networking or logging reactors can finish in-flight work.

### Blocking pools and priority ordering

Some extensions use a single-consumer pool with a blocking wait in the worker thread.
For example, IOController blocks in `::poll()` or `WSAWaitForMultipleEvents()`.

Priority does not preempt inside the blocking call.
Instead, cross-thread control work is enqueued at HIGH priority on the same pool.
An inline bump reaction then writes to the notify pipe (or signals the event on Windows) from the emitting thread, which wakes the blocked worker.
The poll task finishes its iteration and resubmits at NORMAL priority.
The worker then dequeues the already-queued HIGH task before the resubmitted poll.
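
The ordering above can be modelled with a small single-threaded sketch.
Everything in it (`SingleConsumerQueue`, `TaskOrder`, `replay_bump_sequence`) is a hypothetical stand-in rather than the scheduler's real types, and it assumes that equal-priority tasks run in submission order.

```cpp
#include <cassert>
#include <cstdint>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration; these are not the scheduler's real types.
enum class Priority : int { NORMAL = 0, HIGH = 1 };

struct Task {
    Priority priority;
    std::uint64_t sequence;  // submission order, used as a FIFO tie-break (an assumption)
    std::string name;
};

struct TaskOrder {
    // std::priority_queue pops the "largest" element, so return true when a must run after b.
    bool operator()(const Task& a, const Task& b) const {
        if (a.priority != b.priority) {
            return a.priority < b.priority;
        }
        return a.sequence > b.sequence;
    }
};

class SingleConsumerQueue {
public:
    void submit(Priority priority, std::string name) {
        queue_.push(Task{priority, next_sequence_++, std::move(name)});
    }

    bool empty() const {
        return queue_.empty();
    }

    Task pop() {
        assert(!queue_.empty());
        Task task = queue_.top();
        queue_.pop();
        return task;
    }

private:
    std::priority_queue<Task, std::vector<Task>, TaskOrder> queue_;
    std::uint64_t next_sequence_{0};
};

// Replays the sequence from the text and returns the order in which the worker runs tasks.
std::vector<std::string> replay_bump_sequence() {
    SingleConsumerQueue pool;
    std::vector<std::string> order;

    // The worker dequeues the poll task and is now blocked inside it.
    pool.submit(Priority::NORMAL, "poll");
    order.push_back(pool.pop().name);

    // Another thread enqueues control work at HIGH priority, then bumps the notifier.
    pool.submit(Priority::HIGH, "configure");

    // The bump wakes the poll task, which finishes its iteration and resubmits itself.
    pool.submit(Priority::NORMAL, "poll");

    // The worker drains the queue: the HIGH task runs before the resubmitted poll.
    while (!pool.empty()) {
        order.push_back(pool.pop().name);
    }
    return order;
}
```

In this model, `replay_bump_sequence()` returns `poll`, `configure`, `poll`: the control task runs between two poll iterations without interrupting either one.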

## Design tradeoffs

| Choice | Rationale |
10 changes: 6 additions & 4 deletions docs/reference/extensions/built-in-extensions.md
@@ -95,12 +95,14 @@ Monitors file descriptors for IO readiness events and triggers corresponding rea

**Implementation:**

Uses platform-native polling mechanisms:
Uses platform-native polling mechanisms on a dedicated `IOController` pool (`concurrency = 1`, MPSC):

- **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays
- **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles
- **POSIX (Linux/macOS):** `poll()` with `pollfd` arrays.
A NORMAL-priority poll task blocks in `::poll()` and resubmits itself after each iteration.
Control handlers (`IOConfiguration`, `IOFinished`, `Unbind`, `Shutdown`) run at HIGH priority on the same pool.
Each control handler is followed by an `Inline::ALWAYS` bump reaction that writes the notify pipe from the emitting thread.
- **Windows:** `WSAWaitForMultipleEvents` with `WSAEVENT` handles, using the same scheduler-driven pool and priority model.

A dedicated thread blocks on the polling call.
When events are detected on registered file descriptors, the controller creates tasks for the corresponding reactions, passing the event flags through `ThreadStore` so the `get` method can report which specific events occurred.
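
The POSIX wake-up path can be sketched in isolation as a self-pipe that coalesces repeated bumps.
This is a minimal sketch rather than the controller's code: `Notifier`, `open_notifier`, and `wait_and_drain` are hypothetical helpers, and it assumes a plain `pipe()` serves as the notify channel.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <poll.h>
#include <unistd.h>

// Hypothetical notifier: recv is polled by the worker, send is written by other threads.
struct Notifier {
    int recv{-1};
    int send{-1};
};

// Creates the pipe pair. Returns false if pipe() fails.
bool open_notifier(Notifier& n) {
    int fds[2];
    if (::pipe(fds) != 0) {
        return false;
    }
    n.recv = fds[0];
    n.send = fds[1];
    return true;
}

// Ensures one wake byte is pending. Returns true if a wake is pending after the call.
bool bump(const Notifier& n) {
    assert(n.recv >= 0 && n.send >= 0);

    // A zero-timeout poll checks for an unread wake byte, so repeated bumps coalesce.
    pollfd pfd{n.recv, POLLIN, 0};
    if (::poll(&pfd, 1, 0) > 0 && (pfd.revents & POLLIN) != 0) {
        return true;
    }

    // Retry the write if a signal interrupts it.
    std::uint8_t val = 1;
    ssize_t written = 0;
    do {
        written = ::write(n.send, &val, sizeof(val));
    } while (written < 0 && errno == EINTR);
    return written == 1;
}

// Worker side: waits up to timeout_ms for a wake, then consumes one byte.
bool wait_and_drain(const Notifier& n, int timeout_ms) {
    pollfd pfd{n.recv, POLLIN, 0};
    int ready = 0;
    do {
        ready = ::poll(&pfd, 1, timeout_ms);
    } while (ready < 0 && errno == EINTR);
    if (ready <= 0 || (pfd.revents & POLLIN) == 0) {
        return false;
    }
    std::uint8_t val = 0;
    return ::read(n.recv, &val, sizeof(val)) == 1;
}
```

In this sketch, two consecutive `bump()` calls leave a single byte in the pipe, so the worker wakes once and finds the pipe empty afterwards.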

**Key internals:**
7 changes: 3 additions & 4 deletions src/dsl/word/IO.hpp
@@ -30,7 +30,6 @@
#include "../store/ThreadStore.hpp"
#include "../trait/is_transient.hpp"
#include "Single.hpp"
#include "emit/Inline.hpp"

namespace NUClear {
namespace dsl {
@@ -132,13 +131,13 @@ namespace dsl {
static void bind(const std::shared_ptr<threading::Reaction>& reaction, fd_t fd, event_t watch_set) {

reaction->unbinders.emplace_back([](const threading::Reaction& r) {
r.reactor.emit<emit::Inline>(std::make_unique<operation::Unbind<IO>>(r.id));
r.reactor.emit(std::make_unique<operation::Unbind<IO>>(r.id));
});

auto io_config = std::make_unique<IOConfiguration>(fd, watch_set, reaction);

// Send our configuration out
reaction->reactor.emit<emit::Inline>(io_config);
reaction->reactor.emit(io_config);
}

template <typename DSL>
@@ -155,7 +154,7 @@ namespace dsl {

template <typename DSL>
static void post_run(threading::ReactionTask& task) {
task.parent->reactor.emit<emit::Inline>(std::make_unique<IOFinished>(task.parent->id));
task.parent->reactor.emit(std::make_unique<IOFinished>(task.parent->id));
}
};

20 changes: 10 additions & 10 deletions src/extension/IOController.hpp
@@ -43,18 +43,14 @@ namespace extension {
using tasks_t = std::map<WSAEVENT, Task>;
struct notifier_t {
WSAEVENT notifier{WSA_INVALID_EVENT}; ///< This is the event that is waited on by WSAWaitForMultipleEvents
std::mutex mutex; ///< This mutex is used to ensure that wait has woken up
};
#else
using event_t = decltype(pollfd::events);
using watcher_t = pollfd;
using tasks_t = std::vector<Task>;
struct notifier_t {
fd_t recv{-1}; ///< This is the file descriptor that is waited on by poll
fd_t send{-1}; ///< This is the file descriptor that is written to to wake up the poll command
std::mutex mutex; ///< This mutex is used to ensure that a write to poll has worked
/// Armed by NotifierWakeGuard during the wake-then-lock handoff; checked under mutex before ::poll().
std::atomic<bool> wake_requested{false};
fd_t recv{-1}; ///< This is the file descriptor that is waited on by poll
fd_t send{-1}; ///< This is the file descriptor that is written to to wake up the poll command
};
#endif

@@ -95,6 +91,13 @@ namespace extension {
};

private:
/// Dedicated single-consumer pool for scheduler-driven IO polling.
struct IOPool {
static constexpr const char* name = "IOController";
static constexpr int concurrency = 1;
static constexpr bool counts_for_idle = false;
};

/**
* Rebuilds the list of file descriptors to poll.
*
@@ -121,8 +124,7 @@ namespace extension {
* If the poll command is waiting it will wait forever if something doesn't happen.
* When trying to update what to poll or shut down we need to wake it up so it can.
*/
// NOLINTNEXTLINE(readability-make-member-function-const) this changes states
void bump();
void bump() const;

public:
explicit IOController(std::unique_ptr<NUClear::Environment> environment);
@@ -133,8 +135,6 @@ namespace extension {
/// If the IOController should continue running
std::atomic<bool> running{true};

/// The mutex that protects the tasks list
std::mutex tasks_mutex;
/// Whether or not the list of file descriptors is dirty compared to tasks
std::atomic<bool> dirty{true};
/// The list of events that are being watched
187 changes: 44 additions & 143 deletions src/extension/IOController_Posix.ipp
@@ -22,78 +22,12 @@

#include "IOController.hpp"

#include "../threading/ReactionTask.hpp"

namespace NUClear {
namespace extension {

namespace {

/**
* RAII wake-then-lock handoff for the poll notifier pipe.
*
* Arms wake_requested, writes the notify pipe, then (via lock()) acquires notifier.mutex
* and clears wake_requested so the poll thread cannot re-enter ::poll() mid-handoff.
*/
class NotifierWakeGuard {
public:
explicit NotifierWakeGuard(IOController::notifier_t& notifier) : notifier_(notifier) {
notifier_.wake_requested.store(true, std::memory_order_release);
}

NotifierWakeGuard(const NotifierWakeGuard&) = delete;
NotifierWakeGuard& operator=(const NotifierWakeGuard&) = delete;
NotifierWakeGuard(NotifierWakeGuard&&) = delete;
NotifierWakeGuard& operator=(NotifierWakeGuard&&) = delete;

void signal() const {
uint8_t val = 1;
if (::write(notifier_.send, &val, sizeof(val)) < 0) {
throw std::system_error(network_errno,
std::system_category(),
"There was an error while writing to the notification pipe");
}
}

/// Acquire notifier mutex and clear wake_requested for the handoff to poll.
std::unique_lock<std::mutex> lock() {
cleared_ = true;
std::unique_lock<std::mutex> l(notifier_.mutex);
notifier_.wake_requested.store(false, std::memory_order_release);
return l;
}

~NotifierWakeGuard() {
if (!cleared_) {
notifier_.wake_requested.store(false, std::memory_order_release);
}
}

private:
IOController::notifier_t& notifier_;
bool cleared_{false};
};

/// Holds notifier.mutex while the poll thread decides whether to enter ::poll().
class NotifierPollScope {
public:
explicit NotifierPollScope(IOController::notifier_t& notifier)
: lock_(notifier.mutex)
, notifier_(notifier) {}

bool wake_pending() const {
return notifier_.wake_requested.load(std::memory_order_acquire);
}

private:
std::lock_guard<std::mutex> lock_;
IOController::notifier_t& notifier_;
};

} // namespace

void IOController::rebuild_list() {
// Get the lock so we don't concurrently modify the list
const std::lock_guard<std::mutex> lock(tasks_mutex);

// Clear our fds to be rebuilt
watches.resize(0);

@@ -211,11 +145,23 @@ namespace extension {
event.revents = 0;
}

void IOController::bump() {
NotifierWakeGuard wake(notifier);
wake.signal();
// Locking here will ensure we won't return until poll is not running
const auto lock = wake.lock();
void IOController::bump() const {
pollfd pfd{notifier.recv, POLLIN, 0};
if (::poll(&pfd, 1, 0) > 0 && (pfd.revents & POLLIN) != 0) {
return;
}

uint8_t val = 1;
ssize_t written = 0;
do {
written = ::write(notifier.send, &val, sizeof(val));
} while (written < 0 && network_errno == EINTR);

if (written < 0) {
throw std::system_error(network_errno,
std::system_category(),
"There was an error while writing to the notification pipe");
}
}

IOController::IOController(std::unique_ptr<NUClear::Environment> environment) : Reactor(std::move(environment)) {
@@ -233,51 +179,28 @@ namespace extension {
// Start by rebuilding the list
rebuild_list();

on<Trigger<dsl::word::IOConfiguration>>().then(
on<Trigger<dsl::word::IOConfiguration>, Pool<IOPool>, Priority::HIGH, Inline::NEVER>().then(
"Configure IO Reaction",
[this](const dsl::word::IOConfiguration& config) {
// Lock our mutex to avoid concurrent modification
const std::lock_guard<std::mutex> lock(tasks_mutex);

// NOLINTNEXTLINE(google-runtime-int)
tasks.emplace_back(config.fd, event_t(config.events), config.reaction);

// Resort our list
std::sort(tasks.begin(), tasks.end());

// Let the poll command know that stuff happened
dirty.store(true, std::memory_order_release);
bump();
});
on<Trigger<dsl::word::IOConfiguration>, Inline::ALWAYS>().then("Configure IO bump", [this] { bump(); });

on<Trigger<dsl::word::IOFinished>>().then("IO Finished", [this](const dsl::word::IOFinished& event) {
// Get the lock so we don't concurrently modify the list
const std::lock_guard<std::mutex> lock(tasks_mutex);

// Find the reaction that finished processing
on<Trigger<dsl::word::IOFinished>, Pool<IOPool>, Priority::HIGH, Inline::NEVER>().then("IO Finished", [this](const dsl::word::IOFinished& event) {
auto task = std::find_if(tasks.begin(), tasks.end(), [&event](const Task& t) {
return t.reaction->id == event.id;
});

if (task != tasks.end()) {
// If the events we were processing included close remove it from the list
if ((task->processing_events & IO::CLOSE) != 0) {
dirty.store(true, std::memory_order_release);
tasks.erase(task);
}
else {
// We are about to mutate `watches[].events`, which the poll thread reads
// from inside ::poll(). Write to the notify pipe to kick poll out, then
// hold notifier.mutex for the duration of the mutation so the poll thread
// cannot re-enter ::poll() against a half-updated entry. This is the same
// wake-then-lock pattern bump() uses, but we keep the lock held until the
// watches update (and the follow-up fire_event, which can also touch
// watches[].events) is finished.
NotifierWakeGuard wake(notifier);
wake.signal();
const auto notifier_lock = wake.lock();

// Unmask the events that were just processed
auto it = std::lower_bound(watches.begin(),
watches.end(),
task->fd,
@@ -286,22 +209,16 @@ namespace extension {
it->events = event_t(it->events | task->processing_events);
}

// No longer processing events
task->processing_events = 0;

// Try to fire again which will check if there are any waiting events
fire_event(*task);
}
}
});
on<Trigger<dsl::word::IOFinished>, Inline::ALWAYS>().then("IO Finished bump", [this] { bump(); });

on<Trigger<dsl::operation::Unbind<IO>>>().then(
on<Trigger<dsl::operation::Unbind<IO>>, Pool<IOPool>, Priority::HIGH, Inline::NEVER>().then(
"Unbind IO Reaction",
[this](const dsl::operation::Unbind<IO>& unbind) {
// Lock our mutex to avoid concurrent modification
const std::lock_guard<std::mutex> lock(tasks_mutex);

// Find our reaction
auto reaction = std::find_if(tasks.begin(), tasks.end(), [&unbind](const Task& t) {
return t.reaction->id == unbind.id;
});
@@ -310,53 +227,37 @@ namespace extension {
tasks.erase(reaction);
}

// Let the poll command know that stuff happened
dirty.store(true, std::memory_order_release);
bump();
});
on<Trigger<dsl::operation::Unbind<IO>>, Inline::ALWAYS>().then("Unbind IO bump", [this] { bump(); });

on<Shutdown>().then("Shutdown IO Controller", [this] {
on<Shutdown, Pool<IOPool>, Priority::HIGH, Inline::NEVER>().then("Shutdown IO Controller", [this] {
running.store(false, std::memory_order_release);
bump();
});
on<Shutdown, Inline::ALWAYS>().then("Shutdown IO bump", [this] { bump(); });

on<Always>().then("IO Controller", [this] {
// Stay in this reaction to improve performance by avoiding going back and forth between reactions
if (running.load(std::memory_order_acquire)) {
// Rebuild the list if something changed
if (dirty.load(std::memory_order_acquire)) {
rebuild_list();
}

// Wait for an event to happen on one of our file descriptors
bool polled = false;
/* mutex scope */ {
const NotifierPollScope poll(notifier);
if (!poll.wake_pending()) {
if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) {
throw std::system_error(network_errno,
std::system_category(),
"There was an IO error while attempting to poll the file descriptors");
}
polled = true;
}
}
on<Startup, Pool<IOPool>, Priority::NORMAL, Inline::NEVER>().then("IO Poll", [this] {
if (!running.load(std::memory_order_acquire)) {
return;
}

if (!polled) {
return;
}
if (dirty.load(std::memory_order_acquire)) {
rebuild_list();
}

// Get the lock so we don't concurrently modify the list
const std::lock_guard<std::mutex> lock(tasks_mutex);
for (auto& fd : watches) {
if (::poll(watches.data(), nfds_t(watches.size()), -1) < 0) {
throw std::system_error(network_errno,
std::system_category(),
"There was an IO error while attempting to poll the file descriptors");
}

// Collect the events that happened into the tasks list
// Something happened
if (fd.revents != 0) {
process_event(fd);
}
for (auto& fd : watches) {
if (fd.revents != 0) {
process_event(fd);
}
}

powerplant.submit(threading::ReactionTask::get_current_task()->parent->get_task());
});
}
