diff --git a/lib/mixpanel-ruby/flags/local_flags_provider.rb b/lib/mixpanel-ruby/flags/local_flags_provider.rb index 9bf1bb0..fbe203e 100644 --- a/lib/mixpanel-ruby/flags/local_flags_provider.rb +++ b/lib/mixpanel-ruby/flags/local_flags_provider.rb @@ -31,7 +31,15 @@ def initialize(token, config, tracker_callback, error_handler) @flag_definitions = {} @polling_thread = nil - @stop_polling = false + # Per-thread stop signal. Each polling thread captures its own + # `[false]` cell by closure, so a subsequent start can't clear + # the previous thread's stop signal. + @polling_stop = nil + # Guards the polling-thread lifecycle. Without this, two concurrent + # start_polling_for_definitions! callers could both pass the + # !@polling_thread check before either commits and spawn duplicate + # threads — the earlier one becomes orphaned but keeps running. + @polling_mutex = Mutex.new end # Start polling for flag definitions @@ -39,29 +47,42 @@ def initialize(token, config, tracker_callback, error_handler) def start_polling_for_definitions! fetch_flag_definitions - if @config[:enable_polling] && !@polling_thread - @stop_polling = false + return unless @config[:enable_polling] + + @polling_mutex.synchronize do + return if @polling_thread + + stop_flag = [false] + @polling_stop = stop_flag @polling_thread = Thread.new do + Thread.current.name = 'mixpanel-flags-poller' loop do sleep @config[:polling_interval_in_seconds] - break if @stop_polling + break if stop_flag[0] begin fetch_flag_definitions rescue StandardError => e - @error_handler.handle(e) if @error_handler + log_polling_error(e) end end end end rescue StandardError => e - @error_handler.handle(e) if @error_handler + log_polling_error(e) end def stop_polling_for_definitions! - @stop_polling = true - @polling_thread&.join - @polling_thread = nil + thread_to_join = @polling_mutex.synchronize do + @polling_stop[0] = true if @polling_stop + @polling_stop = nil + t = @polling_thread + @polling_thread = nil + t + end + # Join outside the mutex so a long-running join can't block a + # concurrent start_polling_for_definitions! for the full poll interval. + thread_to_join&.join end def shutdown @@ -141,6 +162,19 @@ def get_all_variants(context) private + # Surface polling-loop failures unconditionally. The default + # Mixpanel::ErrorHandler is a no-op, so dispatching only via + # @error_handler swallows schema drift (NoMethodError, + # JSON::ParserError, etc.) — the loop runs forever undetected. + # Always warn so the failure is visible without an error_handler + # being configured. Matches the convention in mixpanel-python / + # mixpanel-java / mixpanel-go / mixpanel-node, all of which log + # unconditionally and keep polling. + def log_polling_error(error) + warn "[Mixpanel] Failed to fetch flag definitions: #{error.class}: #{error.message}" + @error_handler.handle(error) if @error_handler + end + def fetch_flag_definitions response = call_flags_endpoint diff --git a/spec/mixpanel-ruby/flags/local_flags_spec.rb b/spec/mixpanel-ruby/flags/local_flags_spec.rb index 6969521..040d6cf 100644 --- a/spec/mixpanel-ruby/flags/local_flags_spec.rb +++ b/spec/mixpanel-ruby/flags/local_flags_spec.rb @@ -755,5 +755,110 @@ def user_context_with_properties(properties) polling_provider.stop_polling_for_definitions! end end + + # SDK-81: concurrent start! callers must not both pass the !@polling_thread + # check and spawn duplicate threads — the earlier one becomes orphaned. + it 'is idempotent under concurrent start calls and does not leak threads' do + stub_flag_definitions([create_test_flag]) + + polling_provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + # Short interval so stop_polling's join doesn't wait long for the + # poller thread to wake from sleep. + { enable_polling: true, polling_interval_in_seconds: 0.05 }, + mock_tracker, + mock_error_handler + ) + + begin + gate = Queue.new + threads = Array.new(8) do + Thread.new do + gate.pop + polling_provider.start_polling_for_definitions! + end + end + + # Release all contenders at once to maximize the chance of a race. + 8.times { gate << :go } + threads.each(&:join) + + # Exactly one poller thread should be alive, not eight. Without + # the mutex, each contender that passed the check would spawn + # its own. Filter by name so unrelated background threads (RSpec + # workers, GC finalizers, etc.) can't perturb the count. + poller_threads = Thread.list.select { |t| t.name == 'mixpanel-flags-poller' } + expect(poller_threads.size).to eq(1) + ensure + polling_provider.stop_polling_for_definitions! + end + end + + # SDK-81: previously stop_polling_for_definitions! flipped a shared + # @stop_polling flag, so a subsequent start could reset it while the + # old thread was still asleep — the old thread would wake, see false, + # and keep polling forever. With per-thread stop signals the old + # thread breaks on its captured flag regardless of subsequent starts. + it 'stop+start does not leave the previous poller running' do + stub_flag_definitions([create_test_flag]) + + polling_provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + { enable_polling: true, polling_interval_in_seconds: 0.05 }, + mock_tracker, + mock_error_handler + ) + + begin + polling_provider.start_polling_for_definitions! + polling_provider.stop_polling_for_definitions! + polling_provider.start_polling_for_definitions! + + # Give the previous thread ample time to wake from its sleep and + # check its stop flag. With a per-thread flag it will break; with + # the old shared flag it would see the new start's `false` and + # keep going. + sleep 0.2 + + poller_threads = Thread.list.select { |t| t.name == 'mixpanel-flags-poller' && t.alive? } + expect(poller_threads.size).to eq(1) + ensure + polling_provider.stop_polling_for_definitions! + end + end + + # SDK-78: the polling loop used to dispatch errors only to @error_handler, + # whose default Mixpanel::ErrorHandler#handle is a no-op — schema drift + # (NoMethodError, JSON::ParserError, etc.) looped forever undetected. + it 'warns to stderr when fetch raises and no error_handler is configured' do + stub_request(:get, endpoint_url_regex).to_return(status: 500, body: 'server down') + + polling_provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + { enable_polling: false }, + mock_tracker, + nil + ) + + expect { polling_provider.start_polling_for_definitions! } + .to output(/\[Mixpanel\] Failed to fetch flag definitions: Mixpanel::ServerError/).to_stderr + end + + it 'surfaces unexpected errors (schema drift) instead of swallowing them silently' do + stub_flag_definitions([create_test_flag]) + + polling_provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + { enable_polling: false }, + mock_tracker, + mock_error_handler + ) + + allow(polling_provider).to receive(:fetch_flag_definitions).and_raise(NoMethodError, "undefined method `[]' for nil:NilClass") + + expect(mock_error_handler).to receive(:handle).with(an_instance_of(NoMethodError)) + expect { polling_provider.start_polling_for_definitions! } + .to output(/\[Mixpanel\] Failed to fetch flag definitions: NoMethodError/).to_stderr + end end end