From a2caa4edd830a2506d898b7d5277e9f46b341369 Mon Sep 17 00:00:00 2001 From: Michael Osthege Date: Wed, 13 May 2026 10:44:30 +0200 Subject: [PATCH] Use async events --- cri_lib/cri_controller.py | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/cri_lib/cri_controller.py b/cri_lib/cri_controller.py index 2dbb407..1f2e9cb 100644 --- a/cri_lib/cri_controller.py +++ b/cri_lib/cri_controller.py @@ -33,6 +33,15 @@ class MotionType(Enum): Platform = "Platform" +async def wait_event_with_timeout(event: asyncio.Event, timeout: float): + t_start = time() + while not event.is_set(): + await asyncio.sleep(0.001) + if time() - t_start > timeout: + raise TimeoutError + return + + class CRIClient: """Client with implementations for read-only communication.""" @@ -66,7 +75,7 @@ def __init__(self) -> None: self.sent_command_counter_lock = threading.Lock() self.sent_command_counter = 0 self.answer_events_lock = threading.Lock() - self.answer_events: dict[str, threading.Event] = {} + self.answer_events: dict[str, asyncio.Event] = {} self.error_messages: dict[str, str] = {} self.status_callback: Callable | None = None @@ -153,7 +162,7 @@ def close(self) -> None: def _register_answer(self, answer_id: str) -> None: with self.answer_events_lock: - self.answer_events[answer_id] = threading.Event() + self.answer_events[answer_id] = asyncio.Event() def _send_command( self, @@ -199,9 +208,9 @@ def _send_command( if register_answer: with self.answer_events_lock: if fixed_answer_name is not None: - self.answer_events[fixed_answer_name] = threading.Event() + self.answer_events[fixed_answer_name] = asyncio.Event() else: - self.answer_events[str(command_counter)] = threading.Event() + self.answer_events[str(command_counter)] = asyncio.Event() try: with self.socket_write_lock: @@ -280,6 +289,14 @@ def _wait_for_answer( self, message_id: str | int, timeout: float | None = DEFAULT, # type: ignore + ) -> None | str: + loop = asyncio.get_event_loop() + return loop.run_until_complete(self._wait_for_answer_async(message_id, timeout)) + + async def _wait_for_answer_async( + self, + message_id: str | int, + timeout: float | None = DEFAULT, # type: ignore ) -> None | str: """Waits for an answer to a message. The answer event will be removed after the call, even if there was a timeout. Choose timeout accordingly. @@ -312,9 +329,9 @@ def _wait_for_answer( return None wait_event = self.answer_events[message_id] - if timeout is DEFAULT: + if timeout is DEFAULT or timeout is None: timeout = self.DEFAULT_ANSWER_TIMEOUT - success = wait_event.wait(timeout=timeout) + success = await wait_event_with_timeout(wait_event, timeout=timeout) if not success: raise CRICommandTimeOutError()