From 205e4bee0770823309bd0b36884b4ab2361e0eb9 Mon Sep 17 00:00:00 2001 From: Tyler Roach Date: Mon, 29 Jun 2026 13:42:21 -0400 Subject: [PATCH 1/3] fix(flags): guard polling start so concurrent callers don't leak executors startPollingForDefinitions had no lock around the pollingExecutor create-and-assign. Two concurrent callers would each allocate a fresh ScheduledExecutorService and the earlier one's worker thread + queue would leak (still alive, still scheduled to poll, no way to shut it down because the field had been overwritten). Guard the executor lifecycle with a private lock and bail early if a poller is already scheduled. Snapshot the executor under the lock in stop, then run the (potentially blocking) shutdown / awaitTermination outside the lock so a long-running shutdown can't block a concurrent start for 5s. Linear: SDK-85 Co-Authored-By: Claude Opus 4.7 --- .../flags/local_flags_provider.rb | 23 ++++++++++-- spec/mixpanel-ruby/flags/local_flags_spec.rb | 37 +++++++++++++++++++ 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/lib/mixpanel-ruby/flags/local_flags_provider.rb b/lib/mixpanel-ruby/flags/local_flags_provider.rb index 9bf1bb0..686a733 100644 --- a/lib/mixpanel-ruby/flags/local_flags_provider.rb +++ b/lib/mixpanel-ruby/flags/local_flags_provider.rb @@ -32,6 +32,11 @@ def initialize(token, config, tracker_callback, error_handler) @flag_definitions = {} @polling_thread = nil @stop_polling = false + # Guards the polling-thread lifecycle. Without this, two concurrent + # start_polling_for_definitions! callers could both pass the + # !@polling_thread check before either commits and spawn duplicate + # threads — the earlier one becomes orphaned but keeps running. + @polling_mutex = Mutex.new end # Start polling for flag definitions @@ -39,7 +44,11 @@ def initialize(token, config, tracker_callback, error_handler) def start_polling_for_definitions! fetch_flag_definitions - if @config[:enable_polling] && !@polling_thread + return unless @config[:enable_polling] + + @polling_mutex.synchronize do + return if @polling_thread + @stop_polling = false @polling_thread = Thread.new do loop do @@ -59,9 +68,15 @@ def start_polling_for_definitions! end def stop_polling_for_definitions! - @stop_polling = true - @polling_thread&.join - @polling_thread = nil + thread_to_join = @polling_mutex.synchronize do + @stop_polling = true + t = @polling_thread + @polling_thread = nil + t + end + # Join outside the mutex so a long-running join can't block a + # concurrent start_polling_for_definitions! for the full poll interval. + thread_to_join&.join end def shutdown diff --git a/spec/mixpanel-ruby/flags/local_flags_spec.rb b/spec/mixpanel-ruby/flags/local_flags_spec.rb index 6969521..f6279b4 100644 --- a/spec/mixpanel-ruby/flags/local_flags_spec.rb +++ b/spec/mixpanel-ruby/flags/local_flags_spec.rb @@ -755,5 +755,42 @@ def user_context_with_properties(properties) polling_provider.stop_polling_for_definitions! end end + + # SDK-85: concurrent start! callers must not both pass the !@polling_thread + # check and spawn duplicate threads — the earlier one becomes orphaned. + it 'is idempotent under concurrent start calls and does not leak threads' do + stub_flag_definitions([create_test_flag]) + + polling_provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + # Short interval so stop_polling's join doesn't wait long for the + # poller thread to wake from sleep. + { enable_polling: true, polling_interval_in_seconds: 0.05 }, + mock_tracker, + mock_error_handler + ) + + begin + baseline = Thread.list.size + gate = Queue.new + threads = Array.new(8) do + Thread.new do + gate.pop + polling_provider.start_polling_for_definitions! + end + end + + # Release all contenders at once to maximize the chance of a race. + 8.times { gate << :go } + threads.each(&:join) + + # Exactly one new background thread (the poller) should be alive, + # not eight. Without the mutex, each contender that passed the + # check would spawn its own. + expect(Thread.list.size - baseline).to eq(1) + ensure + polling_provider.stop_polling_for_definitions! + end + end end end From 18d2131306fe0f0c892f401ecaf642023bf0924a Mon Sep 17 00:00:00 2001 From: Tyler Roach Date: Mon, 29 Jun 2026 14:33:37 -0400 Subject: [PATCH 2/3] fix(analytics): surface polling-loop errors so schema drift can't loop silently MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit start_polling_for_definitions! caught StandardError and dispatched only to @error_handler (whose default ErrorHandler#handle is a no-op), so NoMethodError / JSON::ParserError / etc. from schema drift would loop forever undetected unless the user had configured a custom handler. Add log_polling_error which always warns to STDERR before dispatching to @error_handler — visibility no longer depends on configuration. Matches the convention in mixpanel-python, mixpanel-java, mixpanel-go, and mixpanel-node, all of which log unconditionally and continue polling. Linear: SDK-78 Co-Authored-By: Claude Opus 4.7 --- .../flags/local_flags_provider.rb | 17 +++++++-- spec/mixpanel-ruby/flags/local_flags_spec.rb | 36 ++++++++++++++++++- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/lib/mixpanel-ruby/flags/local_flags_provider.rb b/lib/mixpanel-ruby/flags/local_flags_provider.rb index 686a733..57ac6fc 100644 --- a/lib/mixpanel-ruby/flags/local_flags_provider.rb +++ b/lib/mixpanel-ruby/flags/local_flags_provider.rb @@ -58,13 +58,13 @@ def start_polling_for_definitions! begin fetch_flag_definitions rescue StandardError => e - @error_handler.handle(e) if @error_handler + log_polling_error(e) end end end end rescue StandardError => e - @error_handler.handle(e) if @error_handler + log_polling_error(e) end def stop_polling_for_definitions! @@ -156,6 +156,19 @@ def get_all_variants(context) private + # Surface polling-loop failures unconditionally. The default + # Mixpanel::ErrorHandler is a no-op, so dispatching only via + # @error_handler swallows schema drift (NoMethodError, + # JSON::ParserError, etc.) — the loop runs forever undetected. + # Always warn so the failure is visible without an error_handler + # being configured. Matches the convention in mixpanel-python / + # mixpanel-java / mixpanel-go / mixpanel-node, all of which log + # unconditionally and keep polling. + def log_polling_error(error) + warn "[Mixpanel] Failed to fetch flag definitions: #{error.class}: #{error.message}" + @error_handler.handle(error) if @error_handler + end + def fetch_flag_definitions response = call_flags_endpoint diff --git a/spec/mixpanel-ruby/flags/local_flags_spec.rb b/spec/mixpanel-ruby/flags/local_flags_spec.rb index f6279b4..bdd88de 100644 --- a/spec/mixpanel-ruby/flags/local_flags_spec.rb +++ b/spec/mixpanel-ruby/flags/local_flags_spec.rb @@ -756,7 +756,7 @@ def user_context_with_properties(properties) end end - # SDK-85: concurrent start! callers must not both pass the !@polling_thread + # SDK-81: concurrent start! callers must not both pass the !@polling_thread # check and spawn duplicate threads — the earlier one becomes orphaned. it 'is idempotent under concurrent start calls and does not leak threads' do stub_flag_definitions([create_test_flag]) @@ -792,5 +792,39 @@ def user_context_with_properties(properties) polling_provider.stop_polling_for_definitions! end end + + # SDK-78: the polling loop used to dispatch errors only to @error_handler, + # whose default Mixpanel::ErrorHandler#handle is a no-op — schema drift + # (NoMethodError, JSON::ParserError, etc.) looped forever undetected. + it 'warns to stderr when fetch raises and no error_handler is configured' do + stub_request(:get, endpoint_url_regex).to_return(status: 500, body: 'server down') + + polling_provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + { enable_polling: false }, + mock_tracker, + nil + ) + + expect { polling_provider.start_polling_for_definitions! } + .to output(/\[Mixpanel\] Failed to fetch flag definitions: Mixpanel::ServerError/).to_stderr + end + + it 'surfaces unexpected errors (schema drift) instead of swallowing them silently' do + stub_flag_definitions([create_test_flag]) + + polling_provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + { enable_polling: false }, + mock_tracker, + mock_error_handler + ) + + allow(polling_provider).to receive(:fetch_flag_definitions).and_raise(NoMethodError, "undefined method `[]' for nil:NilClass") + + expect(mock_error_handler).to receive(:handle).with(an_instance_of(NoMethodError)) + expect { polling_provider.start_polling_for_definitions! } + .to output(/\[Mixpanel\] Failed to fetch flag definitions: NoMethodError/).to_stderr + end end end From d8682c5305c1a17d9613fa183fad3d8fc1821b9b Mon Sep 17 00:00:00 2001 From: Tyler Roach Date: Thu, 2 Jul 2026 12:03:23 -0400 Subject: [PATCH 3/3] fix(flags): per-thread stop signal so stop+start can't zombie the old poller stop_polling_for_definitions! flipped a shared @stop_polling boolean. A concurrent start_polling_for_definitions! could then reset it to false before the old thread woke from its sleep, leaving the old poller running indefinitely and orphaned (no longer tracked by @polling_thread, so no future stop could join it). Each polling thread now captures its own [false] cell by closure. A subsequent start allocates a fresh cell, so the previous thread still observes true and exits on its next wake. Also name the polling thread ('mixpanel-flags-poller') so the concurrent- start regression test can count our threads by name instead of diffing Thread.list.size, which is fragile under other background threads in the same process. --- .../flags/local_flags_provider.rb | 14 ++++-- spec/mixpanel-ruby/flags/local_flags_spec.rb | 44 ++++++++++++++++--- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/lib/mixpanel-ruby/flags/local_flags_provider.rb b/lib/mixpanel-ruby/flags/local_flags_provider.rb index 57ac6fc..fbe203e 100644 --- a/lib/mixpanel-ruby/flags/local_flags_provider.rb +++ b/lib/mixpanel-ruby/flags/local_flags_provider.rb @@ -31,7 +31,10 @@ def initialize(token, config, tracker_callback, error_handler) @flag_definitions = {} @polling_thread = nil - @stop_polling = false + # Per-thread stop signal. Each polling thread captures its own + # `[false]` cell by closure, so a subsequent start can't clear + # the previous thread's stop signal. + @polling_stop = nil # Guards the polling-thread lifecycle. Without this, two concurrent # start_polling_for_definitions! callers could both pass the # !@polling_thread check before either commits and spawn duplicate @@ -49,11 +52,13 @@ def start_polling_for_definitions! @polling_mutex.synchronize do return if @polling_thread - @stop_polling = false + stop_flag = [false] + @polling_stop = stop_flag @polling_thread = Thread.new do + Thread.current.name = 'mixpanel-flags-poller' loop do sleep @config[:polling_interval_in_seconds] - break if @stop_polling + break if stop_flag[0] begin fetch_flag_definitions @@ -69,7 +74,8 @@ def start_polling_for_definitions! def stop_polling_for_definitions! thread_to_join = @polling_mutex.synchronize do - @stop_polling = true + @polling_stop[0] = true if @polling_stop + @polling_stop = nil t = @polling_thread @polling_thread = nil t diff --git a/spec/mixpanel-ruby/flags/local_flags_spec.rb b/spec/mixpanel-ruby/flags/local_flags_spec.rb index bdd88de..040d6cf 100644 --- a/spec/mixpanel-ruby/flags/local_flags_spec.rb +++ b/spec/mixpanel-ruby/flags/local_flags_spec.rb @@ -771,7 +771,6 @@ def user_context_with_properties(properties) ) begin - baseline = Thread.list.size gate = Queue.new threads = Array.new(8) do Thread.new do @@ -784,10 +783,45 @@ def user_context_with_properties(properties) 8.times { gate << :go } threads.each(&:join) - # Exactly one new background thread (the poller) should be alive, - # not eight. Without the mutex, each contender that passed the - # check would spawn its own. - expect(Thread.list.size - baseline).to eq(1) + # Exactly one poller thread should be alive, not eight. Without + # the mutex, each contender that passed the check would spawn + # its own. Filter by name so unrelated background threads (RSpec + # workers, GC finalizers, etc.) can't perturb the count. + poller_threads = Thread.list.select { |t| t.name == 'mixpanel-flags-poller' } + expect(poller_threads.size).to eq(1) + ensure + polling_provider.stop_polling_for_definitions! + end + end + + # SDK-81: previously stop_polling_for_definitions! flipped a shared + # @stop_polling flag, so a subsequent start could reset it while the + # old thread was still asleep — the old thread would wake, see false, + # and keep polling forever. With per-thread stop signals the old + # thread breaks on its captured flag regardless of subsequent starts. + it 'stop+start does not leave the previous poller running' do + stub_flag_definitions([create_test_flag]) + + polling_provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + { enable_polling: true, polling_interval_in_seconds: 0.05 }, + mock_tracker, + mock_error_handler + ) + + begin + polling_provider.start_polling_for_definitions! + polling_provider.stop_polling_for_definitions! + polling_provider.start_polling_for_definitions! + + # Give the previous thread ample time to wake from its sleep and + # check its stop flag. With a per-thread flag it will break; with + # the old shared flag it would see the new start's `false` and + # keep going. + sleep 0.2 + + poller_threads = Thread.list.select { |t| t.name == 'mixpanel-flags-poller' && t.alive? } + expect(poller_threads.size).to eq(1) ensure polling_provider.stop_polling_for_definitions! end