write_env and starts_on don't preserve sequence_sender semantics #2053

@solarispika

Description

I was building an FSEvents wrapper as a sequence_sender_tag and wanted the consumer side to express "run this on libdispatch queue X" with a stdexec idiom. starts_on was the obvious first try, then write_env after I noticed starts_on does scheduling on top of env injection. Both fail in the same way for the same reason. Filing in case it's useful.

Tested against main @ 8c5eedd0.

The bug

When the child of stdexec::write_env(child, env) is a sequence sender, the resulting wrapper is not. __write_env_impl inherits __sexpr_defaults and provides only __get_attrs / __get_env / __get_completion_signatures: no subscribe, no item_types, and enable_sequence_sender = false. So exec::subscribe finds no subscribe customization and enable_sequence_sender = false, but __next_connectable is true (the receiver is a sequence receiver), and it drops into the "sender as sequence of one" branch: set_next(rcvr, wrapper) is invoked exactly once, and the per-item type information collapses.

stdexec::starts_on(sched, child) fails the same way through a different route: it rewrites to __sequence(continues_on(just(), sched), child), and __sequence(regular, sequence) is also not a sequence sender.
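A self-contained toy model of that dispatch (plain C++20, no stdexec) may make the failure mode easier to see. All names here (`sequence_tag`, `env_wrapper`, `then_sequence`, `counting_rcvr`) are hypothetical stand-ins, not stdexec types: a subscribe that only honors types tagged as sequences treats any other wrapper as one opaque item, whether it is a write_env-style wrapper or a starts_on-style "run A, then B" composite.

```cpp
#include <cassert>
#include <type_traits>

struct sender_tag {};
struct sequence_tag {};

// Toy stand-in for enable_sequence_sender: true only for sequence_tag.
template <class S>
inline constexpr bool is_sequence_v =
    std::is_same_v<typename S::sender_concept, sequence_tag>;

struct counting_rcvr {
  int int_items = 0;     // items that arrived with their real type (int)
  int opaque_items = 0;  // items that arrived as a whole, untyped sender
  void set_next(int) { ++int_items; }
  template <class Sender>
  void set_next_opaque(const Sender&) { ++opaque_items; }
};

// A genuine sequence that emits three ints.
struct int_seq {
  using sender_concept = sequence_tag;
  void subscribe(counting_rcvr& r) const {
    for (int i = 0; i < 3; ++i) r.set_next(i);
  }
};

// A plain single-shot sender (think just()).
struct just_void {
  using sender_concept = sender_tag;
};

// Like write_env today: wraps the child but declares only sender_tag.
template <class Child>
struct env_wrapper {
  using sender_concept = sender_tag;
  Child child;
};

// Like __sequence(continues_on(just(), sched), child): also only sender_tag.
template <class First, class Second>
struct then_sequence {
  using sender_concept = sender_tag;
  First first;
  Second second;
};

// Toy subscribe: use the customization if the type is a sequence,
// otherwise deliver the whole sender as a single item.
template <class S>
void subscribe(const S& s, counting_rcvr& r) {
  if constexpr (is_sequence_v<S>) {
    s.subscribe(r);
  } else {
    r.set_next_opaque(s);
  }
}
```

Subscribing `int_seq` directly yields three typed items; wrapping it in either `env_wrapper` or `then_sequence` yields a single opaque item and no ints.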

The user-visible symptom: transform_each(then([](int){...})) downstream gets the lambda invoked with () instead of (int) because the wrapper's value channel collapsed to set_value_t().
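The compile errors follow directly from that collapse. As a stdexec-free, type-level check: then(f) needs f to be invocable with the item's value types, which are (int) for the real just(int{}) item but () for the collapsed wrapper. The `invocable_with` helper is invented for this sketch.

```cpp
#include <tuple>
#include <type_traits>

// Hypothetical helper: is F callable with the types packed in Tuple?
template <class F, class Tuple>
struct invocable_with;

template <class F, class... Args>
struct invocable_with<F, std::tuple<Args...>> : std::is_invocable<F, Args...> {};

// The lambda the user passes to then(...).
inline auto on_item = [](int) {};

using real_item_values      = std::tuple<int>;  // set_value_t(int) from just(int{})
using collapsed_item_values = std::tuple<>;     // set_value_t() from the wrapper

static_assert(invocable_with<decltype(on_item), real_item_values>::value);
static_assert(!invocable_with<decltype(on_item), collapsed_item_values>::value);
```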

Repro

#include "exec/sequence/transform_each.hpp"
#include "exec/sequence/ignore_all_values.hpp"
#include "exec/sequence_senders.hpp"
#include "exec/static_thread_pool.hpp"
#include "stdexec/execution.hpp"

#include <utility> // std::move

struct my_seq_sender
{
    using sender_concept        = exec::sequence_sender_tag;
    using __item_t              = decltype(stdexec::just(int{}));
    using item_types            = exec::item_types<__item_t>;
    using completion_signatures = stdexec::completion_signatures<stdexec::set_value_t(),
                                                                 stdexec::set_stopped_t()>;

    template <class Rcvr>
    struct op
    {
        using operation_state_concept = stdexec::operation_state_t;
        Rcvr rcvr_;
        void start() noexcept
        {
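            // The returned next-sender is intentionally discarded without being
            // connected or started: this repro is about compile-time type checking.
            auto next = exec::set_next(rcvr_, stdexec::just(42));
            (void) next;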
            stdexec::set_value(static_cast<Rcvr&&>(rcvr_));
        }
    };

    template <stdexec::receiver Rcvr>
    auto subscribe(Rcvr r) const -> op<Rcvr> { return {std::move(r)}; }
};

// Baseline (compiles): plain sequence_sender → transform_each.
auto baseline()
{
    return my_seq_sender{}
         | exec::transform_each(stdexec::then([](int){ /* fine */ }));
}

// Bug A (fails to compile): write_env between sequence_sender and transform_each.
auto bug_write_env()
{
    return stdexec::write_env(my_seq_sender{}, stdexec::env<>{})
         | exec::transform_each(stdexec::then([](int){ /* never sees int */ }));
}

// Bug B (fails to compile when forced through sync_wait): starts_on.
void bug_starts_on()
{
    exec::static_thread_pool pool{1};
    stdexec::sync_wait(stdexec::starts_on(pool.get_scheduler(), my_seq_sender{})
                     | exec::transform_each(stdexec::then([](int){ /* never sees int */ }))
                     | exec::ignore_all_values());
}

Build:

clang++ -std=c++20 -I/path/to/stdexec/include -c repro.cpp

Errors I see (clang 16, C++20):

  • bug_write_env: fails at __then.hpp:88: deduced type '__sexpr_t<then_t, lambda, __sexpr<just()>>' does not satisfy '__well_formed_sender'. The trace runs through transform_each's get_item_types, which deduced a single just() item from the write_env wrapper rather than the original just(int).
  • bug_starts_on: fails inside __sender_concepts.hpp:159 with the readable diagnostic _FUNCTION_IS_NOT_CALLABLE_WITH_THE_GIVEN_ARGUMENTS_, _IN_ALGORITHM_ then_t, _WITH_FUNCTION_ (lambda taking int), _WITH_ARGUMENTS_ () — i.e. the lambda was asked to handle () instead of (int).

What I think the fix is, and where it gets hard

write_env is the easy half. It doesn't change scheduling; it only augments the env that the inner subscribe sees. A sequence-sender-aware version is mechanical: detect enable_sequence_sender<child_t>, pick a different __sexpr_impl specialization that declares sender_concept = sequence_sender_tag, forward item_types and the appropriate completion signatures, and provide a subscribe that wraps the receiver with a __sched_env-style joined env before forwarding to the child. The four receiver operations (set_next, set_value, set_error, set_stopped) just pass through. Same write_env(snd, env) call, no API change for users.
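A minimal toy sketch of that shape, assuming a simplified model where an env is just a scheduler name and item_types is a single `item_type` alias. This is not stdexec's __sexpr_impl machinery; `write_env_sender`, `env_rcvr` and `injected_env` are hypothetical names. The partial specialization plays the role of the separate implementation picked when the child is a sequence, and the receiver adapter forwards every signal unchanged while exposing the injected env.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

struct sender_tag {};
struct sequence_tag {};

template <class S>
inline constexpr bool is_sequence_v =
    std::is_same_v<typename S::sender_concept, sequence_tag>;

// Toy env: only carries a scheduler name.
struct injected_env {
  std::string scheduler;
};

// Downstream receiver: records items and completion.
struct sink {
  std::vector<int> items;
  bool done = false;
  void set_next(int v) { items.push_back(v); }
  void set_value() { done = true; }
};

// Receiver adapter: exposes the injected env, passes signals through.
template <class Rcvr>
struct env_rcvr {
  Rcvr& inner;
  const injected_env& env;
  const injected_env& get_env() const { return env; }
  void set_next(int v) { inner.set_next(v); }
  void set_value() { inner.set_value(); }
};

// Source that reads the scheduler from its receiver's env, then emits 1..3.
struct int_seq {
  using sender_concept = sequence_tag;
  using item_type = int;
  std::string* seen_scheduler;  // where the sketch records what it saw
  template <class Rcvr>
  void subscribe(Rcvr& r) const {
    *seen_scheduler = r.get_env().scheduler;
    for (int i = 1; i <= 3; ++i) r.set_next(i);
    r.set_value();
  }
};

// Non-sequence child: today's behaviour, unchanged.
template <class Child, bool = is_sequence_v<Child>>
struct write_env_sender {
  using sender_concept = sender_tag;
  Child child;
  injected_env env;
};

// Sequence child: stay a sequence, forward item_type, wrap the receiver.
template <class Child>
struct write_env_sender<Child, true> {
  using sender_concept = sequence_tag;
  using item_type = typename Child::item_type;
  Child child;
  injected_env env;
  template <class Rcvr>
  void subscribe(Rcvr& r) const {
    env_rcvr<Rcvr> wrapped{r, env};
    child.subscribe(wrapped);
  }
};

// Same call shape for both cases.
template <class Child>
auto write_env(Child c, injected_env e) -> write_env_sender<Child> {
  return {std::move(c), std::move(e)};
}
```

Usage: `write_env(int_seq{&seen}, injected_env{"queue-X"})` is itself a sequence whose item type is still `int`, and the source observes `"queue-X"` through the wrapped receiver's env.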

starts_on is the harder half. The strict analogy with regular starts_on ("start on sched, then become snd"; completion happens wherever the inner sender puts it) is mostly useless for sequences — for a push-driven sequence like FSEvents, items come from the source's own thread regardless of where subscribe() ran. So the defensible default is "all items run on sched". But that still leaves a real implementation choice with different cost profiles:

  • Push: rewrite as write_env(seq, env_with_get_scheduler=sched) | transform_each(continues_on(_, sched)) (plus wrapping the outer sequence completion). Strong guarantee: every item completes on sched even if the source ignores the env. Cost: an extra hop per item even when the source is already on sched.
  • Pull: stop at write_env(seq, env_with_get_scheduler=sched) and rely on the source to read get_scheduler and dispatch accordingly. Zero overhead when honored, no effect when ignored.
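To make the cost trade-off between the two options concrete, here is a toy model in which plain ints stand in for execution contexts (threads or dispatch queues). `source`, `pull` and `push` are hypothetical; `push` counts one hop per item to mirror a per-item continues_on, even when the item already arrived on the target context.

```cpp
#include <cassert>
#include <optional>
#include <vector>

struct result {
  std::vector<int> item_contexts;  // context each item completed on
  int hops = 0;                    // extra transitions paid by the adapter
};

// Push-driven source: emits n items on its own context, unless it chooses
// to honor a scheduler found in the env.
struct source {
  int own_context;
  bool honors_env;
  int n;
  template <class OnItem>
  void run(std::optional<int> env_scheduler, OnItem on_item) const {
    const int ctx =
        (honors_env && env_scheduler) ? *env_scheduler : own_context;
    for (int i = 0; i < n; ++i) on_item(ctx);
  }
};

// Pull: only inject the scheduler (write_env) and trust the source.
inline result pull(const source& src, int sched) {
  result r;
  src.run(sched, [&](int ctx) { r.item_contexts.push_back(ctx); });
  return r;
}

// Push: inject the scheduler and also hop every item onto it.
inline result push(const source& src, int sched) {
  result r;
  src.run(sched, [&](int /*arrived_on*/) {
    ++r.hops;  // paid unconditionally, even if the item was already on sched
    r.item_contexts.push_back(sched);
  });
  return r;
}
```

With a source that honors the env, pull lands every item on the scheduler with zero hops; with one that ignores it, pull leaves items on the source's context. Push always lands items on the scheduler but pays one hop per item either way.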

For my use case (FSEvents) pull is right and free — the wrapper's receiver dispatches each item via libdispatch from the env-supplied scheduler. Other sources may want push. I'd suggest fixing write_env first (that gives users pull explicitly) and treating starts_on over a sequence as a separate decision once there's consensus on which guarantee is canonical.

Workaround in user code

About 80 lines of custom adapter. Sketch:

template <class Snd, class Sched>
struct on_queue_sender {
    using sender_concept = exec::sequence_sender_tag;
    using item_types = exec::__item_types_of_t<Snd>;
    using completion_signatures = stdexec::__completion_signatures_of_t<Snd>;

    template <stdexec::receiver R>
    auto subscribe(R r) && {
        return exec::subscribe(std::move(snd_),
                               wrap_rcvr<R, Sched>{std::move(r), sched_});
    }
    Snd snd_; Sched sched_;
};

wrap_rcvr::get_env() returns a joined env exposing get_scheduler -> Sched. One thing that bit me: stdexec::prop{stdexec::get_scheduler, sched} returns _Value const& from query, which fails any same_as<...> constraint in the wrapped sender's subscribe template. I had to write a small env fragment that returns by value. Worth knowing if anyone else hits this.

Full standalone adapter (~85 lines)
// on_queue: env-injection adapter that exposes a scheduler via
// get_scheduler in the receiver env. Needed because stdexec::starts_on
// rewrites a sequence_sender child via the regular-sender path
// (__sequence(continues_on(just(), sched), child)) and loses item_types.

#include "exec/sequence_senders.hpp"
#include "stdexec/execution.hpp"

#include <utility>

template <class _Sched>
struct __sched_prop
{
  _Sched __sched_;

  [[nodiscard]]
  constexpr auto query(stdexec::get_scheduler_t) const noexcept -> _Sched
  {
    return __sched_;
  }
};

template <class _Rcvr, class _Sched>
struct __on_queue_rcvr
{
  using receiver_concept = stdexec::receiver_tag;

  _Rcvr  __rcvr_;
  _Sched __sched_;

  [[nodiscard]]
  auto get_env() const noexcept
  {
    return stdexec::env{__sched_prop<_Sched>{__sched_}, stdexec::get_env(__rcvr_)};
  }

  template <class _Item>
  auto set_next(_Item&& __item) -> exec::next_sender_of_t<_Rcvr, _Item>
  {
    return exec::set_next(__rcvr_, static_cast<_Item&&>(__item));
  }

  void set_value() noexcept
  {
    stdexec::set_value(static_cast<_Rcvr&&>(__rcvr_));
  }

  void set_stopped() noexcept
  {
    stdexec::set_stopped(static_cast<_Rcvr&&>(__rcvr_));
  }

  template <class _E>
  void set_error(_E&& __e) noexcept
  {
    stdexec::set_error(static_cast<_Rcvr&&>(__rcvr_), static_cast<_E&&>(__e));
  }
};

template <class _Snd, class _Sched>
struct __on_queue_sender
{
  using sender_concept        = exec::sequence_sender_tag;
  using item_types            = exec::__item_types_of_t<_Snd>;
  using completion_signatures = stdexec::__completion_signatures_of_t<_Snd>;

  _Snd   __snd_;
  _Sched __sched_;

  template <stdexec::receiver _Rcvr>
  auto
  subscribe(_Rcvr __rcvr) && -> exec::subscribe_result_t<_Snd, __on_queue_rcvr<_Rcvr, _Sched>>
  {
    return exec::subscribe(static_cast<_Snd&&>(__snd_),
                           __on_queue_rcvr<_Rcvr, _Sched>{std::move(__rcvr),
                                                          std::move(__sched_)});
  }
};

struct __on_queue_t
{
  template <stdexec::scheduler _Sched, class _Snd>
  auto operator()(_Sched __sched, _Snd __snd) const -> __on_queue_sender<_Snd, _Sched>
  {
    return {std::move(__snd), std::move(__sched)};
  }
};

inline constexpr __on_queue_t on_queue{};

Environment

stdexec @ main 8c5eedd0 (verified against this exact commit).
clang 16, C++20, macOS arm64. Pure template-instantiation issue; reproduces independent of platform.
