Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ must be called to make others available:
│ └── edit
├── startEdit()
├── stopEdit()
├── startSimulation()
├── stopSimulation()
├── heartbeat(heartbeatInterval)
├── doAutoHeartbeat()
├── cancelSession()
Expand All @@ -219,7 +221,10 @@ must be called to make others available:
├── readApplicationConfigFile(applicationFile)
├── readDeviceConfigFile(configFile)
└── setOperatingMode(mode)
└── Edit (object)
└── Simulation (object) # mode: 2
├── createPayloadList(imagePaths)
├── processImageSequence(imageSequence, forceTrigger)
└── Edit (object) # mode: 1
├── @property
│ └── application
├── stopEditingApplication()
Expand Down Expand Up @@ -308,3 +313,4 @@ You can run the tests for the PCIC and RPC modules separately with following com
$ python -m unittest tests/test_imager.py -vvv
$ python -m unittest tests/test_imageQualityCheck.py -vvv
$ python -m unittest tests/test_device.py -vvv
$ python -m unittest tests/test_simulation.py -vvv
203 changes: 203 additions & 0 deletions examples/O2X5xx/simulation_runner_FTP.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""
simulation_ftp_images.py
------------------------
This script replays batches of images that were previously saved to a local FTP folder
by an ifm O2x5xx sensor (FTP-push feature) and feeds them back into the sensor's
simulation mode.

Workflow overview:
1. Scan a local FTP folder for JPEG images pushed by the sensor.
2. Group the images into "batches" based on the encoded filename information
(timestamp, sensor name, application name, image index, and expected PASS/FAIL result).
3. Connect to the sensor via RPC (configuration) and PCIC (result output).
4. Switch the sensor to simulation mode.
5. For every batch, send the ordered image sequence to the sensor and read back the
application result.
6. Compare the application result against the PASS/FAIL label that was stored in the
filename and print a warning if the two values do not match.

Expected filename format (set in the sensor's FTP-push configuration):
<YYYYMMDD>_<HHMMSS>_<ms>_<SensorName>_<ApplicationName>_<ImageIndex>_<PASS|FAIL>.jpg
Example: 20000101_002456_106_Demo Sensor_Advanced application_01_PASS.jpg
"""

import os.path
import sys
from pathlib import Path
import glob
import json

# ---------------------------------------------------------------------------
# Import the o2x5xx library.
# When the package is installed the top-level imports are
# used. When running the script directly from the repository the source folder
# is added to sys.path so the local source code is used instead.
# ---------------------------------------------------------------------------
try:
from o2x5xx import O2x5xxRPCDevice
from o2x5xx import O2x5xxPCICDevice
if not hasattr(sys.modules[__name__], "Simulation"):
raise ModuleNotFoundError
except (ModuleNotFoundError, ImportError):
sys.path.append(str(Path(__file__).resolve().parents[2]))
from source.rpc.client import O2x5xxRPCDevice
from source.pcic.client import O2x5xxPCICDevice


def check_or_make_directory(path) -> None:
"""Create *path* (including any missing parent directories) if it does not exist yet."""
if not os.path.exists(path):
os.makedirs(path)


def generate_dataset_from_ftp_push(ftp_folder_path, read_res_files=True) -> dict:
"""Scan *ftp_folder_path* for JPEG images and group them into batches.

The sensor's FTP-push feature saves one JPEG file per image per trigger
cycle. All images that belong to the same trigger cycle share the same
timestamp, sensor name, and application name – they are grouped together
as a *batch*.

Parameters
----------
ftp_folder_path : str
Path to the local folder that contains the JPEG files pushed by the
sensor.
read_res_files : bool, optional
If True, reads and includes .res files for each batch. Default is True.

Returns
-------
dict
A dictionary whose keys are unique batch identifiers of the form
``"<timestamp>_<SensorName>_<ApplicationName>"`` and whose values are
dictionaries with the following keys:

* ``"timestamp"`` – recording timestamp encoded in the filename
(e.g. ``"20000101_002456_106"``).
* ``"sensor_name"`` – name of the sensor that captured the images.
* ``"application_name"`` – name of the active application at capture
time.
* ``"images"`` – list of dicts, each containing ``"image"`` (full file
path) and ``"image_number"`` (index within the batch, e.g. ``"01"``).
* ``"batch_pass_fail_result"`` – expected result encoded in the
filename (``"PASS"`` or ``"FAIL"``).
* ``"res_data"`` – result data loaded from the corresponding .res file.
"""
# Collect all JPEG files in the given folder.
all_jpg_images = glob.glob(r"{folder}\*.jpg".format(folder=ftp_folder_path))
dataset = {}

for image in all_jpg_images:
# Extract metadata that the sensor encoded in the filename.
filename = os.path.basename(image)
batch_timestamp = filename.split("_")[0] + "_" + filename.split("_")[1] + "_" + filename.split("_")[2] # e.g. 20000101_002456_106
batch_sensor_name = filename.split("_")[3] # e.g. "Demo Sensor"
batch_application_name = filename.split("_")[4] # e.g. "Advanced application"
batch_image_number = filename.split("_")[5] # e.g. "01", "02", …
batch_pass_fail_result = filename.split("_")[6].split(".")[0] # e.g. "PASS" or "FAIL"

# Build the unique key that groups images belonging to the same trigger cycle.
batch_identifier = batch_timestamp + "_" + batch_sensor_name + "_" + batch_application_name

# Check if a res file for the filename exists - if not skip reading the res file and image.
# The res file looks like following: 20000101_002448_150_Demo Sensor_Advanced application_PASS.res
# Compared to an image file: 20000101_002448_150_Demo Sensor_Advanced application_01_PASS.jpg
res_filename = "_".join(filename.split("_")[:-2]) + "_" + batch_pass_fail_result + ".res"
res_file = os.path.join(ftp_folder_path, res_filename)

if read_res_files:
if not os.path.isfile(res_file):
print("WARNING: Missing .res file for image {}, skipping this image.".format(image))
continue
else:
with open(res_file, "r") as f:
res_data = json.load(f)


# Create the batch entry the first time this identifier is encountered.
if batch_identifier not in dataset:
dataset[batch_identifier] = {
"timestamp": batch_timestamp,
"sensor_name": batch_sensor_name,
"application_name": batch_application_name,
"images": [],
"batch_pass_fail_result": batch_pass_fail_result,
"res_data": res_data
}

# Append this image to the batch so we can later replay the whole sequence.
dataset[batch_identifier]["images"].append({"image": image, "image_number": batch_image_number})

return dataset


if __name__ == "__main__":

# ---------------------------------------------------------------------------
# Step 1 – Build the dataset from the FTP folder.
# ---------------------------------------------------------------------------
FTP_FOLDER_PATH = r"C:\ftp\test\FTPUser" # Enter your FTP folder path here (the folder that contains the JPEG files pushed by the sensor).
SENSOR_IP_ADDRESS = "192.168.0.69" # Enter your sensor's IP address here. Make sure to use the correct IP address if you have multiple sensors in your network.
SENSOR_PORT = 50010 # Enter the PCIC port configured on your sensor for outputting results (default is 50010).
my_dataset = generate_dataset_from_ftp_push(ftp_folder_path=FTP_FOLDER_PATH)

# Print total number of batches found in the dataset.
print("Total number of batches in dataset: {}".format(len(my_dataset)))

# ---------------------------------------------------------------------------
# Step 2 – Connect to the sensor and replay each batch in simulation mode.
# ---------------------------------------------------------------------------
# Open a PCIC connection (port 50010) to receive application results and an
# RPC connection (port 80) to control the sensor configuration.
with O2x5xxPCICDevice(address=SENSOR_IP_ADDRESS, port=SENSOR_PORT) as pcic_device:
with O2x5xxRPCDevice(address=SENSOR_IP_ADDRESS) as rpc:
# Request an exclusive session so we can change the operating mode.
with rpc.mainProxy.requestSession():
# Switch to operating mode 2 (simulation mode) – this allows
# sending arbitrary image sequences to the sensor's vision pipeline
# instead of relying on the live camera image.
with rpc.sessionProxy.setOperatingMode(mode=2):
# Obtain the simulation proxy that provides the FTP-replay API.
simulation = rpc.simulation

for batch_number, (batch_identifier, batch_data) in enumerate(my_dataset.items(), start=1):
# Sort the images by their index so they are fed into the
# pipeline in the correct order (01, 02, 03, …).
image_paths_ordered = [
image_info["image"]
for image_info in sorted(batch_data["images"], key=lambda x: x["image_number"])
]

# Build the binary payload list expected by the simulation API.
payloadList = simulation.createPayloadList(image_paths_ordered)

# Send the image sequence to the sensor and trigger processing.
# forceTrigger=True ensures the pipeline runs even if no
# external trigger signal is present.
simulation.processImageSequence(imageSequence=payloadList, forceTrigger=True)

# Read the application result returned by the sensor via PCIC.
# The sensor is configured to output results in the format:
# "<BatchIdentifier>;<Result>"
# where <Result> is "1" for PASS and "0" for FAIL.
ticket, answer = pcic_device.read_next_answer()

# TODO: Enter your index here for your own PASS/FAIL result from the PCIC interface.
application_result = answer.decode().split(";")[1].strip() # raw value: "1" or "0"

# Convert the numeric result to a human-readable label.
application_result = "PASS" if application_result == "1" else "FAIL" if application_result == "0" else application_result

print("Processed batch {} of {}: {} with result: {}".format(
batch_number, len(my_dataset), batch_identifier, answer.decode()
))

# Verify that the result matches the PASS/FAIL label that was
# encoded in the original filename. A mismatch indicates that
# the sensor's detection logic produced a different outcome when
# replaying the image compared to the live capture.
if application_result != batch_data["batch_pass_fail_result"]:
print("WARNING: Result mismatch for batch {}: expected {}, got {}".format(
batch_identifier, batch_data["batch_pass_fail_result"], application_result
))
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def read_requires():

setup(
name='o2x5xx',
version='0.4.3-pre',
version='0.5.0-pre',
description='A Python library for ifm O2x5xx (O2D5xx / O2I5xx) devices',
author='Michael Gann',
author_email='support.efector.object-ident@ifm.com',
Expand Down
50 changes: 42 additions & 8 deletions source/rpc/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .proxy import MainProxy, SessionProxy, EditProxy, ApplicationProxy, ImagerProxy
from typing import LiteralString
from .proxy import MainProxy, SessionProxy, EditProxy, ApplicationProxy, ImagerProxy, SimulationProxy
from .session import Session
from .simulation import Simulation
from .edit import Edit
from .application import Application
from .imager import Imager
Expand Down Expand Up @@ -38,13 +40,14 @@ def __exit__(self, exc_type, exc_val, exc_tb):
@property
def sessionProxy(self) -> SessionProxy:
return getattr(self, "_sessionProxy")

@property
def simulationProxy(self) -> SimulationProxy:
return getattr(self, "_simulationProxy")

@property
def editProxy(self) -> [EditProxy, None]:
try:
return getattr(self, "_editProxy")
except AttributeError:
return None
def editProxy(self) -> EditProxy:
return getattr(self, "_editProxy")

@property
def applicationProxy(self) -> ApplicationProxy:
Expand All @@ -58,25 +61,56 @@ def imagerProxy(self) -> ImagerProxy:
def session(self) -> Session:
if self.sessionProxy:
return Session(sessionProxy=self.sessionProxy, device=self)
else:
raise AttributeError("No sessionProxy available! Please first create a sessionProxy "
f"Checkout the parametrization structure here: \n {self._getParametrizationStructure()}.")

@property
def simulation(self) -> Simulation:
if self.simulationProxy:
return Simulation(simulationProxy=self.simulationProxy, device=self)
else:
raise AttributeError("No simulationProxy available! Please first create a simulationProxy "
f"Checkout the parametrization structure here: \n {self._getParametrizationStructure()}.")

@property
def edit(self) -> Edit:
if self.editProxy:
return Edit(editProxy=self.editProxy, device=self)
else:
raise AttributeError("No editProxy available! Please first create an editProxy "
"with method self.device.session.requestOperatingMode(Mode=1) before using Edit!")
f"Checkout the parametrization structure here: \n {self._getParametrizationStructure()}.")

@property
def application(self) -> Application:
if self.applicationProxy:
return Application(applicationProxy=self.applicationProxy, device=self)
else:
raise AttributeError("No applicationProxy available! Please first create an applicationProxy "
f"Checkout the parametrization structure here: \n {self._getParametrizationStructure()}.")

@property
def imager(self) -> Imager:
if self.imagerProxy:
return Imager(imagerProxy=self.imagerProxy, device=self)

else:
raise AttributeError("No imagerProxy available! Please first create an imagerProxy "
f"Checkout the parametrization structure here: \n {self._getParametrizationStructure()}.")

def _getParametrizationStructure(self) -> LiteralString:
structure = """
# Main API (object) (O2x5xxRPCDevice)
# └── requestSession(password, sessionID)
# └── Session (object)
# └── setOperatingMode(mode)
# └── Edit (object)
# └── editApplication(applicationIndex)
# └── Application (object)
# └── editImage(imageIndex)
# └── Imager (object)
"""
return structure

def _getDeviceMeta(self):
_deviceType = self.getParameter(value="DeviceType")
result = DevicesMeta.getData(key="DeviceType", value=_deviceType)
Expand Down
33 changes: 27 additions & 6 deletions source/rpc/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,32 @@ def setOperatingMode(self, mode, timeout=SOCKET_TIMEOUT):

Args:
mode (int): operating mode
0: run mode, 1: edit mode, 2: simulation mode
timeout (float): Timeout values which is valid for the EditProxy.
Argument can be a non-negative floating point number expressing seconds, or None.
If None, SOCKET_TIMEOUT value is used as default
"""
try:
self.proxy.setOperatingMode(mode)
self.device._editURL = self.baseURL + 'edit/'
self.device._editProxy = EditProxy(url=self.device._editURL, device=self.device, timeout=timeout)
yield
if mode == 1:
self.device._editURL = self.baseURL + 'edit/'
self.device._editProxy = EditProxy(url=self.device._editURL, device=self.device, timeout=timeout)
yield
if mode == 2:
self.device._simulationURL = self.baseURL + 'simulation/'
self.device._simulationProxy = SimulationProxy(url=self.device._simulationURL, device=self.device,
timeout=timeout)
yield
finally:
self.proxy.setOperatingMode(0)
self.device._editProxy.close()
self.device._editURL = None
self.device._editProxy = None
if mode == 1:
self.device._editProxy.close()
self.device._editURL = None
self.device._editProxy = None
if mode == 2:
self.device._simulationProxy.close()
self.device._simulationURL = None
self.device._simulationProxy = None


class EditProxy(BaseProxy):
Expand Down Expand Up @@ -240,3 +252,12 @@ def __init__(self, url, device, timeout=SOCKET_TIMEOUT):
self.device = device

super().__init__(url, device, timeout)

class SimulationProxy(BaseProxy):
"""Proxy representing simulation mode."""

def __init__(self, url, device, timeout=SOCKET_TIMEOUT):
self.baseURL = url
self.device = device

super().__init__(url, device, timeout)
Loading
Loading