Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions lib/mixpanel-ruby/flags/local_flags_provider.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,37 +31,58 @@ def initialize(token, config, tracker_callback, error_handler)

@flag_definitions = {}
@polling_thread = nil
@stop_polling = false
# Per-thread stop signal. Each polling thread captures its own
# `[false]` cell by closure, so a subsequent start can't clear
# the previous thread's stop signal.
@polling_stop = nil
# Guards the polling-thread lifecycle. Without this, two concurrent
# start_polling_for_definitions! callers could both pass the
# !@polling_thread check before either commits and spawn duplicate
# threads — the earlier one becomes orphaned but keeps running.
@polling_mutex = Mutex.new
end

# Start polling for flag definitions
# Fetches immediately, then at regular intervals if polling enabled
def start_polling_for_definitions!
fetch_flag_definitions

if @config[:enable_polling] && !@polling_thread
@stop_polling = false
return unless @config[:enable_polling]

@polling_mutex.synchronize do
return if @polling_thread

stop_flag = [false]
@polling_stop = stop_flag
@polling_thread = Thread.new do
Thread.current.name = 'mixpanel-flags-poller'
loop do
sleep @config[:polling_interval_in_seconds]
break if @stop_polling
break if stop_flag[0]

begin
fetch_flag_definitions
rescue StandardError => e
@error_handler.handle(e) if @error_handler
log_polling_error(e)
end
end
end
end
Comment thread
tylerjroach marked this conversation as resolved.
rescue StandardError => e
@error_handler.handle(e) if @error_handler
log_polling_error(e)
end

def stop_polling_for_definitions!
@stop_polling = true
@polling_thread&.join
@polling_thread = nil
thread_to_join = @polling_mutex.synchronize do
@polling_stop[0] = true if @polling_stop
@polling_stop = nil
t = @polling_thread
@polling_thread = nil
t
end
# Join outside the mutex so a long-running join can't block a
# concurrent start_polling_for_definitions! for the full poll interval.
thread_to_join&.join
end

def shutdown
Expand Down Expand Up @@ -141,6 +162,19 @@ def get_all_variants(context)

private

# Surface polling-loop failures unconditionally. The default
# Mixpanel::ErrorHandler is a no-op, so dispatching only via
# @error_handler swallows schema drift (NoMethodError,
# JSON::ParserError, etc.) — the loop runs forever undetected.
# Always warn so the failure is visible without an error_handler
# being configured. Matches the convention in mixpanel-python /
# mixpanel-java / mixpanel-go / mixpanel-node, all of which log
# unconditionally and keep polling.
def log_polling_error(error)
warn "[Mixpanel] Failed to fetch flag definitions: #{error.class}: #{error.message}"
@error_handler.handle(error) if @error_handler
end

def fetch_flag_definitions
response = call_flags_endpoint

Expand Down
105 changes: 105 additions & 0 deletions spec/mixpanel-ruby/flags/local_flags_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -755,5 +755,110 @@ def user_context_with_properties(properties)
polling_provider.stop_polling_for_definitions!
end
end

# SDK-81: concurrent start! callers must not both pass the !@polling_thread
# check and spawn duplicate threads — the earlier one becomes orphaned.
it 'is idempotent under concurrent start calls and does not leak threads' do
stub_flag_definitions([create_test_flag])

polling_provider = Mixpanel::Flags::LocalFlagsProvider.new(
test_token,
# Short interval so stop_polling's join doesn't wait long for the
# poller thread to wake from sleep.
{ enable_polling: true, polling_interval_in_seconds: 0.05 },
mock_tracker,
mock_error_handler
)

begin
gate = Queue.new
threads = Array.new(8) do
Thread.new do
gate.pop
polling_provider.start_polling_for_definitions!
end
end

# Release all contenders at once to maximize the chance of a race.
8.times { gate << :go }
threads.each(&:join)

# Exactly one poller thread should be alive, not eight. Without
# the mutex, each contender that passed the check would spawn
# its own. Filter by name so unrelated background threads (RSpec
# workers, GC finalizers, etc.) can't perturb the count.
poller_threads = Thread.list.select { |t| t.name == 'mixpanel-flags-poller' }
expect(poller_threads.size).to eq(1)
ensure
polling_provider.stop_polling_for_definitions!
end
end

# SDK-81: previously stop_polling_for_definitions! flipped a shared
# @stop_polling flag, so a subsequent start could reset it while the
# old thread was still asleep — the old thread would wake, see false,
# and keep polling forever. With per-thread stop signals the old
# thread breaks on its captured flag regardless of subsequent starts.
it 'stop+start does not leave the previous poller running' do
stub_flag_definitions([create_test_flag])

polling_provider = Mixpanel::Flags::LocalFlagsProvider.new(
test_token,
{ enable_polling: true, polling_interval_in_seconds: 0.05 },
mock_tracker,
mock_error_handler
)

begin
polling_provider.start_polling_for_definitions!
polling_provider.stop_polling_for_definitions!
polling_provider.start_polling_for_definitions!

# Give the previous thread ample time to wake from its sleep and
# check its stop flag. With a per-thread flag it will break; with
# the old shared flag it would see the new start's `false` and
# keep going.
sleep 0.2

poller_threads = Thread.list.select { |t| t.name == 'mixpanel-flags-poller' && t.alive? }
expect(poller_threads.size).to eq(1)
ensure
polling_provider.stop_polling_for_definitions!
end
end

# SDK-78: the polling loop used to dispatch errors only to @error_handler,
# whose default Mixpanel::ErrorHandler#handle is a no-op — schema drift
# (NoMethodError, JSON::ParserError, etc.) looped forever undetected.
it 'warns to stderr when fetch raises and no error_handler is configured' do
stub_request(:get, endpoint_url_regex).to_return(status: 500, body: 'server down')

polling_provider = Mixpanel::Flags::LocalFlagsProvider.new(
test_token,
{ enable_polling: false },
mock_tracker,
nil
)

expect { polling_provider.start_polling_for_definitions! }
.to output(/\[Mixpanel\] Failed to fetch flag definitions: Mixpanel::ServerError/).to_stderr
end

it 'surfaces unexpected errors (schema drift) instead of swallowing them silently' do
stub_flag_definitions([create_test_flag])

polling_provider = Mixpanel::Flags::LocalFlagsProvider.new(
test_token,
{ enable_polling: false },
mock_tracker,
mock_error_handler
)

allow(polling_provider).to receive(:fetch_flag_definitions).and_raise(NoMethodError, "undefined method `[]' for nil:NilClass")

expect(mock_error_handler).to receive(:handle).with(an_instance_of(NoMethodError))
expect { polling_provider.start_polling_for_definitions! }
.to output(/\[Mixpanel\] Failed to fetch flag definitions: NoMethodError/).to_stderr
end
end
end