Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions lib/mixpanel-ruby/flags/flags_provider.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Flags
# Base class for feature flags providers
# Provides common HTTP handling and exposure event tracking
class FlagsProvider
# @param provider_config [Hash] Configuration with :token, :api_host, :request_timeout_in_seconds
# @param provider_config [Hash] Configuration with :token, :api_host, :request_timeout_in_seconds, :exposure_executor
# @param endpoint [String] API endpoint path (e.g., '/flags' or '/flags/definitions')
# @param tracker_callback [Proc] Function used to track events (bound tracker.track method)
# @param evaluation_mode [String] The feature flag evaluation mode. This is either 'local' or 'remote'
Expand All @@ -23,6 +23,7 @@ def initialize(provider_config, endpoint, tracker_callback, evaluation_mode, err
@tracker_callback = tracker_callback
@evaluation_mode = evaluation_mode
@error_handler = error_handler
@exposure_executor = provider_config[:exposure_executor]
end

# Make HTTP request to flags API endpoint
Expand Down Expand Up @@ -104,12 +105,36 @@ def track_exposure_event(flag_key, selected_variant, context, latency_ms = nil)
properties['$is_experiment_active'] = selected_variant.is_experiment_active unless selected_variant.is_experiment_active.nil?
properties['$is_qa_tester'] = selected_variant.is_qa_tester unless selected_variant.is_qa_tester.nil?

begin
@tracker_callback.call(distinct_id, Utils::EXPOSURE_EVENT, properties)
rescue MixpanelError => e
@error_handler.handle(e)
dispatch_exposure(distinct_id, properties)
end

private

# Dispatch the tracker call inline or via the configured executor.
# The executor is duck-typed — anything that responds to #post(&block)
# works (Concurrent::ExecutorService, or a Thread.new wrapper).
def dispatch_exposure(distinct_id, properties)
if @exposure_executor
begin
@exposure_executor.post { invoke_tracker(distinct_id, properties) }
rescue StandardError => e
@error_handler.handle(MixpanelError.new("Exposure event dropped — executor refused to accept task: #{e.message}")) if @error_handler
end
else
invoke_tracker(distinct_id, properties)
end
end

def invoke_tracker(distinct_id, properties)
@tracker_callback.call(distinct_id, Utils::EXPOSURE_EVENT, properties)
rescue MixpanelError => e
@error_handler.handle(e) if @error_handler
rescue StandardError => e
# On the async path a bare `rescue MixpanelError` would let any
# other exception terminate the executor thread silently. Wrap
# into MixpanelError so error_handler sees a consistent type.
@error_handler.handle(MixpanelError.new("Exposure event failed: #{e.class}: #{e.message}")) if @error_handler
end
Comment thread
tylerjroach marked this conversation as resolved.
end
end
end
6 changes: 4 additions & 2 deletions lib/mixpanel-ruby/flags/local_flags_provider.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class LocalFlagsProvider < FlagsProvider
api_host: 'api.mixpanel.com',
request_timeout_in_seconds: 10,
enable_polling: true,
polling_interval_in_seconds: 60
polling_interval_in_seconds: 60,
exposure_executor: nil
}.freeze

# @param token [String] Mixpanel project token
Expand All @@ -24,7 +25,8 @@ def initialize(token, config, tracker_callback, error_handler)
provider_config = {
token: token,
api_host: @config[:api_host],
request_timeout_in_seconds: @config[:request_timeout_in_seconds]
request_timeout_in_seconds: @config[:request_timeout_in_seconds],
exposure_executor: @config[:exposure_executor]
}

super(provider_config, '/flags/definitions', tracker_callback, 'local', error_handler)
Expand Down
6 changes: 4 additions & 2 deletions lib/mixpanel-ruby/flags/remote_flags_provider.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ module Flags
class RemoteFlagsProvider < FlagsProvider
DEFAULT_CONFIG = {
api_host: 'api.mixpanel.com',
request_timeout_in_seconds: 10
request_timeout_in_seconds: 10,
exposure_executor: nil
}.freeze

# @param token [String] Mixpanel project token
Expand All @@ -20,7 +21,8 @@ def initialize(token, config, tracker_callback, error_handler)
provider_config = {
token: token,
api_host: merged_config[:api_host],
request_timeout_in_seconds: merged_config[:request_timeout_in_seconds]
request_timeout_in_seconds: merged_config[:request_timeout_in_seconds],
exposure_executor: merged_config[:exposure_executor]
}

super(provider_config, '/flags', tracker_callback, 'remote', error_handler)
Expand Down
26 changes: 26 additions & 0 deletions openfeature-provider/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,32 @@ When you are done using the provider, shut it down to stop any background pollin
provider.shutdown
```

## Async Exposure Tracking

By default, every flag evaluation tracks an exposure event inline — the `/track` HTTP round trip happens on the calling thread before `fetch_*_value` returns. For latency-sensitive code paths, pass an `:exposure_executor` so exposure tracking runs off-thread. The executor is duck-typed — anything that responds to `#post(&block)` works:

```ruby
# With concurrent-ruby (recommended)
require 'concurrent'
exposure_executor = Concurrent::FixedThreadPool.new(1)

provider = Mixpanel::OpenFeature::Provider.from_local(
'YOUR_PROJECT_TOKEN',
{ exposure_executor: exposure_executor },
)

# Without concurrent-ruby (minimal wrapper)
class ThreadPerCall
def post(&block) = Thread.new(&block)
end
provider = Mixpanel::OpenFeature::Provider.from_local(
'YOUR_PROJECT_TOKEN',
{ exposure_executor: ThreadPerCall.new },
)
```

Available on both local and remote flag configs. Defaults to `nil` (inline behavior); existing setups are unaffected.

## Error Handling

The provider uses OpenFeature's standard error codes to indicate issues during flag evaluation:
Expand Down
79 changes: 79 additions & 0 deletions spec/mixpanel-ruby/flags/local_flags_spec.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'json'
require 'timeout'
require 'mixpanel-ruby/flags/local_flags_provider'
require 'mixpanel-ruby/flags/types'
require 'webmock/rspec'
Expand Down Expand Up @@ -718,6 +719,84 @@ def user_context_with_properties(properties)

provider.send(:track_exposure_event, 'test_flag', variant, test_context)
end

it 'runs the tracker inline by default (no executor configured)' do
flag = create_test_flag
stub_flag_definitions([flag])
provider.start_polling_for_definitions!

variant = Mixpanel::Flags::SelectedVariant.new(
variant_key: 'treatment', variant_value: 'treatment'
)

calling_thread = Thread.current
tracker_thread = nil
allow(mock_tracker).to receive(:call) { tracker_thread = Thread.current }

provider.send(:track_exposure_event, 'test_flag', variant, test_context)
expect(tracker_thread).to be(calling_thread)
end

it 'dispatches the tracker via :exposure_executor when configured' do
executor = Object.new
def executor.post(&block)
Thread.new(&block)
end

tracker_thread = nil
tracker_ran = Queue.new
tracker = ->(_distinct_id, _event, _properties) {
tracker_thread = Thread.current
tracker_ran << :done
}

provider = Mixpanel::Flags::LocalFlagsProvider.new(
test_token,
{ enable_polling: false, exposure_executor: executor },
tracker,
mock_error_handler
)

variant = Mixpanel::Flags::SelectedVariant.new(
variant_key: 'treatment', variant_value: 'treatment'
)
provider.send(:track_exposure_event, 'test_flag', variant, test_context)

# Bounded wait — a bare Queue#pop would hang CI forever if the
# tracker block raised before pushing :done.
Timeout.timeout(2) { tracker_ran.pop }
expect(tracker_thread).not_to be(Thread.current)
end

it 'reports non-MixpanelError exceptions from the async tracker to error_handler' do
handler_called = Queue.new
handler = double('error_handler')
allow(handler).to receive(:handle) { |e| handler_called << e }

executor = Object.new
def executor.post(&block)
Thread.new(&block)
end

tracker = ->(_distinct_id, _event, _properties) { raise NoMethodError, 'boom' }

provider = Mixpanel::Flags::LocalFlagsProvider.new(
test_token,
{ enable_polling: false, exposure_executor: executor },
tracker,
handler
)

variant = Mixpanel::Flags::SelectedVariant.new(
variant_key: 'treatment', variant_value: 'treatment'
)
provider.send(:track_exposure_event, 'test_flag', variant, test_context)

err = Timeout.timeout(2) { handler_called.pop }
expect(err).to be_a(Mixpanel::MixpanelError)
expect(err.message).to include('NoMethodError')
expect(err.message).to include('boom')
end
end

describe 'polling' do
Expand Down
56 changes: 56 additions & 0 deletions spec/mixpanel-ruby/flags/remote_flags_spec.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'json'
require 'timeout'
require 'mixpanel-ruby/flags/remote_flags_provider'
require 'mixpanel-ruby/flags/types'
require 'webmock/rspec'
Expand Down Expand Up @@ -116,6 +117,61 @@ def stub_flags_request_error(error)
provider.get_variant_value('test_flag', 'control', test_context)
end

it 'runs the tracker inline by default (no executor configured)' do
response = create_success_response({
'test_flag' => {
'variant_key' => 'treatment',
'variant_value' => 'treatment'
}
})
stub_flags_request(response)

calling_thread = Thread.current
tracker_thread = nil
allow(mock_tracker).to receive(:call) { tracker_thread = Thread.current }

provider.get_variant_value('test_flag', 'control', test_context)
expect(tracker_thread).to be(calling_thread)
end

it 'dispatches the tracker via the configured exposure_executor off the calling thread' do
response = create_success_response({
'test_flag' => {
'variant_key' => 'treatment',
'variant_value' => 'treatment'
}
})
stub_flags_request(response)

calling_thread = Thread.current
tracker_thread = nil
tracker_ran = Queue.new
tracker = ->(_distinct_id, _event, _properties) {
tracker_thread = Thread.current
tracker_ran << :done
}

# Minimal duck-typed executor: spawn a thread per call.
executor = Object.new
def executor.post(&block)
Thread.new(&block)
end

Comment thread
tylerjroach marked this conversation as resolved.
provider = Mixpanel::Flags::RemoteFlagsProvider.new(
test_token,
{ exposure_executor: executor },
tracker,
mock_error_handler
)

provider.get_variant_value('test_flag', 'control', test_context)

# Bounded wait — a bare Queue#pop would hang CI forever if the
# tracker block raised before pushing :done.
Timeout.timeout(2) { tracker_ran.pop }
expect(tracker_thread).not_to be(calling_thread)
end

it 'does not track exposure event when report_exposure is false' do
response = create_success_response({
'test_flag' => {
Expand Down
Loading