diff --git a/README.md b/README.md index 53e1e16..62014ef 100644 --- a/README.md +++ b/README.md @@ -202,6 +202,8 @@ must be called to make others available: │ └── edit ├── startEdit() ├── stopEdit() + ├── startSimulation() + ├── stopSimulation() ├── heartbeat(heartbeatInterval) ├── doAutoHeartbeat() ├── cancelSession() @@ -219,7 +221,10 @@ must be called to make others available: ├── readApplicationConfigFile(applicationFile) ├── readDeviceConfigFile(configFile) └── setOperatingMode(mode) - └── Edit (object) + └── Simulation (object) # mode: 2 + ├── createPayloadList(imagePaths) + ├── processImageSequence(imageSequence, forceTrigger) + └── Edit (object) # mode: 1 ├── @property │ └── application ├── stopEditingApplication() @@ -308,3 +313,4 @@ You can run the tests for the PCIC and RPC modules separately with following com $ python -m unittest tests/test_imager.py -vvv $ python -m unittest tests/test_imageQualityCheck.py -vvv $ python -m unittest tests/test_device.py -vvv + $ python -m unittest tests/test_simulation.py -vvv diff --git a/examples/O2X5xx/simulation_runner_FTP.py b/examples/O2X5xx/simulation_runner_FTP.py new file mode 100644 index 0000000..9abd618 --- /dev/null +++ b/examples/O2X5xx/simulation_runner_FTP.py @@ -0,0 +1,203 @@ +""" +simulation_ftp_images.py +------------------------ +This script replays batches of images that were previously saved to a local FTP folder +by an ifm O2x5xx sensor (FTP-push feature) and feeds them back into the sensor's +simulation mode. + +Workflow overview: + 1. Scan a local FTP folder for JPEG images pushed by the sensor. + 2. Group the images into "batches" based on the encoded filename information + (timestamp, sensor name, application name, image index, and expected PASS/FAIL result). + 3. Connect to the sensor via RPC (configuration) and PCIC (result output). + 4. Switch the sensor to simulation mode. + 5. For every batch, send the ordered image sequence to the sensor and read back the + application result. + 6. Compare the application result against the PASS/FAIL label that was stored in the + filename and print a warning if the two values do not match. + +Expected filename format (set in the sensor's FTP-push configuration): + ______.jpg + Example: 20000101_002456_106_Demo Sensor_Advanced application_01_PASS.jpg +""" + +import os.path +import sys +from pathlib import Path +import glob +import json + +# --------------------------------------------------------------------------- +# Import the o2x5xx library. +# When the package is installed the top-level imports are +# used. When running the script directly from the repository the source folder +# is added to sys.path so the local source code is used instead. +# --------------------------------------------------------------------------- +try: + from o2x5xx import O2x5xxRPCDevice + from o2x5xx import O2x5xxPCICDevice + if not hasattr(sys.modules[__name__], "Simulation"): + raise ModuleNotFoundError +except (ModuleNotFoundError, ImportError): + sys.path.append(str(Path(__file__).resolve().parents[2])) + from source.rpc.client import O2x5xxRPCDevice + from source.pcic.client import O2x5xxPCICDevice + + +def check_or_make_directory(path) -> None: + """Create *path* (including any missing parent directories) if it does not exist yet.""" + if not os.path.exists(path): + os.makedirs(path) + + +def generate_dataset_from_ftp_push(ftp_folder_path, read_res_files=True) -> dict: + """Scan *ftp_folder_path* for JPEG images and group them into batches. + + The sensor's FTP-push feature saves one JPEG file per image per trigger + cycle. All images that belong to the same trigger cycle share the same + timestamp, sensor name, and application name – they are grouped together + as a *batch*. + + Parameters + ---------- + ftp_folder_path : str + Path to the local folder that contains the JPEG files pushed by the + sensor. + read_res_files : bool, optional + If True, reads and includes .res files for each batch. Default is True. + + Returns + ------- + dict + A dictionary whose keys are unique batch identifiers of the form + ``"__"`` and whose values are + dictionaries with the following keys: + + * ``"timestamp"`` – recording timestamp encoded in the filename + (e.g. ``"20000101_002456_106"``). + * ``"sensor_name"`` – name of the sensor that captured the images. + * ``"application_name"`` – name of the active application at capture + time. + * ``"images"`` – list of dicts, each containing ``"image"`` (full file + path) and ``"image_number"`` (index within the batch, e.g. ``"01"``). + * ``"batch_pass_fail_result"`` – expected result encoded in the + filename (``"PASS"`` or ``"FAIL"``). + * ``"res_data"`` – result data loaded from the corresponding .res file. + """ + # Collect all JPEG files in the given folder. + all_jpg_images = glob.glob(r"{folder}\*.jpg".format(folder=ftp_folder_path)) + dataset = {} + + for image in all_jpg_images: + # Extract metadata that the sensor encoded in the filename. + filename = os.path.basename(image) + batch_timestamp = filename.split("_")[0] + "_" + filename.split("_")[1] + "_" + filename.split("_")[2] # e.g. 20000101_002456_106 + batch_sensor_name = filename.split("_")[3] # e.g. "Demo Sensor" + batch_application_name = filename.split("_")[4] # e.g. "Advanced application" + batch_image_number = filename.split("_")[5] # e.g. "01", "02", … + batch_pass_fail_result = filename.split("_")[6].split(".")[0] # e.g. "PASS" or "FAIL" + + # Build the unique key that groups images belonging to the same trigger cycle. + batch_identifier = batch_timestamp + "_" + batch_sensor_name + "_" + batch_application_name + + # Check if a res file for the filename exists - if not skip reading the res file and image. + # The res file looks like following: 20000101_002448_150_Demo Sensor_Advanced application_PASS.res + # Compared to an image file: 20000101_002448_150_Demo Sensor_Advanced application_01_PASS.jpg + res_filename = "_".join(filename.split("_")[:-2]) + "_" + batch_pass_fail_result + ".res" + res_file = os.path.join(ftp_folder_path, res_filename) + + if read_res_files: + if not os.path.isfile(res_file): + print("WARNING: Missing .res file for image {}, skipping this image.".format(image)) + continue + else: + with open(res_file, "r") as f: + res_data = json.load(f) + + + # Create the batch entry the first time this identifier is encountered. + if batch_identifier not in dataset: + dataset[batch_identifier] = { + "timestamp": batch_timestamp, + "sensor_name": batch_sensor_name, + "application_name": batch_application_name, + "images": [], + "batch_pass_fail_result": batch_pass_fail_result, + "res_data": res_data + } + + # Append this image to the batch so we can later replay the whole sequence. + dataset[batch_identifier]["images"].append({"image": image, "image_number": batch_image_number}) + + return dataset + + +if __name__ == "__main__": + + # --------------------------------------------------------------------------- + # Step 1 – Build the dataset from the FTP folder. + # --------------------------------------------------------------------------- + FTP_FOLDER_PATH = r"C:\ftp\test\FTPUser" # Enter your FTP folder path here (the folder that contains the JPEG files pushed by the sensor). + SENSOR_IP_ADDRESS = "192.168.0.69" # Enter your sensor's IP address here. Make sure to use the correct IP address if you have multiple sensors in your network. + SENSOR_PORT = 50010 # Enter the PCIC port configured on your sensor for outputting results (default is 50010). + my_dataset = generate_dataset_from_ftp_push(ftp_folder_path=FTP_FOLDER_PATH) + + # Print total number of batches found in the dataset. + print("Total number of batches in dataset: {}".format(len(my_dataset))) + + # --------------------------------------------------------------------------- + # Step 2 – Connect to the sensor and replay each batch in simulation mode. + # --------------------------------------------------------------------------- + # Open a PCIC connection (port 50010) to receive application results and an + # RPC connection (port 80) to control the sensor configuration. + with O2x5xxPCICDevice(address=SENSOR_IP_ADDRESS, port=SENSOR_PORT) as pcic_device: + with O2x5xxRPCDevice(address=SENSOR_IP_ADDRESS) as rpc: + # Request an exclusive session so we can change the operating mode. + with rpc.mainProxy.requestSession(): + # Switch to operating mode 2 (simulation mode) – this allows + # sending arbitrary image sequences to the sensor's vision pipeline + # instead of relying on the live camera image. + with rpc.sessionProxy.setOperatingMode(mode=2): + # Obtain the simulation proxy that provides the FTP-replay API. + simulation = rpc.simulation + + for batch_number, (batch_identifier, batch_data) in enumerate(my_dataset.items(), start=1): + # Sort the images by their index so they are fed into the + # pipeline in the correct order (01, 02, 03, …). + image_paths_ordered = [ + image_info["image"] + for image_info in sorted(batch_data["images"], key=lambda x: x["image_number"]) + ] + + # Build the binary payload list expected by the simulation API. + payloadList = simulation.createPayloadList(image_paths_ordered) + + # Send the image sequence to the sensor and trigger processing. + # forceTrigger=True ensures the pipeline runs even if no + # external trigger signal is present. + simulation.processImageSequence(imageSequence=payloadList, forceTrigger=True) + + # Read the application result returned by the sensor via PCIC. + # The sensor is configured to output results in the format: + # ";" + # where is "1" for PASS and "0" for FAIL. + ticket, answer = pcic_device.read_next_answer() + + # TODO: Enter your index here for your own PASS/FAIL result from the PCIC interface. + application_result = answer.decode().split(";")[1].strip() # raw value: "1" or "0" + + # Convert the numeric result to a human-readable label. + application_result = "PASS" if application_result == "1" else "FAIL" if application_result == "0" else application_result + + print("Processed batch {} of {}: {} with result: {}".format( + batch_number, len(my_dataset), batch_identifier, answer.decode() + )) + + # Verify that the result matches the PASS/FAIL label that was + # encoded in the original filename. A mismatch indicates that + # the sensor's detection logic produced a different outcome when + # replaying the image compared to the live capture. + if application_result != batch_data["batch_pass_fail_result"]: + print("WARNING: Result mismatch for batch {}: expected {}, got {}".format( + batch_identifier, batch_data["batch_pass_fail_result"], application_result + )) diff --git a/setup.py b/setup.py index 81dcb0e..ea4e7ff 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ def read_requires(): setup( name='o2x5xx', - version='0.4.3-pre', + version='0.5.0-pre', description='A Python library for ifm O2x5xx (O2D5xx / O2I5xx) devices', author='Michael Gann', author_email='support.efector.object-ident@ifm.com', diff --git a/source/rpc/client.py b/source/rpc/client.py index af31e83..3563097 100644 --- a/source/rpc/client.py +++ b/source/rpc/client.py @@ -1,5 +1,7 @@ -from .proxy import MainProxy, SessionProxy, EditProxy, ApplicationProxy, ImagerProxy +from typing import LiteralString +from .proxy import MainProxy, SessionProxy, EditProxy, ApplicationProxy, ImagerProxy, SimulationProxy from .session import Session +from .simulation import Simulation from .edit import Edit from .application import Application from .imager import Imager @@ -38,13 +40,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): @property def sessionProxy(self) -> SessionProxy: return getattr(self, "_sessionProxy") + + @property + def simulationProxy(self) -> SimulationProxy: + return getattr(self, "_simulationProxy") @property - def editProxy(self) -> [EditProxy, None]: - try: - return getattr(self, "_editProxy") - except AttributeError: - return None + def editProxy(self) -> EditProxy: + return getattr(self, "_editProxy") @property def applicationProxy(self) -> ApplicationProxy: @@ -58,6 +61,17 @@ def imagerProxy(self) -> ImagerProxy: def session(self) -> Session: if self.sessionProxy: return Session(sessionProxy=self.sessionProxy, device=self) + else: + raise AttributeError("No sessionProxy available! Please first create a sessionProxy " + f"Checkout the parametrization structure here: \n {self._getParametrizationStructure()}.") + + @property + def simulation(self) -> Simulation: + if self.simulationProxy: + return Simulation(simulationProxy=self.simulationProxy, device=self) + else: + raise AttributeError("No simulationProxy available! Please first create a simulationProxy " + f"Checkout the parametrization structure here: \n {self._getParametrizationStructure()}.") @property def edit(self) -> Edit: @@ -65,18 +79,38 @@ def edit(self) -> Edit: return Edit(editProxy=self.editProxy, device=self) else: raise AttributeError("No editProxy available! Please first create an editProxy " - "with method self.device.session.requestOperatingMode(Mode=1) before using Edit!") + f"Checkout the parametrization structure here: \n {self._getParametrizationStructure()}.") @property def application(self) -> Application: if self.applicationProxy: return Application(applicationProxy=self.applicationProxy, device=self) + else: + raise AttributeError("No applicationProxy available! Please first create an applicationProxy " + f"Checkout the parametrization structure here: \n {self._getParametrizationStructure()}.") @property def imager(self) -> Imager: if self.imagerProxy: return Imager(imagerProxy=self.imagerProxy, device=self) - + else: + raise AttributeError("No imagerProxy available! Please first create an imagerProxy " + f"Checkout the parametrization structure here: \n {self._getParametrizationStructure()}.") + + def _getParametrizationStructure(self) -> LiteralString: + structure = """ + # Main API (object) (O2x5xxRPCDevice) + # └── requestSession(password, sessionID) + # └── Session (object) + # └── setOperatingMode(mode) + # └── Edit (object) + # └── editApplication(applicationIndex) + # └── Application (object) + # └── editImage(imageIndex) + # └── Imager (object) + """ + return structure + def _getDeviceMeta(self): _deviceType = self.getParameter(value="DeviceType") result = DevicesMeta.getData(key="DeviceType", value=_deviceType) diff --git a/source/rpc/proxy.py b/source/rpc/proxy.py index d9ca506..73f9025 100644 --- a/source/rpc/proxy.py +++ b/source/rpc/proxy.py @@ -149,20 +149,32 @@ def setOperatingMode(self, mode, timeout=SOCKET_TIMEOUT): Args: mode (int): operating mode + 0: run mode, 1: edit mode, 2: simulation mode timeout (float): Timeout values which is valid for the EditProxy. Argument can be a non-negative floating point number expressing seconds, or None. If None, SOCKET_TIMEOUT value is used as default """ try: self.proxy.setOperatingMode(mode) - self.device._editURL = self.baseURL + 'edit/' - self.device._editProxy = EditProxy(url=self.device._editURL, device=self.device, timeout=timeout) - yield + if mode == 1: + self.device._editURL = self.baseURL + 'edit/' + self.device._editProxy = EditProxy(url=self.device._editURL, device=self.device, timeout=timeout) + yield + if mode == 2: + self.device._simulationURL = self.baseURL + 'simulation/' + self.device._simulationProxy = SimulationProxy(url=self.device._simulationURL, device=self.device, + timeout=timeout) + yield finally: self.proxy.setOperatingMode(0) - self.device._editProxy.close() - self.device._editURL = None - self.device._editProxy = None + if mode == 1: + self.device._editProxy.close() + self.device._editURL = None + self.device._editProxy = None + if mode == 2: + self.device._simulationProxy.close() + self.device._simulationURL = None + self.device._simulationProxy = None class EditProxy(BaseProxy): @@ -240,3 +252,12 @@ def __init__(self, url, device, timeout=SOCKET_TIMEOUT): self.device = device super().__init__(url, device, timeout) + +class SimulationProxy(BaseProxy): + """Proxy representing simulation mode.""" + + def __init__(self, url, device, timeout=SOCKET_TIMEOUT): + self.baseURL = url + self.device = device + + super().__init__(url, device, timeout) diff --git a/source/rpc/session.py b/source/rpc/session.py index f01a3d2..5d7b0d5 100644 --- a/source/rpc/session.py +++ b/source/rpc/session.py @@ -6,6 +6,7 @@ import base64 from .edit import Edit from .proxy import EditProxy +from .proxy import SimulationProxy class Session(object): @@ -40,15 +41,38 @@ def stopEdit(self) -> None: self._device._editURL = None self._device._editProxy = None - def setOperatingMode(self, mode) -> [None, Edit]: + def startSimulation(self) -> None: + """ + Starting the simulation mode and requesting a Simulation object. + + :return: + """ + self.setOperatingMode(2) + _simulationURL = self._sessionProxy.baseURL + 'simulation/' + setattr(self._device, "_simulationURL", _simulationURL) + simulationProxy = SimulationProxy(url=_simulationURL, device=self._device) + setattr(self._device, "_simulationProxy", simulationProxy) + return self._device.simulation + + def stopSimulation(self) -> None: + """ + Stopping the simulation mode. + + :return: None + """ + self.setOperatingMode(0) + self._device._simulationURL = None + self._device._simulationProxy = None + + def setOperatingMode(self, mode) -> None: """ Changes the operation mode of the device. Setting this to "edit" will enable the "EditMode"-object on RPC. :param mode: 1 digit 0: run mode 1: edit mode - 2: simulation mode (Not implemented!) - :return: None or Edit object + 2: simulation mode + :return: None """ self._sessionProxy.proxy.setOperatingMode(mode) diff --git a/source/rpc/simulation.py b/source/rpc/simulation.py index 20d174e..622820c 100644 --- a/source/rpc/simulation.py +++ b/source/rpc/simulation.py @@ -1,38 +1,36 @@ import xmlrpc.client - class Simulation(object): """ Simulation Mode object """ - def __init__(self, editURL, sessionAPI, mainAPI): - self.url = editURL - self.sessionAPI = sessionAPI - self.mainAPI = mainAPI - self.rpc = xmlrpc.client.ServerProxy(self.url) + def __init__(self, simulationProxy, device): + self._simulationProxy = simulationProxy + self._device = device @staticmethod - def createPayloadList(image_paths: [str, list]) -> list: + def createPayloadList(imagePaths: list = [str, list]) -> list: """ - TODO + Creates a list of xmlrpc.client.Binary objects from a list of image paths or a single image path. + The images should be in the format as generated in the service report (JPEG, 1280 x 960, grayscale 8 bit). - :param image_paths: + :param imagePaths: list of image paths or a single image path :return: """ payloadList = [] - if isinstance(image_paths, list): - for path in image_paths: + if isinstance(imagePaths, list): + for path in imagePaths: with open(r"{p}".format(p=path), "rb") as fh: buf = fh.read() payloadList.append(xmlrpc.client.Binary(buf)) else: - with open(r"{p}".format(p=image_paths), "rb") as fh: + with open(r"{p}".format(p=imagePaths), "rb") as fh: buf = fh.read() payloadList.append(xmlrpc.client.Binary(buf)) return payloadList - def processImageSequence(self, image_sequence: list, force_trigger: bool) -> None: + def processImageSequence(self, imageSequence: list, forceTrigger: bool = True) -> None: """ Sends an Image Sequence and decode them. An additional parameter exists to define if a software trigger should be generated. @@ -49,8 +47,4 @@ def processImageSequence(self, image_sequence: list, force_trigger: bool) -> Non :return: None """ - self.rpc.processImageSequence(image_sequence, force_trigger) - - def __getattr__(self, name): - # Forward otherwise undefined method calls to XMLRPC proxy - return getattr(self.rpc, name) + self._simulationProxy.proxy.processImageSequence(imageSequence, forceTrigger) diff --git a/tests/deviceConfig/Unittest5PolDeviceConfig.o2d5xxcfg b/tests/resources/deviceConfig/Unittest5PolDeviceConfig.o2d5xxcfg similarity index 100% rename from tests/deviceConfig/Unittest5PolDeviceConfig.o2d5xxcfg rename to tests/resources/deviceConfig/Unittest5PolDeviceConfig.o2d5xxcfg diff --git a/tests/resources/deviceConfig/Unittest5PolDeviceConfig.o2u5xxcfg b/tests/resources/deviceConfig/Unittest5PolDeviceConfig.o2u5xxcfg new file mode 100644 index 0000000..727165f Binary files /dev/null and b/tests/resources/deviceConfig/Unittest5PolDeviceConfig.o2u5xxcfg differ diff --git a/tests/deviceConfig/Unittest8PolDeviceConfig.o2d5xxcfg b/tests/resources/deviceConfig/Unittest8PolDeviceConfig.o2d5xxcfg similarity index 100% rename from tests/deviceConfig/Unittest8PolDeviceConfig.o2d5xxcfg rename to tests/resources/deviceConfig/Unittest8PolDeviceConfig.o2d5xxcfg diff --git a/tests/deviceConfig/UnittestApplicationImport.o2d5xxapp b/tests/resources/deviceConfig/UnittestApplicationImport.o2d5xxapp similarity index 100% rename from tests/deviceConfig/UnittestApplicationImport.o2d5xxapp rename to tests/resources/deviceConfig/UnittestApplicationImport.o2d5xxapp diff --git a/tests/resources/deviceConfig/UnittestApplicationImport.o2u5xxapp b/tests/resources/deviceConfig/UnittestApplicationImport.o2u5xxapp new file mode 100644 index 0000000..4ccff6a Binary files /dev/null and b/tests/resources/deviceConfig/UnittestApplicationImport.o2u5xxapp differ diff --git a/tests/resources/simulation/image_01.jpg b/tests/resources/simulation/image_01.jpg new file mode 100644 index 0000000..55f1035 Binary files /dev/null and b/tests/resources/simulation/image_01.jpg differ diff --git a/tests/resources/simulation/image_02.jpg b/tests/resources/simulation/image_02.jpg new file mode 100644 index 0000000..6f0d3f6 Binary files /dev/null and b/tests/resources/simulation/image_02.jpg differ diff --git a/tests/resources/simulation/image_03.jpg b/tests/resources/simulation/image_03.jpg new file mode 100644 index 0000000..1367b83 Binary files /dev/null and b/tests/resources/simulation/image_03.jpg differ diff --git a/tests/resources/simulation/image_04.jpg b/tests/resources/simulation/image_04.jpg new file mode 100644 index 0000000..2fe0ae3 Binary files /dev/null and b/tests/resources/simulation/image_04.jpg differ diff --git a/tests/resources/simulation/image_05.jpg b/tests/resources/simulation/image_05.jpg new file mode 100644 index 0000000..caf8b8b Binary files /dev/null and b/tests/resources/simulation/image_05.jpg differ diff --git a/tests/test_simulation.py b/tests/test_simulation.py new file mode 100644 index 0000000..7ccfc61 --- /dev/null +++ b/tests/test_simulation.py @@ -0,0 +1,66 @@ +from unittest import TestCase +from source import O2x5xxRPCDevice +from tests.utils import * +from .config import * +import xmlrpc + + +class TestRPC_SimulationObject(TestCase): + config_backup = None + config_file = None + app_import_file = None + active_application_backup = None + image_paths = [r"tests\resources\simulation\image_01.jpg", + r"tests\resources\simulation\image_02.jpg", + r"tests\resources\simulation\image_03.jpg", + r"tests\resources\simulation\image_04.jpg", + r"tests\resources\simulation\image_05.jpg",] + + @classmethod + def setUpClass(cls) -> None: + with O2x5xxRPCDevice(deviceAddress) as rpc: + cls.config_file = getImportSetupByPinLayout(rpc=rpc)['config_file'] + cls.app_import_file = getImportSetupByPinLayout(rpc=rpc)['app_import_file'] + if importDeviceConfigUnittests: + cls.active_application_backup = rpc.getParameter("ActiveApplication") + with rpc.mainProxy.requestSession(): + cls.config_backup = rpc.session.exportConfig() + _configFile = rpc.session.readDeviceConfigFile(configFile=cls.config_file) + rpc.session.importConfig(_configFile, global_settings=True, network_settings=False, + applications=True) + + @classmethod + def tearDownClass(cls) -> None: + if importDeviceConfigUnittests: + with O2x5xxRPCDevice(deviceAddress) as rpc: + with rpc.mainProxy.requestSession(): + rpc.session.importConfig(cls.config_backup, global_settings=True, network_settings=False, + applications=True) + if cls.active_application_backup != "0": + rpc.switchApplication(cls.active_application_backup) + + def setUp(self) -> None: + with O2x5xxRPCDevice(deviceAddress) as self.rpc: + self.rpc.switchApplication(4) + + def tearDown(self) -> None: + pass + + def test_createPayloadList(self): + with O2x5xxRPCDevice(deviceAddress) as self.rpc, self.rpc.mainProxy.requestSession(): + with self.rpc.sessionProxy.setOperatingMode(mode=2): + simulation = self.rpc.simulation + payloadList = simulation.createPayloadList(image_paths=self.image_paths) + self.assertEqual(len(payloadList), len(self.image_paths)) + for payload in payloadList: + self.assertIsInstance(payload, xmlrpc.client.Binary) + + def test_processImageSequence(self): + with O2x5xxRPCDevice(deviceAddress) as self.rpc, self.rpc.mainProxy.requestSession(): + with self.rpc.sessionProxy.setOperatingMode(mode=2): + simulation = self.rpc.simulation + payloadList = simulation.createPayloadList(image_paths=self.image_paths) + try: + simulation.processImageSequence(image_sequence=payloadList, force_trigger=True) + except Exception as e: + self.fail(f"processImageSequence raised an exception: {e}") \ No newline at end of file diff --git a/tests/utils.py b/tests/utils.py index ba0bb0f..cd1baf0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -7,8 +7,8 @@ def getImportSetupByPinLayout(rpc): config_file_path = os.path.join(os.getcwd(), "deviceConfig") app_import_file_path = os.path.join(os.getcwd(), "deviceConfig", "UnittestApplicationImport.o2d5xxapp") else: - config_file_path = os.path.join(os.getcwd(), "tests", "deviceConfig") - app_import_file_path = os.path.join(os.getcwd(), "tests", "deviceConfig", "UnittestApplicationImport.o2d5xxapp") + config_file_path = os.path.join(os.getcwd(), "tests", "resources", "deviceConfig") + app_import_file_path = os.path.join(os.getcwd(), "tests", "resources", "UnittestApplicationImport.o2d5xxapp") if pin_layout == 3: result = {'config_file': os.path.join(config_file_path, "Unittest8PolDeviceConfig.o2d5xxcfg"), 'app_import_file': app_import_file_path}