Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions cri_lib/cri_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ class MotionType(Enum):
Platform = "Platform"


async def wait_event_with_timeout(event: asyncio.Event, timeout: float):
t_start = time()
while not event.is_set():
await asyncio.sleep(0.001)
if time() - t_start > timeout:
raise TimeoutError
return


class CRIClient:
"""Client with implementations for read-only communication."""

Expand Down Expand Up @@ -66,7 +75,7 @@ def __init__(self) -> None:
self.sent_command_counter_lock = threading.Lock()
self.sent_command_counter = 0
self.answer_events_lock = threading.Lock()
self.answer_events: dict[str, threading.Event] = {}
self.answer_events: dict[str, asyncio.Event] = {}
self.error_messages: dict[str, str] = {}

self.status_callback: Callable | None = None
Expand Down Expand Up @@ -153,7 +162,7 @@ def close(self) -> None:

def _register_answer(self, answer_id: str) -> None:
with self.answer_events_lock:
self.answer_events[answer_id] = threading.Event()
self.answer_events[answer_id] = asyncio.Event()

def _send_command(
self,
Expand Down Expand Up @@ -199,9 +208,9 @@ def _send_command(
if register_answer:
with self.answer_events_lock:
if fixed_answer_name is not None:
self.answer_events[fixed_answer_name] = threading.Event()
self.answer_events[fixed_answer_name] = asyncio.Event()
else:
self.answer_events[str(command_counter)] = threading.Event()
self.answer_events[str(command_counter)] = asyncio.Event()

try:
with self.socket_write_lock:
Expand Down Expand Up @@ -280,6 +289,14 @@ def _wait_for_answer(
self,
message_id: str | int,
timeout: float | None = DEFAULT, # type: ignore
) -> None | str:
loop = asyncio.get_event_loop()
return loop.run_until_complete(self._wait_for_answer_async(message_id, timeout))

async def _wait_for_answer_async(
self,
message_id: str | int,
timeout: float | None = DEFAULT, # type: ignore
) -> None | str:
"""Waits for an answer to a message.
The answer event will be removed after the call, even if there was a timeout. Choose timeout accordingly.
Expand Down Expand Up @@ -312,9 +329,9 @@ def _wait_for_answer(
return None
wait_event = self.answer_events[message_id]

if timeout is DEFAULT:
if timeout is DEFAULT or timeout is None:
timeout = self.DEFAULT_ANSWER_TIMEOUT
success = wait_event.wait(timeout=timeout)
success = await wait_event_with_timeout(wait_event, timeout=timeout)

if not success:
raise CRICommandTimeOutError()
Expand Down
Loading