diff --git a/lib/mixpanel-ruby/flags/flags_provider.rb b/lib/mixpanel-ruby/flags/flags_provider.rb index 116f011..a7235e2 100644 --- a/lib/mixpanel-ruby/flags/flags_provider.rb +++ b/lib/mixpanel-ruby/flags/flags_provider.rb @@ -12,7 +12,7 @@ module Flags # Base class for feature flags providers # Provides common HTTP handling and exposure event tracking class FlagsProvider - # @param provider_config [Hash] Configuration with :token, :api_host, :request_timeout_in_seconds + # @param provider_config [Hash] Configuration with :token, :api_host, :request_timeout_in_seconds, :exposure_executor # @param endpoint [String] API endpoint path (e.g., '/flags' or '/flags/definitions') # @param tracker_callback [Proc] Function used to track events (bound tracker.track method) # @param evaluation_mode [String] The feature flag evaluation mode. This is either 'local' or 'remote' @@ -23,6 +23,7 @@ def initialize(provider_config, endpoint, tracker_callback, evaluation_mode, err @tracker_callback = tracker_callback @evaluation_mode = evaluation_mode @error_handler = error_handler + @exposure_executor = provider_config[:exposure_executor] end # Make HTTP request to flags API endpoint @@ -104,12 +105,36 @@ def track_exposure_event(flag_key, selected_variant, context, latency_ms = nil) properties['$is_experiment_active'] = selected_variant.is_experiment_active unless selected_variant.is_experiment_active.nil? properties['$is_qa_tester'] = selected_variant.is_qa_tester unless selected_variant.is_qa_tester.nil? - begin - @tracker_callback.call(distinct_id, Utils::EXPOSURE_EVENT, properties) - rescue MixpanelError => e - @error_handler.handle(e) + dispatch_exposure(distinct_id, properties) + end + + private + + # Dispatch the tracker call inline or via the configured executor. + # The executor is duck-typed — anything that responds to #post(&block) + # works (Concurrent::ExecutorService, or a Thread.new wrapper). + def dispatch_exposure(distinct_id, properties) + if @exposure_executor + begin + @exposure_executor.post { invoke_tracker(distinct_id, properties) } + rescue StandardError => e + @error_handler.handle(MixpanelError.new("Exposure event dropped — executor refused to accept task: #{e.message}")) if @error_handler + end + else + invoke_tracker(distinct_id, properties) end end + + def invoke_tracker(distinct_id, properties) + @tracker_callback.call(distinct_id, Utils::EXPOSURE_EVENT, properties) + rescue MixpanelError => e + @error_handler.handle(e) if @error_handler + rescue StandardError => e + # On the async path a bare `rescue MixpanelError` would let any + # other exception terminate the executor thread silently. Wrap + # into MixpanelError so error_handler sees a consistent type. + @error_handler.handle(MixpanelError.new("Exposure event failed: #{e.class}: #{e.message}")) if @error_handler + end end end end diff --git a/lib/mixpanel-ruby/flags/local_flags_provider.rb b/lib/mixpanel-ruby/flags/local_flags_provider.rb index 9bf1bb0..d90eb22 100644 --- a/lib/mixpanel-ruby/flags/local_flags_provider.rb +++ b/lib/mixpanel-ruby/flags/local_flags_provider.rb @@ -11,7 +11,8 @@ class LocalFlagsProvider < FlagsProvider api_host: 'api.mixpanel.com', request_timeout_in_seconds: 10, enable_polling: true, - polling_interval_in_seconds: 60 + polling_interval_in_seconds: 60, + exposure_executor: nil }.freeze # @param token [String] Mixpanel project token @@ -24,7 +25,8 @@ def initialize(token, config, tracker_callback, error_handler) provider_config = { token: token, api_host: @config[:api_host], - request_timeout_in_seconds: @config[:request_timeout_in_seconds] + request_timeout_in_seconds: @config[:request_timeout_in_seconds], + exposure_executor: @config[:exposure_executor] } super(provider_config, '/flags/definitions', tracker_callback, 'local', error_handler) diff --git a/lib/mixpanel-ruby/flags/remote_flags_provider.rb b/lib/mixpanel-ruby/flags/remote_flags_provider.rb index eaa6471..321a1c3 100644 --- a/lib/mixpanel-ruby/flags/remote_flags_provider.rb +++ b/lib/mixpanel-ruby/flags/remote_flags_provider.rb @@ -7,7 +7,8 @@ module Flags class RemoteFlagsProvider < FlagsProvider DEFAULT_CONFIG = { api_host: 'api.mixpanel.com', - request_timeout_in_seconds: 10 + request_timeout_in_seconds: 10, + exposure_executor: nil }.freeze # @param token [String] Mixpanel project token @@ -20,7 +21,8 @@ def initialize(token, config, tracker_callback, error_handler) provider_config = { token: token, api_host: merged_config[:api_host], - request_timeout_in_seconds: merged_config[:request_timeout_in_seconds] + request_timeout_in_seconds: merged_config[:request_timeout_in_seconds], + exposure_executor: merged_config[:exposure_executor] } super(provider_config, '/flags', tracker_callback, 'remote', error_handler) diff --git a/openfeature-provider/README.md b/openfeature-provider/README.md index 8a833c7..84c6bbf 100644 --- a/openfeature-provider/README.md +++ b/openfeature-provider/README.md @@ -201,6 +201,32 @@ When you are done using the provider, shut it down to stop any background pollin provider.shutdown ``` +## Async Exposure Tracking + +By default, every flag evaluation tracks an exposure event inline — the `/track` HTTP round trip happens on the calling thread before `fetch_*_value` returns. For latency-sensitive code paths, pass an `:exposure_executor` so exposure tracking runs off-thread. The executor is duck-typed — anything that responds to `#post(&block)` works: + +```ruby +# With concurrent-ruby (recommended) +require 'concurrent' +exposure_executor = Concurrent::FixedThreadPool.new(1) + +provider = Mixpanel::OpenFeature::Provider.from_local( + 'YOUR_PROJECT_TOKEN', + { exposure_executor: exposure_executor }, +) + +# Without concurrent-ruby (minimal wrapper) +class ThreadPerCall + def post(&block) = Thread.new(&block) +end +provider = Mixpanel::OpenFeature::Provider.from_local( + 'YOUR_PROJECT_TOKEN', + { exposure_executor: ThreadPerCall.new }, +) +``` + +Available on both local and remote flag configs. Defaults to `nil` (inline behavior); existing setups are unaffected. + ## Error Handling The provider uses OpenFeature's standard error codes to indicate issues during flag evaluation: diff --git a/spec/mixpanel-ruby/flags/local_flags_spec.rb b/spec/mixpanel-ruby/flags/local_flags_spec.rb index 6969521..45e113b 100644 --- a/spec/mixpanel-ruby/flags/local_flags_spec.rb +++ b/spec/mixpanel-ruby/flags/local_flags_spec.rb @@ -1,4 +1,5 @@ require 'json' +require 'timeout' require 'mixpanel-ruby/flags/local_flags_provider' require 'mixpanel-ruby/flags/types' require 'webmock/rspec' @@ -718,6 +719,84 @@ def user_context_with_properties(properties) provider.send(:track_exposure_event, 'test_flag', variant, test_context) end + + it 'runs the tracker inline by default (no executor configured)' do + flag = create_test_flag + stub_flag_definitions([flag]) + provider.start_polling_for_definitions! + + variant = Mixpanel::Flags::SelectedVariant.new( + variant_key: 'treatment', variant_value: 'treatment' + ) + + calling_thread = Thread.current + tracker_thread = nil + allow(mock_tracker).to receive(:call) { tracker_thread = Thread.current } + + provider.send(:track_exposure_event, 'test_flag', variant, test_context) + expect(tracker_thread).to be(calling_thread) + end + + it 'dispatches the tracker via :exposure_executor when configured' do + executor = Object.new + def executor.post(&block) + Thread.new(&block) + end + + tracker_thread = nil + tracker_ran = Queue.new + tracker = ->(_distinct_id, _event, _properties) { + tracker_thread = Thread.current + tracker_ran << :done + } + + provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + { enable_polling: false, exposure_executor: executor }, + tracker, + mock_error_handler + ) + + variant = Mixpanel::Flags::SelectedVariant.new( + variant_key: 'treatment', variant_value: 'treatment' + ) + provider.send(:track_exposure_event, 'test_flag', variant, test_context) + + # Bounded wait — a bare Queue#pop would hang CI forever if the + # tracker block raised before pushing :done. + Timeout.timeout(2) { tracker_ran.pop } + expect(tracker_thread).not_to be(Thread.current) + end + + it 'reports non-MixpanelError exceptions from the async tracker to error_handler' do + handler_called = Queue.new + handler = double('error_handler') + allow(handler).to receive(:handle) { |e| handler_called << e } + + executor = Object.new + def executor.post(&block) + Thread.new(&block) + end + + tracker = ->(_distinct_id, _event, _properties) { raise NoMethodError, 'boom' } + + provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + { enable_polling: false, exposure_executor: executor }, + tracker, + handler + ) + + variant = Mixpanel::Flags::SelectedVariant.new( + variant_key: 'treatment', variant_value: 'treatment' + ) + provider.send(:track_exposure_event, 'test_flag', variant, test_context) + + err = Timeout.timeout(2) { handler_called.pop } + expect(err).to be_a(Mixpanel::MixpanelError) + expect(err.message).to include('NoMethodError') + expect(err.message).to include('boom') + end end describe 'polling' do diff --git a/spec/mixpanel-ruby/flags/remote_flags_spec.rb b/spec/mixpanel-ruby/flags/remote_flags_spec.rb index 8e7308b..815ce52 100644 --- a/spec/mixpanel-ruby/flags/remote_flags_spec.rb +++ b/spec/mixpanel-ruby/flags/remote_flags_spec.rb @@ -1,4 +1,5 @@ require 'json' +require 'timeout' require 'mixpanel-ruby/flags/remote_flags_provider' require 'mixpanel-ruby/flags/types' require 'webmock/rspec' @@ -116,6 +117,61 @@ def stub_flags_request_error(error) provider.get_variant_value('test_flag', 'control', test_context) end + it 'runs the tracker inline by default (no executor configured)' do + response = create_success_response({ + 'test_flag' => { + 'variant_key' => 'treatment', + 'variant_value' => 'treatment' + } + }) + stub_flags_request(response) + + calling_thread = Thread.current + tracker_thread = nil + allow(mock_tracker).to receive(:call) { tracker_thread = Thread.current } + + provider.get_variant_value('test_flag', 'control', test_context) + expect(tracker_thread).to be(calling_thread) + end + + it 'dispatches the tracker via the configured exposure_executor off the calling thread' do + response = create_success_response({ + 'test_flag' => { + 'variant_key' => 'treatment', + 'variant_value' => 'treatment' + } + }) + stub_flags_request(response) + + calling_thread = Thread.current + tracker_thread = nil + tracker_ran = Queue.new + tracker = ->(_distinct_id, _event, _properties) { + tracker_thread = Thread.current + tracker_ran << :done + } + + # Minimal duck-typed executor: spawn a thread per call. + executor = Object.new + def executor.post(&block) + Thread.new(&block) + end + + provider = Mixpanel::Flags::RemoteFlagsProvider.new( + test_token, + { exposure_executor: executor }, + tracker, + mock_error_handler + ) + + provider.get_variant_value('test_flag', 'control', test_context) + + # Bounded wait — a bare Queue#pop would hang CI forever if the + # tracker block raised before pushing :done. + Timeout.timeout(2) { tracker_ran.pop } + expect(tracker_thread).not_to be(calling_thread) + end + it 'does not track exposure event when report_exposure is false' do response = create_success_response({ 'test_flag' => {