From fcd89d864b89e44830d178ebde1e42c9d7c28c3b Mon Sep 17 00:00:00 2001 From: Zach <54722467+zabarn@users.noreply.github.com> Date: Mon, 14 Apr 2025 01:46:18 -0500 Subject: [PATCH 1/2] Revert "fix: Initialize ThreadPoolExecutor in sql registry (#210)" This reverts commit 727446034e59b55e21332788005c248b3714d23e. --- sdk/python/feast/infra/registry/sql.py | 31 ++++++-------------------- 1 file changed, 7 insertions(+), 24 deletions(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index ba0501b7211..7322b288e0f 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1,6 +1,4 @@ -import atexit import logging -import threading import uuid from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone @@ -287,11 +285,6 @@ def __init__( self.thread_pool_executor_worker_count = ( registry_config.thread_pool_executor_worker_count ) - if self.thread_pool_executor_worker_count > 0: - self._executor = ThreadPoolExecutor( - max_workers=self.thread_pool_executor_worker_count - ) - atexit.register(self._exit_handler) self.purge_feast_metadata = registry_config.purge_feast_metadata # Sync feast_metadata to projects table # when purge_feast_metadata is set to True, Delete data from @@ -990,17 +983,14 @@ def process_project(project: Project): r.infra.CopyFrom(self.get_infra(project_name).to_proto()) projects_list = self.list_projects(allow_cache=False) - if self._executor: - logger.info( - f"Thread count before executor.map: {len(threading.enumerate())}" - ) - self._executor.map(process_project, projects_list) - logger.info( - f"Thread count after executor.map: {len(threading.enumerate())}" - ) + if self.thread_pool_executor_worker_count == 0: + for project in projects_list: + process_project(project) else: - for p in projects_list: - process_project(p) + with ThreadPoolExecutor( + max_workers=self.thread_pool_executor_worker_count + ) as executor: + executor.map(process_project, projects_list) if last_updated_timestamps: r.last_updated.FromDatetime(max(last_updated_timestamps)) @@ -1427,10 +1417,3 @@ def get_project_metadata( datetime.utcfromtimestamp(int(metadata_value)) ) return project_metadata_model - - def _exit_handler(self): - if self._executor: - logger.info("Shutting down SqlRegistry's ThreadPoolExecutor...") - self._executor.shutdown(wait=False, cancel_futures=True) - logger.info("ThreadPoolExecutor shut down successfully.") - self._executor = None From ae232f2dc97e3ea324465e5c67317bd2cd6327ed Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Mon, 14 Apr 2025 09:46:32 -0500 Subject: [PATCH 2/2] fix: Hold lock in caching registry refresh --- sdk/python/feast/infra/registry/caching_registry.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 0915f4721fe..0bb8dc8d2f1 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -462,8 +462,9 @@ def list_projects( return self._list_projects(tags) def refresh(self, project: Optional[str] = None): - self.cached_registry_proto = self.proto() - self.cached_registry_proto_created = _utc_now() + with self._refresh_lock: + self.cached_registry_proto = self.proto() + self.cached_registry_proto_created = _utc_now() def _refresh_cached_registry_if_necessary(self): if self.cache_mode == "sync":