
[fix] functions: Run worker leader-election off the consumer event-listener thread#26059

Open
merlimat wants to merge 1 commit into apache:master from merlimat:mmerli/fix-functions-leaderservice-listener-offload


Motivation

LeaderService implements ConsumerEventListener for the failover-subscription leader election. Its becameActive / becameInactive callbacks ran the full blocking leader-election routine inline:

  • functionMetaDataManager / functionRuntimeManager getIsInitialized().get() (unbounded),
  • schedulerManager / functionMetaDataManager acquireExclusiveWrite(...) — a retry loop doing createAsync().get(10s) + Thread.sleep while still leader,
  • functionAssignmentTailer.triggerReadToTheEndAndExit().get(),
  • and on becameInactive, schedulerManager.close() (which blocks on schedulerLock for an in-flight schedule).

These callbacks are dispatched by the Pulsar client on its shared consumer-listener executor (ConsumerImpl.activeConsumerChanged -> externalPinnedExecutor), so this potentially unbounded leader handshake blocked a thread shared by all consumers/readers on that client, including the worker's own tailers.
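The starvation described above can be reproduced in isolation. The sketch below is a simplified model, not Pulsar code: it assumes the shared listener executor behaves like a single pinned thread, and the class and method names (`SharedListenerThreadDemo`, `unrelatedTaskIsStalled`) are hypothetical. A blocking "leader handshake" is simulated with a latch, and an unrelated task queued behind it cannot run until the latch is released.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; models the shared listener thread as a
// single-threaded executor (an assumption made for illustration).
class SharedListenerThreadDemo {

    // Returns true if a task unrelated to leader election was stalled
    // behind a blocking callback running on the same thread.
    static boolean unrelatedTaskIsStalled() {
        ExecutorService shared = Executors.newSingleThreadExecutor();
        CountDownLatch leaderHandshake = new CountDownLatch(1);
        try {
            // Stand-in for a becameActive callback that blocks inline.
            shared.execute(() -> {
                try {
                    leaderHandshake.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            // Stand-in for another consumer's or reader's listener work.
            Future<?> unrelated = shared.submit(() -> { });

            try {
                unrelated.get(200, TimeUnit.MILLISECONDS);
                return false; // it ran, so the thread was not blocked
            } catch (TimeoutException e) {
                return true;  // still queued behind the blocking callback
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            leaderHandshake.countDown(); // release the blocked callback
            shared.shutdown();
        }
    }
}
```

Calling `SharedListenerThreadDemo.unrelatedTaskIsStalled()` returns `true` here, because the only thread is parked in the first task for as long as the latch is held.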

Modifications

Run becameActive / becameInactive on a dedicated single-threaded executor and return from the listener callbacks immediately. The single thread preserves the becameActive -> becameInactive ordering. The callback bodies are unchanged, moved into becameActiveInternal / becameInactiveInternal. close() shuts the executor down. A @VisibleForTesting joinPendingEventTasks() barrier keeps LeaderServiceTest deterministic.
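A minimal sketch of the offloading pattern described above, for readers who have not opened the diff. It is not the code in this PR: `OffloadedLeaderListener`, the thread name, the `log` field, and the placeholder callback bodies are all assumptions for illustration, and the barrier is only one plausible way to implement joinPendingEventTasks().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for LeaderService; not the actual Pulsar class.
class OffloadedLeaderListener implements AutoCloseable {

    // One thread with a FIFO queue: tasks run one at a time, in submission order.
    private final ExecutorService eventExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "leader-service-events"); // name is an assumption
        t.setDaemon(true);
        return t;
    });

    // Records which internal callbacks ran, so ordering can be inspected.
    final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Listener callbacks only enqueue work and return immediately.
    public void becameActive() {
        eventExecutor.execute(this::becameActiveInternal);
    }

    public void becameInactive() {
        eventExecutor.execute(this::becameInactiveInternal);
    }

    // Placeholder for the blocking leader-election routine.
    private void becameActiveInternal() {
        try {
            Thread.sleep(50); // simulate blocking work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.add("active");
    }

    // Placeholder for the step-down routine.
    private void becameInactiveInternal() {
        log.add("inactive");
    }

    // Test barrier: returns once every task submitted before this call has run.
    void joinPendingEventTasks() {
        try {
            eventExecutor.submit(() -> { }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public void close() {
        eventExecutor.shutdownNow();
    }
}
```

Because the barrier is itself a task on the same single thread, it cannot complete before the callbacks queued ahead of it, which is what lets a test assert on state right after `joinPendingEventTasks()` returns.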

Verifying this change

Covered by the existing LeaderServiceTest (updated to await the offloaded tasks via the barrier) — 4/4 pass.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

merlimat changed the title from "[fix][functions] Run worker leader-election off the consumer event-listener thread" to "[fix] functions: Run worker leader-election off the consumer event-listener thread" on Jun 18, 2026