From a790fda5aedad3a3e3c97e36104be87f82a45545 Mon Sep 17 00:00:00 2001 From: Eugene&Monty <136487467+EugeneSukharev1988@users.noreply.github.com> Date: Tue, 31 Mar 2026 10:48:13 -0500 Subject: [PATCH 1/2] refactor: pass query parameters directly to convert_to_binary_query instead of setting class attributes --- montycat/store_classes/inmemory.py | 26 ++--- montycat/store_classes/kv.py | 104 ++++++++++-------- montycat/store_classes/persistent.py | 96 +++++----------- .../store_generic_functions.py | 13 ++- 4 files changed, 105 insertions(+), 134 deletions(-) diff --git a/montycat/store_classes/inmemory.py b/montycat/store_classes/inmemory.py index 152b064..c669ceb 100644 --- a/montycat/store_classes/inmemory.py +++ b/montycat/store_classes/inmemory.py @@ -81,9 +81,8 @@ async def insert_custom_key(cls, custom_key: str, expire_sec: int = 0): raise ValueError("No custom key provided for insertion.") custom_key_converted = convert_custom_key(custom_key) - cls.command = "insert_custom_key" - query = convert_to_binary_query(cls, key=custom_key_converted, expire_sec=expire_sec) + query = convert_to_binary_query(cls, command="insert_custom_key", key=custom_key_converted, expire_sec=expire_sec) return await cls._run_query(query) @classmethod @@ -103,9 +102,8 @@ async def insert_custom_key_value(cls, custom_key: str, value: dict, expire_sec: raise ValueError("No custom key provided for insertion.") custom_key_converted = convert_custom_key(custom_key) - cls.command = "insert_custom_key_value" - query = convert_to_binary_query(cls, key=custom_key_converted, value=value, expire_sec=expire_sec) + query = convert_to_binary_query(cls, command="insert_custom_key_value", key=custom_key_converted, value=value, expire_sec=expire_sec) return await cls._run_query(query) @classmethod @@ -120,9 +118,7 @@ async def insert_value(cls, value: dict, expire_sec: int = 0): if not value: raise ValueError("No value provided for insertion.") - cls.command = "insert_value" - - query = convert_to_binary_query(cls, value=value, expire_sec=expire_sec) + query = convert_to_binary_query(cls, command="insert_value", value=value, expire_sec=expire_sec) return await cls._run_query(query) @classmethod @@ -144,6 +140,9 @@ async def update_value(cls, key: Union[str, None] = None, custom_key: Union[str, or a string message if the update was unsuccessful. """ + if key and custom_key: + raise ValueError("Provide either key or custom_key, not both.") + if custom_key and len(custom_key) > 0: key = convert_custom_key(custom_key) @@ -152,9 +151,7 @@ async def update_value(cls, key: Union[str, None] = None, custom_key: Union[str, if not key: raise ValueError("No key provided") - cls.command = "update_value" - - query = convert_to_binary_query(cls, key=key, value=filters, expire_sec=expire_sec) + query = convert_to_binary_query(cls, command="update_value", key=key, value=filters, expire_sec=expire_sec) return await cls._run_query(query) @classmethod @@ -172,8 +169,7 @@ async def insert_bulk(cls, bulk_values: list, expire_sec: int = 0): if not bulk_values: raise ValueError("No values provided for bulk insertion.") - cls.command = "insert_bulk" - query = convert_to_binary_query(cls, bulk_values=bulk_values, expire_sec=expire_sec) + query = convert_to_binary_query(cls, command="insert_bulk", bulk_values=bulk_values, expire_sec=expire_sec) return await cls._run_query(query) @classmethod @@ -186,11 +182,9 @@ async def get_keys(cls, volumes: list = [], latest_volume: bool = False): """ if not latest_volume and not volumes: - raise ValueError("Please provide keys or volumes/latest volume.") - - cls.command = "get_keys" + raise ValueError("Please provide volumes/latest volume.") - query = convert_to_binary_query(cls, volumes=volumes, latest_volume=latest_volume) + query = convert_to_binary_query(cls, command="get_keys", volumes=volumes, latest_volume=latest_volume) return await cls._run_query(query) @classmethod diff --git a/montycat/store_classes/kv.py b/montycat/store_classes/kv.py index f693406..3a024e9 100644 --- a/montycat/store_classes/kv.py +++ b/montycat/store_classes/kv.py @@ -10,13 +10,46 @@ class generic_kv: store: str = "" - command: str = "" - limit_output: dict = {} - schema = None @classmethod - async def _run_query(cls, query: str, callback=None, stop_event: Union[asyncio.Event, None] = None): - port = cls.port + 1 if callback else cls.port + async def subscribe(cls, key: Union[str, None] = None, custom_key: Union[str, None] = None, callback=None, subscription_port: Union[int, None] = None): + """ + Subscribe to real-time changes for a key or the entire keyspace. + Works for both in-memory and persistent keyspaces. + + Args: + key: The key to subscribe to. If None, subscribes to the entire keyspace. + custom_key: A custom key to subscribe to. Converted to internal format. + callback: A function called with response data as it is received. + Returns: + A tuple of (task, stop_event). Call stop_event.set() to stop the subscription. + """ + if not callback: + raise ValueError("Callback function is not provided") + + if key and custom_key: + raise ValueError("Provide either key or custom_key, not both.") + + stop_subscription = asyncio.Event() + + query_dict = { + "subscribe": True, + "store": cls.store, + "keyspace": cls.keyspace, + "key": convert_custom_key(custom_key) if custom_key else key, + "persistent": cls.persistent, + "username": cls.username, + "password": cls.password + } + + query = orjson.dumps(query_dict) + task = asyncio.create_task(cls._run_query(query, callback=callback, stop_event=stop_subscription, subscription_port=subscription_port)) + + return task, stop_subscription + + @classmethod + async def _run_query(cls, query: str, callback=None, stop_event: Union[asyncio.Event, None] = None, subscription_port: Union[int, None] = None): + port = subscription_port if subscription_port else (cls.port + 1 if callback else cls.port) return await send_data(cls.host, port, query, callback=callback, stop_event=stop_event, tls=cls.tls) @classmethod @@ -131,8 +164,8 @@ async def get_value(cls, key: Union[str, None] = None, custom_key: Union[str, No Returns: The value associated with the key or custom key. Class 'str' if the get operation failed. """ - if pointers_metadata and with_pointers: - raise ValueError("You select both pointers value and pointers metadata. Choose one") + if key and custom_key: + raise ValueError("Provide either key or custom_key, not both.") if custom_key and len(custom_key) > 0: key = convert_custom_key(custom_key) @@ -140,9 +173,7 @@ async def get_value(cls, key: Union[str, None] = None, custom_key: Union[str, No if not key: raise ValueError("No key provided") - cls.command = "get_value" - - query = convert_to_binary_query(cls, key=key, with_pointers=with_pointers, key_included=key_included, pointers_metadata=pointers_metadata) + query = convert_to_binary_query(cls, command="get_value", key=key, with_pointers=with_pointers, key_included=key_included, pointers_metadata=pointers_metadata) return await cls._run_query(query) @classmethod @@ -161,15 +192,16 @@ async def delete_key(cls, key: Union[str, None] = None, custom_key: Union[str, N bool | str: Returns a boolean indicating success (True) or failure (False), or a string message if the deletion was unsuccessful. """ + if key and custom_key: + raise ValueError("Provide either key or custom_key, not both.") + if custom_key and len(custom_key) > 0: key = convert_custom_key(custom_key) if not key: raise ValueError("No key provided") - cls.command = "delete_key" - - query = convert_to_binary_query(cls, key=key) + query = convert_to_binary_query(cls, command="delete_key", key=key) return await cls._run_query(query) @classmethod @@ -191,7 +223,6 @@ async def delete_bulk(cls, bulk_keys: list = [], bulk_custom_keys: list = []): Raises: ValueError: If both `bulk_keys` and `bulk_custom_keys` are empty. - ValueError: If both `pointers_metadata` and `with_pointers` are True. """ if len(bulk_custom_keys) > 0: bulk_custom_keys = convert_custom_keys(bulk_custom_keys) @@ -200,13 +231,12 @@ async def delete_bulk(cls, bulk_keys: list = [], bulk_custom_keys: list = []): if not bulk_keys: raise ValueError("No keys provided for deletion.") - cls.command = "delete_bulk" - query = convert_to_binary_query(cls, bulk_keys=bulk_keys) + query = convert_to_binary_query(cls, command="delete_bulk", bulk_keys=bulk_keys) return await cls._run_query(query) @classmethod async def get_bulk( - cls, bulk_keys: list = [], bulk_custom_keys: list = [], limit: list = [], with_pointers: bool = False, key_included: bool = False, pointers_metadata: bool = False, volumes: list[str] = [], latest_volume: bool = False): + cls, bulk_keys: list = [], bulk_custom_keys: list = [], limit: list[int] = [], with_pointers: bool = False, key_included: bool = False, pointers_metadata: bool = False, volumes: list[str] = [], latest_volume: bool = False): """ Retrieve multiple keys in bulk. Custom keys can be converted and added to the bulk retrieval list. Additionally, a limit on the number of records to retrieve can be applied, and whether to include pointers @@ -242,15 +272,13 @@ async def get_bulk( selected_options = sum([ bool(bulk_keys), - bool(volumes and len(volumes) > 0) or latest_volume or bool(limit and len(limit) > 0 and (limit[0] != 0 or limit[1] != 0)), + bool(volumes) or latest_volume or bool(limit and limit != [0, 0]), ]) if selected_options != 1: raise ValueError("Please provide keys or volumes/latest volume or limit.") - cls.command = "get_bulk" - cls.limit_output = handle_limit(limit) - query = convert_to_binary_query(cls, bulk_keys=bulk_keys, with_pointers=with_pointers, key_included=key_included, pointers_metadata=pointers_metadata, volumes=volumes, latest_volume=latest_volume) + query = convert_to_binary_query(cls, command="get_bulk", limit_output=handle_limit(limit), bulk_keys=bulk_keys, with_pointers=with_pointers, key_included=key_included, pointers_metadata=pointers_metadata, volumes=volumes, latest_volume=latest_volume) return await cls._run_query(query) @classmethod @@ -281,8 +309,7 @@ async def update_bulk(cls, bulk_keys_values: dict = {}, bulk_custom_keys_values: bulk_custom_keys_values = convert_custom_keys_values(bulk_custom_keys_values) bulk_keys_values = {**bulk_keys_values, **bulk_custom_keys_values} - cls.command = "update_bulk" - query = convert_to_binary_query(cls, bulk_keys_values=bulk_keys_values) + query = convert_to_binary_query(cls, command="update_bulk", bulk_keys_values=bulk_keys_values) return await cls._run_query(query) @classmethod @@ -301,14 +328,7 @@ async def lookup_keys_where(cls, limit: Union[int, list] = 0, schema: Union[str, ValueError: If no filters are provided. """ - if schema: - cls.schema = str(schema) - else: - cls.schema = None - - cls.command = "lookup_keys" - cls.limit_output = handle_limit(limit) - query = convert_to_binary_query(cls, search_criteria=filters) + query = convert_to_binary_query(cls, command="lookup_keys", limit_output=handle_limit(limit), search_criteria=filters, schema=str(schema) if schema else None) return await cls._run_query(query) @classmethod @@ -330,14 +350,7 @@ async def lookup_values_where(cls, limit: Union[int, list] = 0, with_pointers: b ValueError: If no filters are provided. """ - if schema: - cls.schema = str(schema) - else: - cls.schema = None - - cls.command = "lookup_values" - cls.limit_output = handle_limit(limit) - query = convert_to_binary_query(cls, search_criteria=filters, with_pointers=with_pointers, key_included=key_included, pointers_metadata=pointers_metadata) + query = convert_to_binary_query(cls, command="lookup_values", limit_output=handle_limit(limit), search_criteria=filters, with_pointers=with_pointers, key_included=key_included, pointers_metadata=pointers_metadata, schema=str(schema) if schema else None) return await cls._run_query(query) @classmethod @@ -363,21 +376,21 @@ async def list_all_depending_keys(cls, key: Union[str, None] = None, custom_key: ValueError: If both `key` and `custom_key` are empty, as one of them is required to form a valid query. """ + if key and custom_key: + raise ValueError("Provide either key or custom_key, not both.") + if custom_key and len(custom_key) > 0: key = convert_custom_key(custom_key) if not key: raise ValueError("No key provided") - cls.command = "list_all_depending_keys" - - query = convert_to_binary_query(cls, key=key) + query = convert_to_binary_query(cls, command="list_all_depending_keys", key=key) return await cls._run_query(query) @classmethod async def get_len(cls): - cls.command = "get_len" - query = convert_to_binary_query(cls) + query = convert_to_binary_query(cls, command="get_len") return await cls._run_query(query) @classmethod @@ -416,8 +429,7 @@ async def remove_keyspace(cls): @classmethod async def list_all_schemas_in_keyspace(cls): - cls.command = "list_all_schemas_in_keyspace" - query = convert_to_binary_query(cls) + query = convert_to_binary_query(cls, command="list_all_schemas_in_keyspace") return await cls._run_query(query) @classmethod diff --git a/montycat/store_classes/persistent.py b/montycat/store_classes/persistent.py index 23f5e7b..2f1bf14 100644 --- a/montycat/store_classes/persistent.py +++ b/montycat/store_classes/persistent.py @@ -1,42 +1,10 @@ from ..store_functions.store_generic_functions import convert_to_binary_query, convert_custom_key, handle_limit from typing import Union import orjson -import asyncio class persistent_kv: persistent: bool = True - cache: Union[int, None] = None - compression: bool = False - - @classmethod - async def subscribe(cls, key: Union[str, None]=None, custom_key: Union[str, None]=None, callback=None): - """ - Args: - callback: A function that will be called with the response data as it is received. - Returns: - A generator that yields the server's responses as they are received. - """ - - stop_subscription = asyncio.Event() - - if not callback: - raise ValueError("Callback function is not provided") - - query_dict = { - "subscribe": True, - "store": cls.store, - "keyspace": cls.keyspace, - "key": convert_custom_key(custom_key) if custom_key else key, - "username": cls.username, - "password": cls.password - } - - query = orjson.dumps(query_dict) - - task = asyncio.create_task(cls._run_query(query, callback=callback, stop_event=stop_subscription)) - - return task, stop_subscription @classmethod async def insert_custom_key(cls, custom_key: str): @@ -50,9 +18,7 @@ async def insert_custom_key(cls, custom_key: str): raise ValueError("No custom key provided for insertion.") custom_key_converted = convert_custom_key(custom_key) - cls.command = "insert_custom_key" - - query = convert_to_binary_query(cls, key=custom_key_converted) + query = convert_to_binary_query(cls, command="insert_custom_key", key=custom_key_converted) return await cls._run_query(query) @classmethod @@ -71,9 +37,7 @@ async def insert_custom_key_value(cls, custom_key: str, value: dict): raise ValueError("No custom key provided for insertion.") custom_key_converted = convert_custom_key(custom_key) - cls.command = "insert_custom_key_value" - - query = convert_to_binary_query(cls, key=custom_key_converted, value=value) + query = convert_to_binary_query(cls, command="insert_custom_key_value", key=custom_key_converted, value=value) return await cls._run_query(query) @classmethod @@ -87,9 +51,7 @@ async def insert_value(cls, value: dict): if not value: raise ValueError("No value provided for insertion.") - cls.command = "insert_value" - - query = convert_to_binary_query(cls, value=value) + query = convert_to_binary_query(cls, command="insert_value", value=value) return await cls._run_query(query) @classmethod @@ -111,6 +73,9 @@ async def update_value(cls, key: Union[str, None] = None, custom_key: Union[str, or a string message if the update was unsuccessful. """ + if key and custom_key: + raise ValueError("Provide either key or custom_key, not both.") + if custom_key and len(custom_key) > 0: key = convert_custom_key(custom_key) @@ -119,9 +84,7 @@ async def update_value(cls, key: Union[str, None] = None, custom_key: Union[str, if not key: raise ValueError("No key provided") - cls.command = "update_value" - - query = convert_to_binary_query(cls, key=key, value=filters) + query = convert_to_binary_query(cls, command="update_value", key=key, value=filters) return await cls._run_query(query) @classmethod @@ -137,40 +100,39 @@ async def insert_bulk(cls, bulk_values: list): if not bulk_values: raise ValueError("No values provided for bulk insertion.") - cls.command = "insert_bulk" - query = convert_to_binary_query(cls, bulk_values=bulk_values) + query = convert_to_binary_query(cls, command="insert_bulk", bulk_values=bulk_values) return await cls._run_query(query) @classmethod - async def get_keys(cls, limit: Union[list, int] = [], volumes: list[str] = [], latest_volume: bool = False): + async def get_keys(cls, limit: list[int] = [], volumes: list[str] = [], latest_volume: bool = False): """ Args: - Limit: A list of two integers that determine the range of keys to retrieve. - Example: limit = [10, 20] will retrieve keys 10 to 20. + limit: A list of two integers [start, stop] to retrieve keys in range. volumes (list[str], optional): A list of volumes to retrieve. Default is an empty list. latest_volume (bool, optional): Whether to retrieve keys from the latest volume. Default is False. Returns: A list of keys in the store. Class 'str' if the get operation failed. """ - if not volumes and not latest_volume and not limit and (not isinstance(limit, list) or len(limit) != 2 or (limit[0] == 0 and limit[1] == 0)): - raise ValueError("Please provide keys or volumes/latest volume or limit.") - - cls.limit_output = handle_limit(limit) - cls.command = "get_keys" + if not volumes and not latest_volume: + if not limit or limit == [0, 0]: + raise ValueError("Please provide volumes/latest volume or limit.") - query = convert_to_binary_query(cls, volumes=volumes, latest_volume=latest_volume) + query = convert_to_binary_query(cls, command="get_keys", limit_output=handle_limit(limit), volumes=volumes, latest_volume=latest_volume) return await cls._run_query(query) @classmethod - async def create_keyspace(cls): + async def create_keyspace(cls, cache: Union[int, None] = None, compression: bool = False): """ Creates a new keyspace in the store with the specified settings for persistence, distribution, caching, and compression. + + Args: + cache: Optional cache size in bytes. If None, no cache is used. + compression: Whether to enable compression. Default is False. + Returns: bool: True if the keyspace was created successfully, False otherwise. - Raises: - ValueError: If the keyspace is not set to be persistent. """ query = orjson.dumps({ "raw": [ @@ -179,8 +141,8 @@ async def create_keyspace(cls): "keyspace", cls.keyspace, "persistent", "y", "distributed", "y" if cls.distributed else "n", - "cache", cls.cache if cls.cache else "0", - "compression", "y" if cls.compression else "n" + "cache", cache if cache else "0", + "compression", "y" if compression else "n" ], "credentials": [cls.username, cls.password] }) @@ -188,24 +150,24 @@ async def create_keyspace(cls): return await cls._run_query(query) @classmethod - async def update_cache_and_compression(cls): + async def update_cache_and_compression(cls, cache: Union[int, None] = None, compression: bool = False): """ Updates the cache size and compression settings for the current store. + Args: + cache: Optional cache size in bytes. If None, no cache is used. + compression: Whether to enable compression. Default is False. + Returns: bool """ - - if not cls.persistent: - raise ValueError("Cache and compression settings can only be updated for persistent keyspaces.") - query = orjson.dumps({ "raw": [ 'update-cache-compression', "store", cls.store, "keyspace", cls.keyspace, - "cache", cls.cache if cls.cache else "0", - "compression", "y" if cls.compression else "n" + "cache", cache if cache else "0", + "compression", "y" if compression else "n" ], "credentials": [cls.username, cls.password] }) diff --git a/montycat/store_functions/store_generic_functions.py b/montycat/store_functions/store_generic_functions.py index 7c50e74..8764cf1 100644 --- a/montycat/store_functions/store_generic_functions.py +++ b/montycat/store_functions/store_generic_functions.py @@ -103,6 +103,7 @@ def normalize_bools(s): def convert_to_binary_query( cls: Type, + command: str = "", key: Union[str, None] = None, search_criteria: Dict[str, Any] = None, value: Dict[str, Any] = None, @@ -115,6 +116,8 @@ def convert_to_binary_query( latest_volume: bool = False, key_included: bool = False, pointers_metadata: bool = False, + limit_output: dict = {}, + schema: Union[str, None] = None, ) -> bytes: """ Converts parameters into a binary query format suitable for transmission. @@ -158,7 +161,7 @@ def convert_to_binary_query( if len(unique_schemas) > 1: raise ValueError("Bulk values should fit only one schema") - cls.schema = schemas[0] if schemas else None + schema = schemas[0] if schemas else None bulk_values = [ modify_pointers({k: v for k, v in item.items() if k != 'schema'}) for item in bulk_values @@ -174,23 +177,23 @@ def convert_to_binary_query( bulk_keys = [str(k) for k in bulk_keys] if 'schema' in value: - cls.schema = value.pop('schema') + schema = value.pop('schema') search_criteria = handle_timestamps_and_pointers(search_criteria) value = handle_timestamps_and_pointers(value) query_dict = { - "schema": cls.schema, + "schema": schema, "username": cls.username, "password": cls.password, "keyspace": cls.keyspace, "store": cls.store, "persistent": cls.persistent, "distributed": cls.distributed, - "limit_output": cls.limit_output, + "limit_output": limit_output, "key": key if key == None else str(key), "value": normalize_bools(value), - "command": cls.command, + "command": command, "expire": expire_sec, "bulk_values": [normalize_bools(v) for v in bulk_values], "bulk_keys": bulk_keys, From dedfbb8e809dc908c3025c1839cdfab3bad877b0 Mon Sep 17 00:00:00 2001 From: Eugene&Monty <136487467+EugeneSukharev1988@users.noreply.github.com> Date: Mon, 11 May 2026 14:08:32 -0500 Subject: [PATCH 2/2] refactor: improve bulk key handling by using explicit concatenation --- montycat/core/utils.py | 33 ++++++++++++++++++++++----------- montycat/store_classes/kv.py | 4 ++-- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/montycat/core/utils.py b/montycat/core/utils.py index d6680cb..ab9fe53 100644 --- a/montycat/core/utils.py +++ b/montycat/core/utils.py @@ -20,15 +20,16 @@ async def send_data(host: str, port: int, query: bytes, callback=None, stop_even ConnectionRefusedError: If the server refuses the connection. """ CHUNK_SIZE = 1024 * 256 - try: - if tls: - context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) - context.check_hostname = False - context.verify_mode = ssl.CERT_NONE - else: - context = None + if tls: + context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + else: + context = None + writer = None + try: reader, writer = await asyncio.wait_for( asyncio.open_connection(host, port, ssl=context), timeout=10.0 @@ -56,8 +57,6 @@ async def send_data(host: str, port: int, query: bytes, callback=None, stop_even callback(response) data.clear() - writer.close() - await writer.wait_closed() return None # subscription ended else: @@ -67,11 +66,23 @@ async def send_data(host: str, port: int, query: bytes, callback=None, stop_even data.extend(chunk) break data.extend(chunk) - writer.close() - await writer.wait_closed() return recursive_parse_orjson(data.decode().strip()) except Exception as e: return f"Error: {e}" + finally: + # Always close the connection — including on asyncio.CancelledError + # (which doesn't inherit from Exception), so the server-side + # subscription handler sees EOF and tears down its watchers. Without + # this, the cancelled subscription leaves the TCP socket open until + # GC, and the server's sled subscribers stay alive — which then + # deadlocks any subsequent remove_keyspace/remove_store on the same + # store. + if writer is not None: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass def recursive_parse_orjson(data): """ diff --git a/montycat/store_classes/kv.py b/montycat/store_classes/kv.py index 3a024e9..e5a3d07 100644 --- a/montycat/store_classes/kv.py +++ b/montycat/store_classes/kv.py @@ -226,7 +226,7 @@ async def delete_bulk(cls, bulk_keys: list = [], bulk_custom_keys: list = []): """ if len(bulk_custom_keys) > 0: bulk_custom_keys = convert_custom_keys(bulk_custom_keys) - bulk_keys += bulk_custom_keys + bulk_keys = bulk_keys + bulk_custom_keys if not bulk_keys: raise ValueError("No keys provided for deletion.") @@ -268,7 +268,7 @@ async def get_bulk( if len(bulk_custom_keys) > 0: bulk_custom_keys = convert_custom_keys(bulk_custom_keys) - bulk_keys += bulk_custom_keys + bulk_keys = bulk_keys + bulk_custom_keys selected_options = sum([ bool(bulk_keys),