diff --git a/.github/workflows/sbom.yml b/.github/workflows/sbom.yml index 54c3ecc..409b805 100644 --- a/.github/workflows/sbom.yml +++ b/.github/workflows/sbom.yml @@ -42,4 +42,4 @@ jobs: - name: Scan for vulnerabilities uses: anchore/scan-action@v7 with: - sbom: ~/output/sbom.json \ No newline at end of file + sbom: ~/output/sbom.json diff --git a/cri_lib/cri_controller.py b/cri_lib/cri_controller.py index 2dbb407..7a76a1f 100644 --- a/cri_lib/cri_controller.py +++ b/cri_lib/cri_controller.py @@ -48,6 +48,9 @@ def __init__(self) -> None: self.robot_state: RobotState = RobotState() self.robot_state_lock = threading.Lock() + self.file_list: list = [] + self.file_list_lock: threading.Lock = threading.Lock() + self.parser = CRIProtocolParser(self.robot_state, self.robot_state_lock) self.connected = False @@ -342,6 +345,9 @@ def _parse_message(self, message: str) -> None: if notification["answer"] == "CAN": self.can_queue.put_nowait(notification["can"]) + if notification["answer"] == "info_filelist": + self.file_list = self.parser.file_list + with self.answer_events_lock: msg_id = notification["answer"] @@ -455,6 +461,39 @@ def get_motor_temperatures( else: return True + def list_files(self, target_directory: str = "Programs") -> bool: + """Request a list of all files in the directory, which is relative to the /Data/ directory. + + Parameters + ---------- + directory : str + directory on iRC `/Data/` in which files are located, e.g. `Programs` for normal robot programs + + Returns + ------- + a list of files + """ + + command = f"CMD ListFiles {target_directory}" + + if ( + self._send_command( + command=command, register_answer=True, fixed_answer_name="info_filelist" + ) + is not None + ): + if ( + error_msg := self._wait_for_answer( + "info_filelist", timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in ListFiles command: %s", error_msg) + return False + else: + return True + else: + return False + class CRIController(CRIClient): """A connected ``CRIClient`` with control capabilities.""" @@ -476,6 +515,7 @@ def __init__(self) -> None: "E3": 0.0, } self.jog_speeds_lock = threading.Lock() + self.file_list: list = [] super().__init__() def _bg_alivejog_thread(self) -> None: diff --git a/cri_lib/cri_protocol_parser.py b/cri_lib/cri_protocol_parser.py index b383b4d..c2e94e9 100644 --- a/cri_lib/cri_protocol_parser.py +++ b/cri_lib/cri_protocol_parser.py @@ -1,4 +1,5 @@ import logging +import threading import time from threading import Lock from typing import Any, Sequence @@ -29,6 +30,8 @@ class CRIProtocolParser: def __init__(self, robot_state: RobotState, robot_state_lock: Lock): self.robot_state = robot_state self.robot_state_lock = robot_state_lock + self.file_list: list = [] + self.file_list_lock: Lock = threading.Lock() def parse_message( self, message: str @@ -582,6 +585,7 @@ def _parse_info(self, parameters: list[str]) -> str | None: self.robot_state.referencing_state = ref_state return "info_referencing" + elif parameters[0] == "BoardTemp": temperatures = [float(param) for param in parameters[1:]] @@ -589,6 +593,7 @@ def _parse_info(self, parameters: list[str]) -> str | None: self.robot_state.board_temps = temperatures return "info_boardtemp" + elif parameters[0] == "MotorTemp": temperatures = [float(param) for param in parameters[1:]] @@ -596,6 +601,16 @@ def _parse_info(self, parameters: list[str]) -> str | None: self.robot_state.motor_temps = temperatures return "info_motortemp" + + elif parameters[0] == "FileList": + with self.file_list_lock: + self.file_list.clear() # direct point to parameters[] will break the shared reference + self.file_list.extend( + parameters[2:] + ) # first element is the target_folder + print(self.file_list) + return "info_filelist" + else: return None diff --git a/tests/test_cri_parser.py b/tests/test_cri_parser.py index 4e15f92..58f9b7c 100644 --- a/tests/test_cri_parser.py +++ b/tests/test_cri_parser.py @@ -613,3 +613,17 @@ def test_info_motortemps(): assert controller.answer_events["info_motortemp"].is_set() assert controller.robot_state.motor_temps == pytest.approx(motor_temps) + + +def test_list_files(): + test_message = ( + "CRISTART 1234 INFO FileList BaseDir FirstFile.xml SecondFile.txt CRIEND" + ) + + controller = CRIController() + controller.answer_events["info_filelist"] = threading.Event() + controller._parse_message(test_message) + + assert controller.answer_events["info_filelist"].is_set() + assert "FirstFile.xml" in controller.file_list + assert "SecondFile.txt" in controller.file_list