Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions pinecone/adapters/response_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from __future__ import annotations

from multiprocessing.pool import ApplyResult
from concurrent.futures import Future
from typing import TYPE_CHECKING, Any

from pinecone.adapters.protocols import (
Expand Down Expand Up @@ -123,21 +123,21 @@ def adapt_fetch_response(openapi_response: FetchResponseAdapter) -> FetchRespons


class UpsertResponseTransformer:
"""Transformer for converting ApplyResult[OpenAPIUpsertResponse] to UpsertResponse.
"""Transformer for converting a Future[OpenAPIUpsertResponse] to UpsertResponse.

This wrapper transforms the OpenAPI response to our dataclass when .get() is called,
while delegating other methods to the underlying ApplyResult.
while delegating other methods to the underlying Future.

Example:
>>> transformer = UpsertResponseTransformer(async_result)
>>> transformer = UpsertResponseTransformer(future)
>>> response = transformer.get() # Returns UpsertResponse
"""

_apply_result: ApplyResult
_future: Future
""" :meta private: """

def __init__(self, apply_result: ApplyResult) -> None:
self._apply_result = apply_result
def __init__(self, future: Future) -> None:
self._future = future

def get(self, timeout: float | None = None) -> UpsertResponse:
"""Get the transformed UpsertResponse.
Expand All @@ -148,9 +148,15 @@ def get(self, timeout: float | None = None) -> UpsertResponse:
Returns:
The SDK UpsertResponse dataclass.
"""
openapi_response = self._apply_result.get(timeout)
openapi_response = self._future.result(timeout)
return adapt_upsert_response(openapi_response)

def result(self, timeout: float | None = None) -> UpsertResponse:
"""Alias for :meth:`get` to match :class:`concurrent.futures.Future`."""
return self.get(timeout)

def __getattr__(self, name: str) -> Any:
# Delegate other methods to the underlying ApplyResult
return getattr(self._apply_result, name)
# Delegate other methods to the underlying Future.
# .result is intentionally defined above so it returns the transformed
# UpsertResponse rather than the raw OpenAPI response.
return getattr(self._future, name)
Comment thread
cursor[bot] marked this conversation as resolved.
59 changes: 19 additions & 40 deletions pinecone/openapi_support/api_client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from __future__ import annotations

import atexit
import io

from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
from multiprocessing.pool import ThreadPool
from concurrent.futures import ThreadPoolExecutor

from .rest_urllib3 import Urllib3RestClient
Expand All @@ -33,7 +31,6 @@ class ApiClient(object):
to the API. More threads means more concurrent API requests.
"""

_pool: "ThreadPool" | None = None
_threadpool_executor: "ThreadPoolExecutor" | None = None

def __init__(
Expand All @@ -59,24 +56,6 @@ def close(self):
if self._threadpool_executor:
self._threadpool_executor.shutdown()
self._threadpool_executor = None
if self._pool:
self._pool.close()
self._pool.join()
self._pool = None
if hasattr(atexit, "unregister"):
atexit.unregister(self.close)

@property
def pool(self) -> "ThreadPool":
"""Create thread pool on first request
avoids instantiating unused threadpool for blocking clients.
"""
if self._pool is None:
from multiprocessing.pool import ThreadPool

atexit.register(self.close)
self._pool = ThreadPool(self.pool_threads)
return self._pool

@property
def threadpool_executor(self) -> "ThreadPoolExecutor":
Expand Down Expand Up @@ -336,27 +315,27 @@ def call_api(
_check_type,
)

return self.pool.apply_async(
future = self.threadpool_executor.submit(
self.__call_api,
(
resource_path,
method,
path_params,
query_params,
header_params,
body,
post_params,
files,
response_type,
auth_settings,
_return_http_data_only,
collection_formats,
_preload_content,
_request_timeout,
_host,
_check_type,
),
resource_path,
method,
path_params,
query_params,
header_params,
body,
post_params,
files,
response_type,
auth_settings,
_return_http_data_only,
collection_formats,
_preload_content,
_request_timeout,
_host,
_check_type,
)
future.get = future.result
return future

def request(
self,
Expand Down