From 874a5b248dbe5f74a35e76d081eceaa22238d8e0 Mon Sep 17 00:00:00 2001 From: Volodymyr Nazarkevych Date: Thu, 23 Apr 2026 17:21:53 +0300 Subject: [PATCH] fix: make cache and feature update handling thread-safe --- growthbook/growthbook.py | 80 ++++++++++++++++++++++----------- growthbook/growthbook_client.py | 23 ++++------ 2 files changed, 61 insertions(+), 42 deletions(-) diff --git a/growthbook/growthbook.py b/growthbook/growthbook.py index d4f4fa4..d0a8434 100644 --- a/growthbook/growthbook.py +++ b/growthbook/growthbook.py @@ -90,22 +90,26 @@ def update(self, value: Dict): class InMemoryFeatureCache(AbstractFeatureCache): def __init__(self) -> None: self.cache: Dict[str, CacheEntry] = {} + self._lock = threading.Lock() def get(self, key: str) -> Optional[Dict]: - if key in self.cache: - entry = self.cache[key] - if entry.expires >= time(): - return entry.value + with self._lock: + if key in self.cache: + entry=self.cache[key] + if entry.expires>=time(): + return entry.value return None def set(self, key: str, value: Dict, ttl: int) -> None: - if key in self.cache: - self.cache[key].update(value) - else: - self.cache[key] = CacheEntry(value, ttl) + with self._lock: + if key in self.cache: + self.cache[key].update(value) + else: + self.cache[key]=CacheEntry(value,ttl) def clear(self) -> None: - self.cache.clear() + with self._lock: + self.cache.clear() class InMemoryStickyBucketService(AbstractStickyBucketService): def __init__(self) -> None: @@ -349,6 +353,9 @@ def __init__(self) -> None: self._max_etag_entries = 100 self._etag_lock = threading.Lock() + self._async_load_lock = asyncio.Lock() + self._load_lock = threading.Lock() + def set_cache(self, cache: AbstractFeatureCache) -> None: self.cache = cache @@ -360,17 +367,21 @@ def save_in_cache(self, key: str, res, ttl: int = 600): def add_feature_update_callback(self, callback: Callable[[Dict], None]) -> None: """Add a callback to be notified when features are updated due to cache expiry""" - if callback not in self._feature_update_callbacks: - self._feature_update_callbacks.append(callback) + with self._refresh_lock: + if callback not in self._feature_update_callbacks: + self._feature_update_callbacks.append(callback) def remove_feature_update_callback(self, callback: Callable[[Dict], None]) -> None: """Remove a feature update callback""" - if callback in self._feature_update_callbacks: - self._feature_update_callbacks.remove(callback) + with self._refresh_lock: + if callback in self._feature_update_callbacks: + self._feature_update_callbacks.remove(callback) def _notify_feature_update_callbacks(self, features_data: Dict) -> None: """Notify all registered callbacks about feature updates""" - for callback in self._feature_update_callbacks: + with self._refresh_lock: + callbacks = self._feature_update_callbacks.copy() + for callback in callbacks: try: callback(features_data) except Exception as e: @@ -384,34 +395,48 @@ def load_features( raise ValueError("Must specify `client_key` to refresh features") key = api_host + "::" + client_key - cached = self.cache.get(key) - if not cached: - res = self._fetch_features(api_host, client_key, decryption_key) + + if cached: + return cached + + with self._load_lock: + cached = self.cache.get(key) + if cached: + return cached + res=self._fetch_features(api_host,client_key,decryption_key) if res is not None: - self.cache.set(key, res, ttl) + self.cache.set(key,res,ttl) logger.debug("Fetched features from API, stored in cache") # Notify callbacks about fresh features self._notify_feature_update_callbacks(res) return res - return cached - - + + return None + async def load_features_async( self, api_host: str, client_key: str, decryption_key: str = "", ttl: int = 600 ) -> Optional[Dict]: key = api_host + "::" + client_key - cached = self.cache.get(key) - if not cached: - res = await self._fetch_features_async(api_host, client_key, decryption_key) + + if cached: + return cached + + async with self._async_load_lock: + cached=self.cache.get(key) + if cached: + return cached + res=await self._fetch_features_async(api_host,client_key,decryption_key) + if res is not None: - self.cache.set(key, res, ttl) + self.cache.set(key,res,ttl) logger.debug("Fetched features from API, stored in cache") # Notify callbacks about fresh features self._notify_feature_update_callbacks(res) return res - return cached + + return None @property def user_agent_suffix(self) -> Optional[str]: @@ -848,9 +873,10 @@ async def load_features_async(self) -> None: def _features_event_handler(self, features): decoded = json.loads(features) if not decoded: - return None + return data = feature_repo.decrypt_response(decoded, self._decryption_key) + key=self._api_host+"::"+self._client_key key = self._api_host + "::" + self._client_key if data is not None: diff --git a/growthbook/growthbook_client.py b/growthbook/growthbook_client.py index a4b56c2..da08b41 100644 --- a/growthbook/growthbook_client.py +++ b/growthbook/growthbook_client.py @@ -98,7 +98,7 @@ def get_current_state(self) -> Dict[str, Any]: with self._lock: return { "features": dict(self._cache['features']), - "savedGroups": self._cache['savedGroups'] + "savedGroups": dict(self._cache['savedGroups']) } class EnhancedFeatureRepository(FeatureRepository, metaclass=SingletonMeta): @@ -380,7 +380,7 @@ def __init__( 'attributes': {}, 'assignments': {} } - self._sticky_bucket_cache_lock = False + self._sticky_bucket_cache_lock = asyncio.Lock() # Plugin support self._tracking_plugins: List[Any] = self.options.tracking_plugins or [] @@ -496,21 +496,14 @@ async def _refresh_sticky_buckets(self, attributes: Dict[str, Any]) -> Dict[str, return {} # Use compare-and-swap pattern - while not self._sticky_bucket_cache_lock: + async with self._sticky_bucket_cache_lock: if attributes == self._sticky_bucket_cache['attributes']: return self._sticky_bucket_cache['assignments'] - - self._sticky_bucket_cache_lock = True - try: - assignments = self.options.sticky_bucket_service.get_all_assignments(attributes) - self._sticky_bucket_cache['attributes'] = attributes.copy() - self._sticky_bucket_cache['assignments'] = assignments - return assignments - finally: - self._sticky_bucket_cache_lock = False - - # Fallback return for edge case where loop condition is never satisfied - return {} + + assignments=self.options.sticky_bucket_service.get_all_assignments(attributes) + self._sticky_bucket_cache['attributes']=attributes.copy() + self._sticky_bucket_cache['assignments']=assignments + return assignments async def initialize(self) -> bool: """Initialize client with features and start refresh"""