From 27c8457382959b6b294857aa5ac2e80925f558c2 Mon Sep 17 00:00:00 2001 From: esto Date: Tue, 28 Apr 2026 12:41:09 +0200 Subject: [PATCH 1/7] feat(scan_task): add pre-capture pause to reduce vibration impact - Introduced `pause_before_capture_ms` setting to allow configurable delays before photo capture. - Enhanced scan logic to detect cancellation before and during the pause. - Updated schema to include the new pause setting with validation. --- openscan_firmware/config/scan.py | 5 +++ .../services/tasks/core/scan_task.py | 16 +++++++++ scripts/openapi/openapi_latest.json | 35 +++++++++++-------- scripts/openapi/openapi_next.json | 35 +++++++++++-------- scripts/openapi/openapi_v0.8.json | 35 +++++++++++-------- scripts/openapi/openapi_v0.9.json | 35 +++++++++++-------- 6 files changed, 105 insertions(+), 56 deletions(-) diff --git a/openscan_firmware/config/scan.py b/openscan_firmware/config/scan.py index 1f6a6e9..458c044 100644 --- a/openscan_firmware/config/scan.py +++ b/openscan_firmware/config/scan.py @@ -41,6 +41,11 @@ class ScanSetting(BaseModel): focus_stacks: int = Field(1, ge=1, le=99, description="Number of photos with different focus per position." "This ignores AF and you need to set a focus range." "Focus values will then be evenly spaced between min and max.") + pause_before_capture_ms: int = Field( + 0, + ge=0, + description="Pause in milliseconds before capture to let vibrations settle.", + ) focus_range: Tuple[ confloat(ge=0.0, le=15.0), confloat(ge=0.0, le=15.0)] = Field(default=(10.0, 15.0), diff --git a/openscan_firmware/controllers/services/tasks/core/scan_task.py b/openscan_firmware/controllers/services/tasks/core/scan_task.py index ee1eb9b..b3acabb 100644 --- a/openscan_firmware/controllers/services/tasks/core/scan_task.py +++ b/openscan_firmware/controllers/services/tasks/core/scan_task.py @@ -435,6 +435,10 @@ async def _capture_photos_at_position(self, current_point: PolarPoint3D, index: """ try: logger.debug("Capturing photo at position %s", current_point) + await self._wait_before_capture() + if self.is_cancelled(): + logger.info("Cancellation detected before capture at position %s.", index) + return if not self._ctx.focus_context or not self._ctx.focus_context["enabled"]: # Single photo capture @@ -492,6 +496,18 @@ async def _capture_photos_at_position(self, current_point: PolarPoint3D, index: logger.error("Error taking photo at position %s: %s", index, e, exc_info=True) raise + async def _wait_before_capture(self) -> None: + """Pause before capture to allow motor-induced vibrations to settle.""" + delay_ms = int(self._ctx.scan.settings.pause_before_capture_ms or 0) + if delay_ms <= 0: + return + + logger.debug("Waiting %d ms before capture", delay_ms) + await self.wait_for_pause() + if self.is_cancelled(): + return + await asyncio.sleep(delay_ms / 1000) + async def _cleanup_scan(self) -> None: """Cleanup after scan completion or failure and reset focus settings if needed.""" # Lazy import to avoid hardware side effects on module import diff --git a/scripts/openapi/openapi_latest.json b/scripts/openapi/openapi_latest.json index 57f5966..db1711d 100644 --- a/scripts/openapi/openapi_latest.json +++ b/scripts/openapi/openapi_latest.json @@ -1933,22 +1933,22 @@ "type": "integer", "title": "Scan Index" } + }, + { + "name": "photo_filenames", + "in": "query", + "required": true, + "schema": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Relative photo paths to delete.", + "title": "Photo Filenames" + }, + "description": "Relative photo paths to delete." } ], - "requestBody": { - "required": true, - "content": { - "application/json": { - "schema": { - "type": "array", - "items": { - "type": "string" - }, - "title": "Photo Filenames" - } - } - } - }, "responses": { "200": { "description": "Successful Response", @@ -6072,6 +6072,13 @@ "description": "Number of photos with different focus per position.This ignores AF and you need to set a focus range.Focus values will then be evenly spaced between min and max.", "default": 1 }, + "pause_before_capture_ms": { + "type": "integer", + "minimum": 0.0, + "title": "Pause Before Capture Ms", + "description": "Pause in milliseconds before capture to let vibrations settle.", + "default": 0 + }, "focus_range": { "prefixItems": [ { diff --git a/scripts/openapi/openapi_next.json b/scripts/openapi/openapi_next.json index 27c0ec8..4f70065 100644 --- a/scripts/openapi/openapi_next.json +++ b/scripts/openapi/openapi_next.json @@ -1934,22 +1934,22 @@ "type": "integer", "title": "Scan Index" } + }, + { + "name": "photo_filenames", + "in": "query", + "required": true, + "schema": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Relative photo paths to delete.", + "title": "Photo Filenames" + }, + "description": "Relative photo paths to delete." } ], - "requestBody": { - "required": true, - "content": { - "application/json": { - "schema": { - "type": "array", - "items": { - "type": "string" - }, - "title": "Photo Filenames" - } - } - } - }, "responses": { "200": { "description": "Successful Response", @@ -6862,6 +6862,13 @@ "description": "Number of photos with different focus per position.This ignores AF and you need to set a focus range.Focus values will then be evenly spaced between min and max.", "default": 1 }, + "pause_before_capture_ms": { + "type": "integer", + "minimum": 0.0, + "title": "Pause Before Capture Ms", + "description": "Pause in milliseconds before capture to let vibrations settle.", + "default": 0 + }, "focus_range": { "prefixItems": [ { diff --git a/scripts/openapi/openapi_v0.8.json b/scripts/openapi/openapi_v0.8.json index a19bdab..386e214 100644 --- a/scripts/openapi/openapi_v0.8.json +++ b/scripts/openapi/openapi_v0.8.json @@ -1731,22 +1731,22 @@ "type": "integer", "title": "Scan Index" } + }, + { + "name": "photo_filenames", + "in": "query", + "required": true, + "schema": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Relative photo paths to delete.", + "title": "Photo Filenames" + }, + "description": "Relative photo paths to delete." } ], - "requestBody": { - "required": true, - "content": { - "application/json": { - "schema": { - "type": "array", - "items": { - "type": "string" - }, - "title": "Photo Filenames" - } - } - } - }, "responses": { "200": { "description": "Successful Response", @@ -5623,6 +5623,13 @@ "description": "Number of photos with different focus per position.This ignores AF and you need to set a focus range.Focus values will then be evenly spaced between min and max.", "default": 1 }, + "pause_before_capture_ms": { + "type": "integer", + "minimum": 0.0, + "title": "Pause Before Capture Ms", + "description": "Pause in milliseconds before capture to let vibrations settle.", + "default": 0 + }, "focus_range": { "prefixItems": [ { diff --git a/scripts/openapi/openapi_v0.9.json b/scripts/openapi/openapi_v0.9.json index 57f5966..db1711d 100644 --- a/scripts/openapi/openapi_v0.9.json +++ b/scripts/openapi/openapi_v0.9.json @@ -1933,22 +1933,22 @@ "type": "integer", "title": "Scan Index" } + }, + { + "name": "photo_filenames", + "in": "query", + "required": true, + "schema": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Relative photo paths to delete.", + "title": "Photo Filenames" + }, + "description": "Relative photo paths to delete." } ], - "requestBody": { - "required": true, - "content": { - "application/json": { - "schema": { - "type": "array", - "items": { - "type": "string" - }, - "title": "Photo Filenames" - } - } - } - }, "responses": { "200": { "description": "Successful Response", @@ -6072,6 +6072,13 @@ "description": "Number of photos with different focus per position.This ignores AF and you need to set a focus range.Focus values will then be evenly spaced between min and max.", "default": 1 }, + "pause_before_capture_ms": { + "type": "integer", + "minimum": 0.0, + "title": "Pause Before Capture Ms", + "description": "Pause in milliseconds before capture to let vibrations settle.", + "default": 0 + }, "focus_range": { "prefixItems": [ { From 0abdd1735506d7f65cfe49f8da4a8fc34493f567 Mon Sep 17 00:00:00 2001 From: esto Date: Wed, 29 Apr 2026 12:56:22 +0200 Subject: [PATCH 2/7] refactor(scan_config): replace confloat with `Annotated` for focus range validation - Updated `focus_range` type in `ScanSetting` to use `Annotated` for clearer and more reusable validation constraints. - Replaced `confloat` with the `FocusValue` type alias for focus range fields. --- openscan_firmware/config/scan.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/openscan_firmware/config/scan.py b/openscan_firmware/config/scan.py index 458c044..0b4db58 100644 --- a/openscan_firmware/config/scan.py +++ b/openscan_firmware/config/scan.py @@ -1,9 +1,12 @@ -from pydantic import BaseModel, Field, SerializerFunctionWrapHandler, confloat, model_serializer -from typing import Tuple, Literal +from pydantic import BaseModel, Field, SerializerFunctionWrapHandler, model_serializer +from typing import Annotated, Literal from openscan_firmware.models.paths import PathMethod +FocusValue = Annotated[float, Field(ge=0.0, le=15.0)] + + class ScanSetting(BaseModel): path_method: PathMethod = Field( default=PathMethod.FIBONACCI, @@ -46,10 +49,10 @@ class ScanSetting(BaseModel): ge=0, description="Pause in milliseconds before capture to let vibrations settle.", ) - focus_range: Tuple[ - confloat(ge=0.0, le=15.0), - confloat(ge=0.0, le=15.0)] = Field(default=(10.0, 15.0), - description="Minimum and maximum focus distance in diopters.") + focus_range: tuple[FocusValue, FocusValue] = Field( + default=(10.0, 15.0), + description="Minimum and maximum focus distance in diopters.", + ) @property def focus_positions(self) -> list[float]: From 72f737695bdf33f43881cc7016ea9733e52f31a9 Mon Sep 17 00:00:00 2001 From: esto Date: Wed, 29 Apr 2026 13:02:47 +0200 Subject: [PATCH 3/7] test(scan_task): add tests for pre-capture pause and cancellation scenarios - Added tests for `pause_before_capture_ms` behavior, including handling of delay, cancellation, and resume scenarios. - Improved test coverage for photo capture logic during cancellation. - Verified default pause setting for legacy scan configurations. --- tests/config/test_scan_config.py | 18 +++ tests/controllers/services/test_scan_task.py | 118 +++++++++++++++++++ 2 files changed, 136 insertions(+) diff --git a/tests/config/test_scan_config.py b/tests/config/test_scan_config.py index e01dad0..90999ad 100644 --- a/tests/config/test_scan_config.py +++ b/tests/config/test_scan_config.py @@ -11,6 +11,24 @@ def test_scan_settings_omit_unset_phi_fields_from_json_dump() -> None: assert "max_phi" not in payload +def test_scan_settings_default_pause_before_capture_ms_for_legacy_payload() -> None: + settings = ScanSetting.model_validate( + { + "path_method": "fibonacci", + "points": 10, + "min_theta": 0.0, + "max_theta": 170.0, + "optimize_path": True, + "optimization_algorithm": "nearest_neighbor", + "focus_stacks": 1, + "focus_range": [10.0, 15.0], + "image_format": "jpeg", + } + ) + + assert settings.pause_before_capture_ms == 0 + + def test_external_trigger_run_settings_omit_unset_phi_fields_from_json_dump() -> None: settings = ExternalTriggerRunSettings(trigger_name="external-camera") diff --git a/tests/controllers/services/test_scan_task.py b/tests/controllers/services/test_scan_task.py index 62a4b7a..54c5ae7 100644 --- a/tests/controllers/services/test_scan_task.py +++ b/tests/controllers/services/test_scan_task.py @@ -259,6 +259,124 @@ def test_generate_scan_path_passes_optional_phi_constraints_when_set() -> None: } assert list(path_dict.keys()) == [PolarPoint3D(theta=10.0, fi=20.0, r=250.0)] + +@pytest.mark.asyncio +async def test_wait_before_capture_uses_configured_delay(sample_scan_model: Scan): + scan = sample_scan_model.model_copy(deep=True) + scan.settings.pause_before_capture_ms = 250 + + task_model = Task(name="scan_task", task_type="core") + scan_task = ScanTask(task_model) + scan_task._ctx = ScanRuntime( + scan=scan, + camera_controller=MagicMock(), + project_manager=MagicMock(), + path_dict={}, + focus_context=None, + ) + scan_task.wait_for_pause = AsyncMock(return_value=None) + + with patch( + "openscan_firmware.controllers.services.tasks.core.scan_task.asyncio.sleep", + new_callable=AsyncMock, + ) as sleep_mock: + await scan_task._wait_before_capture() + + scan_task.wait_for_pause.assert_awaited_once() + sleep_mock.assert_awaited_once_with(0.25) + + +@pytest.mark.asyncio +async def test_wait_before_capture_skips_sleep_when_cancelled_after_pause(sample_scan_model: Scan): + scan = sample_scan_model.model_copy(deep=True) + scan.settings.pause_before_capture_ms = 250 + + task_model = Task(name="scan_task", task_type="core") + scan_task = ScanTask(task_model) + scan_task._ctx = ScanRuntime( + scan=scan, + camera_controller=MagicMock(), + project_manager=MagicMock(), + path_dict={}, + focus_context=None, + ) + + async def cancel_during_pause_wait() -> None: + scan_task.cancel() + + scan_task.wait_for_pause = AsyncMock(side_effect=cancel_during_pause_wait) + + with patch( + "openscan_firmware.controllers.services.tasks.core.scan_task.asyncio.sleep", + new_callable=AsyncMock, + ) as sleep_mock: + await scan_task._wait_before_capture() + + scan_task.wait_for_pause.assert_awaited_once() + sleep_mock.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_wait_before_capture_blocks_while_paused_and_resumes(sample_scan_model: Scan): + scan = sample_scan_model.model_copy(deep=True) + scan.settings.pause_before_capture_ms = 1 + + task_model = Task(name="scan_task", task_type="core") + scan_task = ScanTask(task_model) + scan_task._ctx = ScanRuntime( + scan=scan, + camera_controller=MagicMock(), + project_manager=MagicMock(), + path_dict={}, + focus_context=None, + ) + + scan_task.pause() + wait_task = asyncio.create_task(scan_task._wait_before_capture()) + await asyncio.sleep(0.01) + + assert not wait_task.done() + + scan_task.resume() + await asyncio.wait_for(wait_task, timeout=1.0) + + +@pytest.mark.asyncio +async def test_capture_skips_photo_when_cancelled_before_capture_delay(sample_scan_model: Scan, fake_photo_data: PhotoData): + scan = sample_scan_model.model_copy(deep=True) + scan.settings.pause_before_capture_ms = 250 + + camera_controller = MagicMock() + camera_controller.photo = MagicMock(return_value=fake_photo_data) + + project_manager = MagicMock() + project_manager.add_photo_async = AsyncMock(return_value=None) + + task_model = Task(name="scan_task", task_type="core") + scan_task = ScanTask(task_model) + scan_task._ctx = ScanRuntime( + scan=scan, + camera_controller=camera_controller, + project_manager=project_manager, + path_dict={}, + focus_context=None, + ) + + async def cancel_during_pause_wait() -> None: + scan_task.cancel() + + scan_task.wait_for_pause = AsyncMock(side_effect=cancel_during_pause_wait) + + with patch( + "openscan_firmware.controllers.services.tasks.core.scan_task.asyncio.sleep", + new_callable=AsyncMock, + ) as sleep_mock: + await scan_task._capture_photos_at_position(PolarPoint3D(theta=0, fi=0), 0) + + sleep_mock.assert_not_awaited() + camera_controller.photo.assert_not_called() + project_manager.add_photo_async.assert_not_called() + @pytest.mark.asyncio @patch('openscan_firmware.controllers.hardware.cameras.camera.get_camera_controller') @patch('openscan_firmware.controllers.services.tasks.core.scan_task.get_project_manager') From 7d7b8908c6d565aaa4832a88631451ab9bc49133 Mon Sep 17 00:00:00 2001 From: esto Date: Wed, 29 Apr 2026 13:59:48 +0200 Subject: [PATCH 4/7] feat(tests): add unit tests for constrained path generation and default phi constraints - Added new tests for `get_constrained_path` to validate behavior for fixed theta and phi constraints, as well as fully fixed positions. - Updated existing tests to reflect default `min_phi` and `max_phi` values. - Adjusted validation logic to handle equal theta/phi bounds and default values. - Enhanced path generation to account for fully constrained positions and edge cases. --- openscan_firmware/config/scan.py | 4 +- openscan_firmware/utils/paths/paths.py | 15 +++--- tests/config/test_scan_config.py | 6 +-- tests/controllers/services/test_scan_task.py | 27 +++++++++- tests/utils/test_paths.py | 57 ++++++++++++++++++++ 5 files changed, 97 insertions(+), 12 deletions(-) create mode 100644 tests/utils/test_paths.py diff --git a/openscan_firmware/config/scan.py b/openscan_firmware/config/scan.py index 1f6a6e9..4c5433e 100644 --- a/openscan_firmware/config/scan.py +++ b/openscan_firmware/config/scan.py @@ -22,13 +22,13 @@ class ScanSetting(BaseModel): max_theta: float = Field(125.0, ge=0.0, le=180.0, description="Maximum theta angle in degrees for constrained paths.") min_phi: float | None = Field( - default=None, + default=0, ge=0.0, le=360.0, description="Optional minimum phi angle in degrees for constrained paths.", ) max_phi: float | None = Field( - default=None, + default=360.0, ge=0.0, le=360.0, description="Optional maximum phi angle in degrees for constrained paths.", diff --git a/openscan_firmware/utils/paths/paths.py b/openscan_firmware/utils/paths/paths.py index e12a4a9..bf71275 100644 --- a/openscan_firmware/utils/paths/paths.py +++ b/openscan_firmware/utils/paths/paths.py @@ -116,15 +116,12 @@ def get_constrained_path( if min_theta < 0 or max_theta > 180: logger.error("Theta angle must be between 0° and 180°") raise ValueError("Theta angle must be between 0° and 180°") - if min_theta >= max_theta: - logger.error("Minimum theta angle must be less than maximum theta angle") - raise ValueError("Minimum theta angle must be less than maximum theta angle") + if min_theta > max_theta: + logger.error("Minimum theta angle must be less than or equal to maximum theta angle") + raise ValueError("Minimum theta angle must be less than or equal to maximum theta angle") if min_phi < 0 or min_phi > 360 or max_phi < 0 or max_phi > 360: logger.error("Phi angle must be between 0° and 360°") raise ValueError("Phi angle must be between 0° and 360°") - if min_phi == max_phi: - logger.error("Minimum phi angle must not be equal to maximum phi angle") - raise ValueError("Minimum phi angle must not be equal to maximum phi angle") if method == PathMethod.FIBONACCI: return _generate_constrained_fibonacci( @@ -141,6 +138,9 @@ def get_constrained_path( def _phi_span(min_phi: float, max_phi: float) -> float: """Return the positive span of a phi interval, supporting wrap-around at 360°.""" + if min_phi == max_phi: + return 0.0 + span = (max_phi - min_phi) % 360 return 360 if span == 0 else span @@ -171,6 +171,9 @@ def _generate_constrained_fibonacci( min_phi, max_phi, ) + if min_theta == max_theta and min_phi == max_phi: + return [PolarPoint3D(theta=min_theta, fi=min_phi % 360, r=1.0)] + # Convert theta constraints to Z constraints # theta = arccos(z), so z = cos(theta) # Note: theta increases as z decreases diff --git a/tests/config/test_scan_config.py b/tests/config/test_scan_config.py index e01dad0..a77bd14 100644 --- a/tests/config/test_scan_config.py +++ b/tests/config/test_scan_config.py @@ -2,13 +2,13 @@ from openscan_firmware.config.scan import ScanSetting -def test_scan_settings_omit_unset_phi_fields_from_json_dump() -> None: +def test_scan_settings_include_default_phi_fields_in_json_dump() -> None: settings = ScanSetting() payload = settings.model_dump(mode="json") - assert "min_phi" not in payload - assert "max_phi" not in payload + assert payload["min_phi"] == 0 + assert payload["max_phi"] == 360.0 def test_external_trigger_run_settings_omit_unset_phi_fields_from_json_dump() -> None: diff --git a/tests/controllers/services/test_scan_task.py b/tests/controllers/services/test_scan_task.py index 62a4b7a..e2bbf80 100644 --- a/tests/controllers/services/test_scan_task.py +++ b/tests/controllers/services/test_scan_task.py @@ -196,7 +196,7 @@ async def delayed_add_photo(*args, **kwargs): ) -def test_generate_scan_path_omits_optional_phi_constraints_when_unset() -> None: +def test_generate_scan_path_passes_default_phi_constraints() -> None: scan_settings = ScanSetting( path_method=PathMethod.FIBONACCI, points=10, @@ -222,6 +222,8 @@ def test_generate_scan_path_omits_optional_phi_constraints_when_unset() -> None: "num_points": 10, "min_theta": 10.0, "max_theta": 120.0, + "min_phi": 0, + "max_phi": 360.0, } assert list(path_dict.keys()) == [PolarPoint3D(theta=10.0, fi=20.0, r=250.0)] @@ -709,6 +711,29 @@ async def test_focus_stacking_uses_configured_image_format( assert awaited_call.args == ("rgb_array",) +def test_generate_scan_path_fully_fixed_position_has_single_zero_index_step() -> None: + scan_settings = ScanSetting( + path_method=PathMethod.FIBONACCI, + points=130, + min_theta=45.0, + max_theta=45.0, + min_phi=90.0, + max_phi=90.0, + optimize_path=False, + focus_stacks=1, + focus_range=(10.0, 15.0), + image_format="jpeg", + ) + + with patch( + "openscan_firmware.controllers.services.tasks.core.scan_task._get_scan_radius_mm", + return_value=250.0, + ): + path_dict = generate_scan_path(scan_settings) + + assert path_dict == {PolarPoint3D(theta=45.0, fi=90.0, r=250.0): 0} + + class TestScanTaskIntegration: """Integration tests for ScanTask persistence behavior with real ProjectManager.""" diff --git a/tests/utils/test_paths.py b/tests/utils/test_paths.py new file mode 100644 index 0000000..f7b8faf --- /dev/null +++ b/tests/utils/test_paths.py @@ -0,0 +1,57 @@ +import pytest + +from openscan_firmware.models.paths import PathMethod, PolarPoint3D +from openscan_firmware.utils.paths.paths import get_constrained_path + + +def test_constrained_path_allows_fixed_theta() -> None: + path = get_constrained_path( + method=PathMethod.FIBONACCI, + num_points=5, + min_theta=45.0, + max_theta=45.0, + min_phi=0.0, + max_phi=180.0, + ) + + assert len(path) == 5 + assert {point.theta for point in path} == {45.0} + assert len({point.fi for point in path}) > 1 + + +def test_constrained_path_allows_fixed_phi() -> None: + path = get_constrained_path( + method=PathMethod.FIBONACCI, + num_points=5, + min_theta=10.0, + max_theta=120.0, + min_phi=90.0, + max_phi=90.0, + ) + + assert len(path) == 5 + assert {point.fi for point in path} == {90.0} + assert len({point.theta for point in path}) > 1 + + +def test_constrained_path_collapses_fully_fixed_position_to_one_point() -> None: + path = get_constrained_path( + method=PathMethod.FIBONACCI, + num_points=130, + min_theta=45.0, + max_theta=45.0, + min_phi=90.0, + max_phi=90.0, + ) + + assert path == [PolarPoint3D(theta=45.0, fi=90.0, r=1.0)] + + +def test_constrained_path_still_rejects_reversed_theta_range() -> None: + with pytest.raises(ValueError, match="less than or equal"): + get_constrained_path( + method=PathMethod.FIBONACCI, + num_points=5, + min_theta=120.0, + max_theta=10.0, + ) From 60ab87f7e5ed27a17ba610f47ebac809db4318fd Mon Sep 17 00:00:00 2001 From: esto Date: Wed, 29 Apr 2026 15:27:20 +0200 Subject: [PATCH 5/7] feat(tests): expand unit test coverage for GPIO, motor, and scan task handling - Added comprehensive tests for GPIO auto-initialization, error cases, and router patch handling. - Introduced new motor tests for movement execution, direction handling, and stop behavior. - Restored and updated scan task path generation tests to validate phi constraints and default values. - Enhanced mocking strategies to simplify setup for hardware and executor dependencies. --- .../picamera2/test_picamera2_focus_unit.py | 2 + tests/controllers/hardware/test_gpio.py | 52 +++++ tests/controllers/hardware/test_motor.py | 121 ++++++++++- tests/controllers/services/test_scan_task.py | 203 +++++++++--------- tests/routers/test_next_gpio_router.py | 46 ++++ 5 files changed, 313 insertions(+), 111 deletions(-) create mode 100644 tests/controllers/hardware/test_gpio.py create mode 100644 tests/routers/test_next_gpio_router.py diff --git a/tests/controllers/hardware/picamera2/test_picamera2_focus_unit.py b/tests/controllers/hardware/picamera2/test_picamera2_focus_unit.py index bc58c2a..8bbdfa6 100644 --- a/tests/controllers/hardware/picamera2/test_picamera2_focus_unit.py +++ b/tests/controllers/hardware/picamera2/test_picamera2_focus_unit.py @@ -28,9 +28,11 @@ class _AfModeEnum: picamera2 = types.ModuleType("picamera2") picamera2.Picamera2 = type("Picamera2", (), {}) + cv2 = types.ModuleType("cv2") monkeypatch.setitem(sys.modules, "libcamera", libcamera) monkeypatch.setitem(sys.modules, "picamera2", picamera2) + monkeypatch.setitem(sys.modules, "cv2", cv2) sys.modules.pop("openscan_firmware.controllers.hardware.cameras.picamera2", None) return importlib.import_module("openscan_firmware.controllers.hardware.cameras.picamera2") diff --git a/tests/controllers/hardware/test_gpio.py b/tests/controllers/hardware/test_gpio.py new file mode 100644 index 0000000..fd666bb --- /dev/null +++ b/tests/controllers/hardware/test_gpio.py @@ -0,0 +1,52 @@ +import pytest + +from openscan_firmware.controllers.hardware import gpio as gpio_module + + +class _FakeDigitalOutputDevice: + def __init__(self, pin: int, initial_value: bool = False): + self.pin = pin + self.value = bool(initial_value) + + def toggle(self): + self.value = not self.value + + def close(self): + return None + + +@pytest.fixture(autouse=True) +def reset_gpio_state(monkeypatch): + original_outputs = gpio_module._output_pins.copy() + original_buttons = gpio_module._buttons.copy() + + gpio_module._output_pins.clear() + gpio_module._buttons.clear() + monkeypatch.setattr(gpio_module, "DigitalOutputDevice", _FakeDigitalOutputDevice) + + yield + + gpio_module._output_pins.clear() + gpio_module._buttons.clear() + gpio_module._output_pins.update(original_outputs) + gpio_module._buttons.update(original_buttons) + + +def test_set_output_pin_auto_initializes_when_pin_is_free(): + result = gpio_module.set_output_pin(10, True, auto_initialize=True) + + assert result is True + assert 10 in gpio_module._output_pins + assert gpio_module.get_output_pin(10) is True + + +def test_set_output_pin_rejects_pin_initialized_as_button(): + gpio_module._buttons[10] = object() + + with pytest.raises(ValueError, match="initialized as button input"): + gpio_module.set_output_pin(10, True, auto_initialize=True) + + +def test_set_output_pin_requires_initialized_output_without_auto_init(): + with pytest.raises(ValueError, match="not initialized as output"): + gpio_module.set_output_pin(11, True) diff --git a/tests/controllers/hardware/test_motor.py b/tests/controllers/hardware/test_motor.py index 8f27b71..d8dbea4 100644 --- a/tests/controllers/hardware/test_motor.py +++ b/tests/controllers/hardware/test_motor.py @@ -1,5 +1,6 @@ import pytest import asyncio # Still needed for async functions +from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock # Keep for specific mock types # Adjust paths if necessary for your project structure @@ -55,7 +56,7 @@ def motor_event_loop(): @pytest.fixture def mocked_dependencies(monkeypatch, motor_event_loop): - """Mocks GPIO, time.sleep, math.cos, and event_loop.run_in_executor.""" + """Mocks GPIO, time.sleep, math.cos, and the low-level movement executor.""" import openscan_firmware.controllers.hardware.motors as motors_module @@ -67,12 +68,12 @@ def mocked_dependencies(monkeypatch, motor_event_loop): mock_math_cos = MagicMock(return_value=0.0) monkeypatch.setattr(motors_module.math, 'cos', mock_math_cos) + async def fake_execute_movement(self, step_count: int, requested_degrees: float) -> int: + self.model.angle = requested_degrees % 360 + return abs(step_count) - def sync_run_in_executor_side_effect(executor, callback, *args): - return callback(*args) - - mock_run_in_executor = AsyncMock(side_effect=sync_run_in_executor_side_effect) - monkeypatch.setattr(motor_event_loop, 'run_in_executor', mock_run_in_executor, raising=True) + monkeypatch.setattr(MotorController, '_execute_movement', fake_execute_movement) + mock_run_in_executor = AsyncMock() return { "gpio": mock_gpio, @@ -83,6 +84,40 @@ def sync_run_in_executor_side_effect(executor, callback, *args): } +@pytest.fixture +def movement_dependencies(monkeypatch): + """Mocks hardware and executor boundaries while keeping _execute_movement real.""" + import openscan_firmware.controllers.hardware.motors as motors_module + + mock_gpio = MagicMock() + monkeypatch.setattr(motors_module, 'gpio', mock_gpio) + monkeypatch.setattr(motors_module.time, 'sleep', MagicMock()) + monkeypatch.setattr(motors_module, 'notify_busy_change', MagicMock()) + + class ImmediateExecutorLoop: + def run_in_executor(self, executor, callback, *args): + future = asyncio.Future() + try: + future.set_result(callback(*args)) + except Exception as exc: + future.set_exception(exc) + return future + + monkeypatch.setattr( + motors_module, + 'asyncio', + SimpleNamespace( + CancelledError=asyncio.CancelledError, + get_event_loop=MagicMock(return_value=ImmediateExecutorLoop()), + ), + ) + + return { + "gpio": mock_gpio, + "notify_busy_change": motors_module.notify_busy_change, + } + + @pytest.fixture def motor_controller_instance(motor_model_instance, motor_config_instance, mocked_dependencies): """Provides a MotorController instance with mocked dependencies.""" @@ -245,3 +280,77 @@ async def test_move_to_with_clamping(motor_controller_clamping_instance, motor_m assert motor_model.angle == pytest.approx(expected_angle, abs=1), \ f"Angle mismatch for move_to({target_val}) from {initial_angle}" + +@pytest.fixture +def movement_motor_controller(movement_dependencies): + settings = MotorConfig( + direction_pin=1, + enable_pin=2, + step_pin=3, + acceleration=20000, + max_speed=7500, + min_angle=0, + max_angle=360, + direction=1, + steps_per_rotation=3200, + ) + controller = MotorController(Motor(name="test_motor", settings=settings, angle=0.0)) + controller.set_idle_callbacks(lambda: False, AsyncMock()) + controller._pre_calculate_step_times = MagicMock(return_value=[0.0, 0.0, 0.0]) + return controller + + +@pytest.mark.asyncio +async def test_execute_movement_sets_direction_and_steps_forward(movement_motor_controller, movement_dependencies): + controller = movement_motor_controller + + await controller._execute_movement(3, 0.0) + + assert controller.model.angle == pytest.approx(3 / 3200 * 360) + movement_dependencies["gpio"].set_output_pin.assert_any_call(controller.settings.direction_pin, True) + assert movement_dependencies["gpio"].set_output_pin.call_args_list.count( + ((controller.settings.step_pin, True),) + ) == 3 + assert movement_dependencies["gpio"].set_output_pin.call_args_list.count( + ((controller.settings.step_pin, False),) + ) == 3 + assert controller._current_steps == 0 + + +@pytest.mark.asyncio +async def test_execute_movement_sets_direction_and_updates_angle_backward( + movement_motor_controller, + movement_dependencies, +): + controller = movement_motor_controller + controller.model.angle = 10.0 + + await controller._execute_movement(-3, 0.0) + + assert controller.model.angle == pytest.approx((10.0 - (3 / 3200 * 360)) % 360) + movement_dependencies["gpio"].set_output_pin.assert_any_call(controller.settings.direction_pin, False) + + +@pytest.mark.asyncio +async def test_execute_movement_stops_when_stop_requested( + movement_motor_controller, + movement_dependencies, +): + controller = movement_motor_controller + controller._pre_calculate_step_times = MagicMock(return_value=[0.0, 1.0, 2.0, 3.0]) + + step_high_calls = 0 + + def set_output_pin(pin, value): + nonlocal step_high_calls + if pin == controller.settings.step_pin and value is True: + step_high_calls += 1 + controller._stop_requested = True + + movement_dependencies["gpio"].set_output_pin.side_effect = set_output_pin + + await controller._execute_movement(4, 0.0) + + assert step_high_calls == 1 + assert controller.model.angle == pytest.approx(1 / 3200 * 360) + assert controller._current_steps == 0 diff --git a/tests/controllers/services/test_scan_task.py b/tests/controllers/services/test_scan_task.py index e2bbf80..a7e95df 100644 --- a/tests/controllers/services/test_scan_task.py +++ b/tests/controllers/services/test_scan_task.py @@ -196,71 +196,6 @@ async def delayed_add_photo(*args, **kwargs): ) -def test_generate_scan_path_passes_default_phi_constraints() -> None: - scan_settings = ScanSetting( - path_method=PathMethod.FIBONACCI, - points=10, - min_theta=10.0, - max_theta=120.0, - optimize_path=False, - focus_stacks=1, - focus_range=(10.0, 15.0), - image_format="jpeg", - ) - - with patch( - "openscan_firmware.controllers.services.tasks.core.scan_task._get_scan_radius_mm", - return_value=250.0, - ), patch( - "openscan_firmware.controllers.services.tasks.core.scan_task.paths.get_constrained_path", - return_value=[PolarPoint3D(theta=10.0, fi=20.0)], - ) as get_constrained_path: - path_dict = generate_scan_path(scan_settings) - - assert get_constrained_path.call_args.kwargs == { - "method": PathMethod.FIBONACCI, - "num_points": 10, - "min_theta": 10.0, - "max_theta": 120.0, - "min_phi": 0, - "max_phi": 360.0, - } - assert list(path_dict.keys()) == [PolarPoint3D(theta=10.0, fi=20.0, r=250.0)] - - -def test_generate_scan_path_passes_optional_phi_constraints_when_set() -> None: - scan_settings = ScanSetting( - path_method=PathMethod.FIBONACCI, - points=10, - min_theta=10.0, - max_theta=120.0, - min_phi=45.0, - max_phi=180.0, - optimize_path=False, - focus_stacks=1, - focus_range=(10.0, 15.0), - image_format="jpeg", - ) - - with patch( - "openscan_firmware.controllers.services.tasks.core.scan_task._get_scan_radius_mm", - return_value=250.0, - ), patch( - "openscan_firmware.controllers.services.tasks.core.scan_task.paths.get_constrained_path", - return_value=[PolarPoint3D(theta=10.0, fi=20.0)], - ) as get_constrained_path: - path_dict = generate_scan_path(scan_settings) - - assert get_constrained_path.call_args.kwargs == { - "method": PathMethod.FIBONACCI, - "num_points": 10, - "min_theta": 10.0, - "max_theta": 120.0, - "min_phi": 45.0, - "max_phi": 180.0, - } - assert list(path_dict.keys()) == [PolarPoint3D(theta=10.0, fi=20.0, r=250.0)] - @pytest.mark.asyncio @patch('openscan_firmware.controllers.hardware.cameras.camera.get_camera_controller') @patch('openscan_firmware.controllers.services.tasks.core.scan_task.get_project_manager') @@ -427,27 +362,13 @@ async def slow_add_photo_pause(*args, **kwargs): assert final_task_model.status == TaskStatus.COMPLETED @pytest.mark.asyncio - @patch('openscan_firmware.controllers.hardware.cameras.camera.get_camera_controller') - @patch('openscan_firmware.controllers.services.tasks.core.scan_task.get_project_manager') - @patch('openscan_firmware.controllers.services.tasks.core.scan_task.generate_scan_path') - @patch('openscan_firmware.controllers.hardware.motors', create=True) async def test_focus_stacking_pause_and_resume_mid_capture( self, - mock_motors: MagicMock, - mock_generate_scan_path: MagicMock, - mock_get_project_manager: MagicMock, - mock_get_camera_controller: MagicMock, - task_manager_fixture: TaskManager, mock_camera_controller: MagicMock, sample_scan_model: Scan, - mock_project_manager: MagicMock, fake_photo_data: PhotoData, ): """Ensure pausing during focus stacking resumes cleanly mid-stack.""" - - mock_get_camera_controller.return_value = mock_camera_controller - mock_get_project_manager.return_value = mock_project_manager - scan = sample_scan_model.model_copy(deep=True) scan.settings.focus_stacks = 12 scan.settings.focus_range = (0.1, 0.4) @@ -456,52 +377,58 @@ async def test_focus_stacking_pause_and_resume_mid_capture( focus_settings = FocusTrackingSettings(AF=True, manual_focus=0.05) mock_camera_controller.settings = focus_settings - path_points = { - PolarPoint3D(theta=0, fi=0): 0, - PolarPoint3D(theta=15, fi=15): 1, - } - mock_generate_scan_path.return_value = path_points - mock_motors.move_to_point = AsyncMock(return_value=None) - capture_event = asyncio.Event() photo_counter = {"count": 0} pause_trigger_index = 2 - def slow_focus_photo(*args, **kwargs): + async def slow_focus_photo(*args, **kwargs): photo_counter["count"] += 1 if photo_counter["count"] == pause_trigger_index: capture_event.set() - time.sleep(0.05) + await asyncio.sleep(0.05) return fake_photo_data async def slow_add_photo(*args, **kwargs): await asyncio.sleep(0.01) - mock_camera_controller.photo.side_effect = slow_focus_photo - mock_project_manager.add_photo_async.side_effect = slow_add_photo + mock_camera_controller.photo_async = AsyncMock(side_effect=slow_focus_photo) + project_manager = MagicMock() + project_manager.add_photo_async = AsyncMock(side_effect=slow_add_photo) - tm = task_manager_fixture - task_model = await tm.create_and_run_task("scan_task", scan, 0) + task_model = Task(name="scan_task", task_type="core") + scan_task = ScanTask(task_model) + scan_task._ctx = ScanRuntime( + scan=scan, + camera_controller=mock_camera_controller, + project_manager=project_manager, + path_dict={PolarPoint3D(theta=0, fi=0): 0}, + focus_context={ + "enabled": True, + "positions": focus_positions, + "previous_settings": (focus_settings.AF, focus_settings.manual_focus), + }, + ) + + capture_task = asyncio.create_task( + scan_task._capture_photos_at_position(PolarPoint3D(theta=0, fi=0), 0) + ) await asyncio.wait_for(capture_event.wait(), timeout=2.0) - paused_task = await tm.pause_task(task_model.id) - assert paused_task.status == TaskStatus.PAUSED - assert mock_camera_controller.photo.call_count < scan.settings.focus_stacks + scan_task.pause() + assert mock_camera_controller.photo_async.await_count < scan.settings.focus_stacks await asyncio.sleep(0.05) + assert scan_task.is_paused() - resumed_task = await tm.resume_task(task_model.id) - assert resumed_task.status == TaskStatus.RUNNING - - final_task_model = await tm.wait_for_task(task_model.id) - assert final_task_model.status == TaskStatus.COMPLETED + scan_task.resume() + await capture_task + await asyncio.sleep(0) - expected_photos = scan.settings.focus_stacks * len(path_points) - assert mock_camera_controller.photo.call_count == expected_photos - assert mock_project_manager.add_photo_async.await_count == expected_photos + expected_photos = scan.settings.focus_stacks + assert mock_camera_controller.photo_async.await_count == expected_photos + assert project_manager.add_photo_async.await_count == expected_photos - expected_history = focus_positions * len(path_points) + [0.05] - assert focus_settings.history == expected_history + assert focus_settings.history == focus_positions @pytest.mark.asyncio @patch('openscan_firmware.controllers.hardware.cameras.camera.get_camera_controller') @@ -711,6 +638,72 @@ async def test_focus_stacking_uses_configured_image_format( assert awaited_call.args == ("rgb_array",) +def test_generate_scan_path_passes_default_phi_constraints() -> None: + scan_settings = ScanSetting( + path_method=PathMethod.FIBONACCI, + points=10, + min_theta=10.0, + max_theta=120.0, + optimize_path=False, + focus_stacks=1, + focus_range=(10.0, 15.0), + image_format="jpeg", + ) + + with patch( + "openscan_firmware.controllers.services.tasks.core.scan_task._get_scan_radius_mm", + return_value=250.0, + ), patch( + "openscan_firmware.controllers.services.tasks.core.scan_task.paths.get_constrained_path", + return_value=[PolarPoint3D(theta=10.0, fi=20.0)], + ) as get_constrained_path: + path_dict = generate_scan_path(scan_settings) + + assert get_constrained_path.call_args.kwargs == { + "method": PathMethod.FIBONACCI, + "num_points": 10, + "min_theta": 10.0, + "max_theta": 120.0, + "min_phi": 0, + "max_phi": 360.0, + } + assert list(path_dict.keys()) == [PolarPoint3D(theta=10.0, fi=20.0, r=250.0)] + + +def test_generate_scan_path_passes_optional_phi_constraints_when_set() -> None: + scan_settings = ScanSetting( + path_method=PathMethod.FIBONACCI, + points=10, + min_theta=10.0, + max_theta=120.0, + min_phi=45.0, + max_phi=180.0, + optimize_path=False, + focus_stacks=1, + focus_range=(10.0, 15.0), + image_format="jpeg", + ) + + with patch( + "openscan_firmware.controllers.services.tasks.core.scan_task._get_scan_radius_mm", + return_value=250.0, + ), patch( + "openscan_firmware.controllers.services.tasks.core.scan_task.paths.get_constrained_path", + return_value=[PolarPoint3D(theta=10.0, fi=20.0)], + ) as get_constrained_path: + path_dict = generate_scan_path(scan_settings) + + assert get_constrained_path.call_args.kwargs == { + "method": PathMethod.FIBONACCI, + "num_points": 10, + "min_theta": 10.0, + "max_theta": 120.0, + "min_phi": 45.0, + "max_phi": 180.0, + } + assert list(path_dict.keys()) == [PolarPoint3D(theta=10.0, fi=20.0, r=250.0)] + + def test_generate_scan_path_fully_fixed_position_has_single_zero_index_step() -> None: scan_settings = ScanSetting( path_method=PathMethod.FIBONACCI, diff --git a/tests/routers/test_next_gpio_router.py b/tests/routers/test_next_gpio_router.py new file mode 100644 index 0000000..174c4f8 --- /dev/null +++ b/tests/routers/test_next_gpio_router.py @@ -0,0 +1,46 @@ +from importlib import import_module + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + + +@pytest.fixture +def gpio_client_next() -> TestClient: + app = FastAPI() + gpio_router = import_module("openscan_firmware.routers.next.gpio") + app.include_router(gpio_router.router, prefix="/next") + with TestClient(app) as client: + yield client + + +def test_next_gpio_patch_sets_pin_with_auto_init(monkeypatch, gpio_client_next): + module_path = "openscan_firmware.routers.next.gpio" + captured: dict[str, tuple[int, bool, bool]] = {} + + def fake_set_output_pin(pin: int, status: bool, auto_initialize: bool = False): + captured["args"] = (pin, status, auto_initialize) + return status + + monkeypatch.setattr(f"{module_path}.gpio.set_output_pin", fake_set_output_pin, raising=False) + + response = gpio_client_next.patch("/next/gpio/10", params={"status": "true"}) + + assert response.status_code == 200 + assert response.json() is True + assert captured["args"] == (10, True, True) + + +def test_next_gpio_patch_returns_clear_conflict_for_busy_pin(monkeypatch, gpio_client_next): + module_path = "openscan_firmware.routers.next.gpio" + detail = "Cannot set pin 10. Pin is initialized as button input." + + def fake_set_output_pin(pin: int, status: bool, auto_initialize: bool = False): + raise ValueError(detail) + + monkeypatch.setattr(f"{module_path}.gpio.set_output_pin", fake_set_output_pin, raising=False) + + response = gpio_client_next.patch("/next/gpio/10", params={"status": "true"}) + + assert response.status_code == 409 + assert response.json()["detail"] == detail From baebc2670a9348d0a2636e49d43cac136c057c63 Mon Sep 17 00:00:00 2001 From: MicioMax Date: Wed, 6 May 2026 11:03:48 +0200 Subject: [PATCH 6/7] Add PWM support for light control (#66) Adds PWM-backed light control with percentage-based intensity handling. - Add PWM configuration defaults and validation for light configs - Map light intensity values (`0..100`) to the configured PWM range (`pwm_min..pwm_max`) - Handle hardware PWM cleanup through the PWM helper - Expose the current light intensity in `LightStatusResponse` - Add PWM developer documentation - Add/update light controller tests for intensity status and value clamping Co-authored-by: esto --- docs/PWM.md | 35 +++ openscan_firmware/config/light.py | 15 +- .../controllers/hardware/gpio.py | 107 +++++++-- .../controllers/hardware/lights.py | 44 +++- openscan_firmware/routers/next/lights.py | 28 ++- openscan_firmware/utils/pwm_hardware.py | 203 ++++++++++++++++++ settings/device/example_custom.json | 17 +- tests/controllers/hardware/test_light.py | 19 +- 8 files changed, 440 insertions(+), 28 deletions(-) create mode 100644 docs/PWM.md create mode 100644 openscan_firmware/utils/pwm_hardware.py diff --git a/docs/PWM.md b/docs/PWM.md new file mode 100644 index 0000000..8b4f7b4 --- /dev/null +++ b/docs/PWM.md @@ -0,0 +1,35 @@ +# PWM in OpenScan3 + +This is a short developer note about the PWM abstraction used in OpenScan3. + +## Summary + +OpenScan3 handles light intensity as a percentage (`0..100`) at controller/API level, then maps that value to a PWM duty cycle (`0..1`) based on configured voltage bounds (`pwm_min`, `pwm_max`). + +Main modules: + +- `openscan_firmware/controllers/hardware/lights.py` + - Owns brightness state (`value` in percent) and mapping logic. +- `openscan_firmware/controllers/hardware/gpio.py` + - Selects hardware PWM when available, otherwise software PWM fallback. +- `openscan_firmware/utils/pwm_hardware.py` + - Low-level hardware PWM implementation for Raspberry Pi (`/sys/class/pwm` + pinctrl). + +## Raspberry Pi Setup Requirement + +For hardware PWM support, add the following to `/boot/firmware/config.txt`: + +```txt +dtparam=audio=off +dtoverlay=pwm-2chan +``` + +Important: + +- PWM and onboard audio are mutually exclusive with this setup. +- If audio is required, use a separate external PWM chip on the board. + +## Practical Note + +`pwm_hardware.py` is the utility-layer solution for hardware PWM. +As long as the boot config above is applied, the rest is handled by the OpenScan3 abstraction. diff --git a/openscan_firmware/config/light.py b/openscan_firmware/config/light.py index b8cd511..90b6fa0 100644 --- a/openscan_firmware/config/light.py +++ b/openscan_firmware/config/light.py @@ -12,6 +12,9 @@ class LightConfig(BaseModel): default=False, description="Indicates whether this light hardware can handle PWM (otherwise only on/off).", ) + pwm_frequency: float = Field(10000.0, ge=50.0, le=100000.0, description="PWM frequency for led driver.") + pwm_min: float = Field(0.0, ge=0, le=3.3, description="Minimum pwm voltage for led driver.") + pwm_max: float = Field(3.3, ge=0, le=3.3, description="Maximum pwm voltage for led driver.") @model_validator(mode="before") @classmethod @@ -37,4 +40,14 @@ def ensure_pins(cls, values): merged_pins.append(pin) values["pins"] = list(dict.fromkeys(merged_pins)) - return values \ No newline at end of file + return values + + @model_validator(mode="after") + def validate_pwm_range(self): + """ + Ensures a valid range when PWM mode is enabled. + """ + if self.pwm_support and self.pwm_min >= self.pwm_max: + raise ValueError("pwm_min must be less than pwm_max") + + return self diff --git a/openscan_firmware/controllers/hardware/gpio.py b/openscan_firmware/controllers/hardware/gpio.py index ff8407a..40f700f 100644 --- a/openscan_firmware/controllers/hardware/gpio.py +++ b/openscan_firmware/controllers/hardware/gpio.py @@ -1,12 +1,16 @@ import logging -from gpiozero import DigitalOutputDevice, Button +from gpiozero import DigitalOutputDevice, PWMOutputDevice, Button from typing import Dict, List, Optional, Callable +# hardware PWM module +from openscan_firmware.utils.pwm_hardware import hwpwm + logger = logging.getLogger(__name__) # Track pins and buttons _output_pins = {} +_pwm_pins = {} _buttons = {} @@ -15,8 +19,10 @@ def initialize_output_pins(pins: List[int]): for pin in pins: if pin in _output_pins: logger.warning(f"Warning: Output pin {pin} already initialized.") + elif pin in _pwm_pins: + logger.error(f"Error: Cannot initialize pin {pin} as output. Already initialized as PWM.") elif pin in _buttons: - logger.error(f"Error: Cannot initialize pin {pin} as output. Already initialized as Button.") + logger.error(f"Error: Cannot initialize pin {pin} as output. Already initialized as button.") else: try: _output_pins[pin] = DigitalOutputDevice(pin, initial_value=False) @@ -27,7 +33,6 @@ def initialize_output_pins(pins: List[int]): if pin in _output_pins: del _output_pins[pin] - def toggle_output_pin(pin: int): """Toggles the state of an output pin.""" if pin in _output_pins: @@ -65,14 +70,6 @@ def set_output_pin(pin: int, status: bool, auto_initialize: bool = False): raise ValueError(message) -def get_initialized_pins() -> Dict[str, List[int]]: - """Returns a dictionary listing initialized output pins and buttons.""" - return { - "output_pins": list(_output_pins.keys()), - "buttons": list(_buttons.keys()) - } - - def get_output_pin(pin: int): """Returns the state of an output pin.""" if pin in _output_pins: @@ -83,6 +80,63 @@ def get_output_pin(pin: int): raise ValueError(message) +def initialize_pwm_pins(pins: List[int], freq: int): + """Initializes one or more GPIO pins as pwm outputs.""" + for pin in pins: + if pin in _pwm_pins: + logger.warning(f"Warning: PWM pin {pin} already initialized.") + elif pin in _output_pins: + logger.error(f"Error: Cannot initialize pin {pin} as PWM. Already initialized as output.") + elif pin in _buttons: + logger.error(f"Error: Cannot initialize pin {pin} as PWM. Already initialized as button.") + else: + try: + if hwpwm.supports(pin): + _pwm_pins[pin] = pin + hwpwm.setup(pin) + hwpwm.set_frequency(pin, freq) + logger.info(f"Initialized pin {pin} as hardware PWM.") + else: + _pwm_pins[pin] = PWMOutputDevice(pin, active_high=True, initial_value=0.0, frequency=freq) + logger.info(f"Initialized pin {pin} as software PWM.") + except Exception as e: + logger.error(f"Error initializing PWM pin {pin}: {e}", exc_info=True) + # Clean up if initialization failed partially + if pin in _pwm_pins: + del _pwm_pins[pin] + +def set_pwm_pin(pin: int, value: float): + """Sets the value of a PWM pin.""" + if pin in _pwm_pins: + dev = _pwm_pins[pin] + + # on hw pwm we store just pin number here, not the device + if isinstance(dev, int): + # hw pwm + hwpwm.set_duty_cycle(dev, value) + else: + # soft pwm + _pwm_pins[pin].value = value + else: + logger.warning(f"Warning: Cannot set pin {pin}. Not initialized as PWM.") + +def get_pwm_pin(pin: int): + """Returns the state of an output pin.""" + if pin in _pwm_pins: + dev = _pwm_pins[pin] + + # on hw pwm we store just pin number here, not the device + if isinstance(dev, int): + # hw pwm + return hwpwm.get_duty_cycle(dev) + else: + # soft pwm + return _pwm_pins[pin].value + else: + logger.warning(f"Warning: Pin {pin} not initialized as PWM.") + return None + + def initialize_button(pin: int, pull_up: Optional[bool] = True, bounce_time: Optional[float] = 0.05): """ Initializes a GPIO pin as button input using gpiozero.Button. @@ -98,6 +152,8 @@ def initialize_button(pin: int, pull_up: Optional[bool] = True, bounce_time: Opt logger.warning(f"Warning: Button on pin {pin} already initialized.") elif pin in _output_pins: logger.error(f"Error: Cannot initialize pin {pin} as Button. Already initialized as output.") + elif pin in _pwm_pins: + logger.error(f"Error: Cannot initialize pin {pin} as output. Already initialized as PWM.") else: try: _buttons[pin] = Button(pin, pull_up=pull_up, bounce_time=bounce_time, hold_time=0.01) @@ -182,10 +238,32 @@ def is_button_pressed(pin: int) -> Optional[bool]: # Returning None indicates it's not a known button. return None +def get_initialized_pins() -> Dict[str, List[int]]: + """Returns a dictionary listing initialized output pins and buttons.""" + return { + "output_pins": list(_output_pins.keys()), + "pwm_pins": list(_pwm_pins.keys()), + "buttons": list(_buttons.keys()) + } + def cleanup_all_pins(): """Closes all initialized GPIO devices (output pins and buttons).""" logger.debug("Cleaning up GPIO resources...") + # Close PWM pins + pins_to_remove = list(_pwm_pins.keys()) # Create a copy of keys to iterate over + for pin in pins_to_remove: + try: + dev = _pwm_pins[pin] + if isinstance(dev, int): + hwpwm.release(dev) + else: + dev.close() + del _pwm_pins[pin] # Remove from tracking dict after successful close + logger.debug(f"PWM pin {pin} closed.") + except Exception as e: + logger.error(f"Error closing PWM pin {pin}: {e}", exc_info=True) + # Close output pins pins_to_remove = list(_output_pins.keys()) # Create a copy of keys to iterate over for pin in pins_to_remove: @@ -207,7 +285,10 @@ def cleanup_all_pins(): logger.error(f"Error closing button on pin {pin}: {e}", exc_info=True) # Double check if dictionaries are empty - if not _output_pins and not _buttons: + if not _output_pins and not _pwm_pins and not _buttons: logger.info("GPIO cleanup successful. All tracked pins released.") else: - logger.warning(f"GPIO cleanup potentially incomplete. Remaining outputs: {list(_output_pins.keys())}, Remaining buttons: {list(_buttons.keys())}") + logger.warning( + f"GPIO cleanup potentially incomplete. Remaining outputs: {list(_output_pins.keys())}, " + f"Remaining PWM: {list(_pwm_pins.keys())}, Remaining buttons: {list(_buttons.keys())}" + ) diff --git a/openscan_firmware/controllers/hardware/lights.py b/openscan_firmware/controllers/hardware/lights.py index 0c08dff..0c8ecd9 100644 --- a/openscan_firmware/controllers/hardware/lights.py +++ b/openscan_firmware/controllers/hardware/lights.py @@ -16,6 +16,8 @@ from openscan_firmware.controllers.hardware.interfaces import HardwareEvent, SwitchableHardware, SleepCapableHardware, create_controller_registry from openscan_firmware.controllers.services.device_events import schedule_device_status_broadcast +from openscan_firmware.utils.inactivity_timer import inactivity_timer + logger = logging.getLogger(__name__) class LightController(SwitchableHardware, SleepCapableHardware): @@ -26,17 +28,25 @@ def __init__(self, light: Light): on_change=self._apply_settings_to_hardware ) self._is_on = False - # idle helpers must exist before first refresh - self.is_idle = lambda: False + self._value = 100.0 + + # no idle callbacks + self.is_idle = lambda: True self.send_event = None + self._apply_settings_to_hardware(self.settings.model) logger.debug(f"Light controller for '{self.model.name}' initialized.") - + def _apply_settings_to_hardware(self, settings: LightConfig): """Apply settings to hardware and preserve light state.""" self.model.settings = settings - gpio.initialize_output_pins(self.settings.pins) + if self.settings.pwm_support: + logger.info(f"Light '{self.model.name}' initializing PWM.") + gpio.initialize_pwm_pins(self.settings.pins, self.settings.pwm_frequency) + else: + logger.info(f"Light '{self.model.name}' initializing digital.") + gpio.initialize_output_pins(self.settings.pins) # Re-apply desired state synchronously; refresh handles idle logic self.refresh() @@ -47,6 +57,7 @@ def get_status(self): return { "name": self.model.name, "is_on": self.is_on, + "value": self._value, "settings": self.get_config().model_dump() } @@ -54,14 +65,24 @@ def get_config(self) -> LightConfig: return self.settings.model def refresh(self): + inactivity_timer.reset() if self.is_idle(): logger.info(f"Light '{self.model.name}' idle.") for pin in self.settings.pins: - gpio.set_output_pin(pin, False) + if self.settings.pwm_support: + gpio.set_pwm_pin(pin, self.settings.pwm_min / 3.3) + else: + gpio.set_output_pin(pin, False) else: logger.info(f"Light '{self.model.name}' active.") for pin in self.settings.pins: - gpio.set_output_pin(pin, self._is_on) + if self.settings.pwm_support: + _minVal = self.settings.pwm_min / 3.3 + _maxVal = self.settings.pwm_max / 3.3 + _val = self._value / 100.0 * (_maxVal - _minVal) + _minVal + gpio.set_pwm_pin(pin, _val if self._is_on else _minVal) + else: + gpio.set_output_pin(pin, self._is_on) def set_idle_callbacks(self, is_idle: Callable[[], bool], send_event: Callable[[HardwareEvent], Awaitable[None]]) -> None: @@ -91,6 +112,17 @@ async def turn_off(self): await self._wake_if_idle(HardwareEvent.LIGHT_EVENT) logger.info(f"Light '{self.model.name}' turned off.") schedule_device_status_broadcast([f"lights.{self.model.name}.is_on"]) + + async def set_value(self, value: float): + if value < 0: + self._value = 0 + elif value > 100: + self._value = 100 + else: + self._value = value + await self._wake_if_idle(HardwareEvent.LIGHT_EVENT) + schedule_device_status_broadcast([f"lights.{self.model.name}.value"]) + logger.info(f"Light '{self.model.name}' value set to {self._value}.") create_light_controller, get_light_controller, remove_light_controller, _light_registry = create_controller_registry(LightController) diff --git a/openscan_firmware/routers/next/lights.py b/openscan_firmware/routers/next/lights.py index 8bc8a08..c363209 100644 --- a/openscan_firmware/routers/next/lights.py +++ b/openscan_firmware/routers/next/lights.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel from openscan_firmware.controllers.hardware.lights import get_light_controller, get_all_light_controllers @@ -14,6 +14,7 @@ class LightStatusResponse(BaseModel): name: str is_on: bool + value: float settings: LightConfig @@ -102,6 +103,31 @@ async def toggle_light(light_name: str): except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) +@router.put("/{light_name}/intensity", response_model=LightStatusResponse) +async def pwm_light( + light_name: str, + value: float = Query( + 100, + description=( + "sets light intensity, from 0 to 100%" + ), + ), +): + """Set light intensity + + Args: + light_name: The name of the light to toggle + value: intensity of light, from 0% to 100% + + Returns: + LightStatusResponse: A response object containing the status of the light after the toggle operation + """ + try: + controller = get_light_controller(light_name) + await controller.set_value(value) + return controller.get_status() + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) create_settings_endpoints( router=router, diff --git a/openscan_firmware/utils/pwm_hardware.py b/openscan_firmware/utils/pwm_hardware.py new file mode 100644 index 0000000..719d140 --- /dev/null +++ b/openscan_firmware/utils/pwm_hardware.py @@ -0,0 +1,203 @@ +import subprocess +from pathlib import Path + +from dataclasses import dataclass + +import atexit +import signal +import sys + +@dataclass +class _HwPWM: + + _PWMCHIP = Path("/sys/class/pwm/pwmchip0") + + _PIN_INFO = { + 12: {"channel": 0, "alt": "a0"}, + 18: {"channel": 0, "alt": "a5"}, + 13: {"channel": 1, "alt": "a0"}, + 19: {"channel": 1, "alt": "a5"}, + } + + _pins = {} + + # register cleanup at exit + def __init__(self): + atexit.register(_HwPWM._cleanup) + signal.signal(signal.SIGTERM, _HwPWM._signal_handler) + signal.signal(signal.SIGINT, _HwPWM._signal_handler) + + @staticmethod + def _run(cmd): + result = subprocess.run(cmd, check=True, capture_output=True, text=True).stdout + try: + return result.split(":", 1)[1].split()[0] + except: + return "" + + @staticmethod + def _pwm_path(channel): + return _HwPWM._PWMCHIP / f"pwm{channel}" + + + @staticmethod + def _write(path, value): + path.write_text(str(value)) + + + @staticmethod + def _export(channel): + p = _HwPWM._pwm_path(channel) + if not p.exists(): + (_HwPWM._PWMCHIP / "export").write_text(str(channel)) + + + @staticmethod + def _unexport(channel): + p = _HwPWM._pwm_path(channel) + if p.exists(): + (_HwPWM._PWMCHIP / "unexport").write_text(str(channel)) + + + @staticmethod + def supports(pin: int): + # first check if pin is a supported one + if not pin in _HwPWM._PIN_INFO: + return False + + # then check if its PWM is not already in use + chan = _HwPWM._PIN_INFO[pin]["channel"] + for p in _HwPWM._pins.keys(): + # harmless re-set already set pin + if p == pin: + return True + # if using same channel as already setup pin don't accept it + if chan == _HwPWM._PIN_INFO[p]["channel"]: + return False + + # available PWM pin and channel not used, ok + return True + + @staticmethod + def setup(pin: int): + if not _HwPWM.supports(pin): + raise ValueError("unsupported pin or pwm channel in use") + + info = _HwPWM._PIN_INFO[pin] + ch = info["channel"] + + # configure pin mux + old_func = _HwPWM._run(["pinctrl", str(pin)]) + _HwPWM._run(["pinctrl", str(pin), info["alt"]]) + + # enable pwm channel + _HwPWM._export(ch) + + pwm = _HwPWM._pwm_path(ch) + + # ensure disabled before configuration + try: + _HwPWM._write(pwm / "enable", 0) + except: + pass + + _HwPWM._pins[pin] = { "freq": 20000.0, "duty": 1.0, "oldfunc": old_func } + + + @staticmethod + def release(pin: int): + if not pin in _HwPWM._pins: + return + + info = _HwPWM._PIN_INFO[pin] + ch = info["channel"] + + pwm = _HwPWM._pwm_path(ch) + + if pwm.exists(): + try: + _HwPWM._write(pwm / "enable", 0) + except: + pass + + # return pin to input + _HwPWM._run(["pinctrl", str(pin), _HwPWM._pins[pin]["oldfunc"]]) + + del _HwPWM._pins[pin] + + @staticmethod + def _set_freq_duty(pin: int, freq: float, duty: float): + + info = _HwPWM._PIN_INFO[pin] + ch = info["channel"] + + pwm = _HwPWM._pwm_path(ch) + + period_ns = int(1_000_000_000 / freq) + duty_val = int(period_ns * duty) + + _HwPWM._write(pwm / "enable", 0) + _HwPWM._write(pwm / "period", period_ns) + _HwPWM._write(pwm / "duty_cycle", duty_val) + _HwPWM._write(pwm / "enable", 1) + + _HwPWM._pins[pin]["freq"] = freq + _HwPWM._pins[pin]["duty"] = duty + + @staticmethod + def set_frequency(pin: int, freq: float): + if not pin in _HwPWM._pins: + raise ValueError("pwm pin not initialized") + + info = _HwPWM._PIN_INFO[pin] + ch = info["channel"] + + duty = _HwPWM._pins[pin]["duty"] + _HwPWM._set_freq_duty(pin, freq, duty) + + @staticmethod + def get_frequency(pin: int): + if not pin in _HwPWM._pins: + raise ValueError("pwm pin not initialized") + + return _HwPWM._pins[pin]["freq"] + + @staticmethod + def set_duty_cycle(pin: int, duty: float): + if not pin in _HwPWM._pins: + raise ValueError("pwm pin not initialized") + + info = _HwPWM._PIN_INFO[pin] + ch = info["channel"] + + freq = _HwPWM._pins[pin]["freq"] + _HwPWM._set_freq_duty(pin, freq, duty) + + + @staticmethod + def get_duty_cycle(pin: int): + if not pin in _HwPWM._pins: + raise ValueError("pwm pin not initialized") + + return _HwPWM._pins[pin]["duty"] + + # cleanup routines -- resets PWM pins + + @staticmethod + def _cleanup(): + to_clean = [] + for pin in _HwPWM._pins.keys(): + to_clean.append(pin) + for pin in to_clean: + _HwPWM.release(pin) + + def _signal_handler(signum, frame): + _HwPWM._cleanup() + + +# ========================================================== +# SINGLETON +# ========================================================== + +# hardware pw, singleton +hwpwm = _HwPWM() diff --git a/settings/device/example_custom.json b/settings/device/example_custom.json index 9c3225a..2e1e148 100644 --- a/settings/device/example_custom.json +++ b/settings/device/example_custom.json @@ -26,10 +26,15 @@ } }, "lights": { - "Openscan.eu Ringlight": { - "pins": [12], - "pwm_support": true - } + "Openscan.eu Ringlight": { + "pins": [ + 12 + ], + "pwm_support": true, + "pwm_frequency": 50000.0, + "pwm_min": 0.0, + "pwm_max": 3.3 + } }, "endstops": { "rotor-endstop": { @@ -44,7 +49,7 @@ } } }, - "motors_timeout": 180, + "motors_timeout": 30.0, "startup_mode": "startup_idle", - "calibrate_mode": "calibrate_on_wake" + "calibrate_mode": "calibrate_on_home" } diff --git a/tests/controllers/hardware/test_light.py b/tests/controllers/hardware/test_light.py index 5a93cd3..d08ec00 100644 --- a/tests/controllers/hardware/test_light.py +++ b/tests/controllers/hardware/test_light.py @@ -99,5 +99,22 @@ def test_lightcontroller_get_status(light_config_with_pins, idle_callbacks): assert isinstance(status, dict) assert status["name"] == "test_light" assert status["is_on"] is False # Light not turned on after initializing + assert status["value"] == 100.0 assert isinstance(status["settings"], dict) - assert status["settings"]["pins"] == light_config_with_pins.pins \ No newline at end of file + assert status["settings"]["pins"] == light_config_with_pins.pins + + +@pytest.mark.asyncio +async def test_set_value_clamps_and_updates_status(light_config_with_pins, idle_callbacks): + light = Light(name="test_light", settings=light_config_with_pins) + controller = LightController(light) + controller.set_idle_callbacks(*idle_callbacks) + + await controller.set_value(150) + assert controller.get_status()["value"] == 100 + + await controller.set_value(-2) + assert controller.get_status()["value"] == 0 + + await controller.set_value(42.5) + assert controller.get_status()["value"] == 42.5 From f9b180a521289d20750bb3a7325ae8dc90e13d15 Mon Sep 17 00:00:00 2001 From: esto Date: Wed, 6 May 2026 14:14:13 +0200 Subject: [PATCH 7/7] Rename `motors_timeout` to `idle_timeout` and add wakeup API endpoint (#113) * feat(device): introduce PWM configuration and idle timeout support - Added new fields for PWM frequency, min, and max voltage in the light configuration schema. - Introduced `idle_timeout` to replace and mirror `motors_timeout` for backward compatibility. - Updated models, controllers, and routers to handle `idle_timeout` in device settings. - Extended OpenAPI definitions to support new PWM and timeout configurations. - Adjusted tests to validate changes in timeout handling and PWM intensity settings. * feat(device): add wakeup endpoint to resume idle devices - Introduced `/wakeup` endpoint to bring devices out of idle mode. - Device auto-recalibrates upon wake if calibration is required. - Updated OpenAPI definitions and tests to include wakeup functionality. * chore: bump version to 0.11.4 in pyproject.toml --- openscan_firmware/controllers/device.py | 11 +- openscan_firmware/models/scanner.py | 33 ++++- openscan_firmware/routers/next/device.py | 26 ++++ openscan_firmware/routers/v0_8/device.py | 22 +++ openscan_firmware/routers/v0_9/device.py | 25 ++++ pyproject.toml | 2 +- scripts/openapi/openapi_latest.json | 70 +++++++++- scripts/openapi/openapi_next.json | 138 ++++++++++++++++++- scripts/openapi/openapi_v0.8.json | 60 +++++++- scripts/openapi/openapi_v0.9.json | 70 +++++++++- tests/controllers/services/test_scan_task.py | 2 + tests/routers/test_device_router.py | 51 +++++++ tests/routers/test_device_router_v0_8.py | 53 +++++++ tests/routers/test_device_router_v0_9.py | 81 +++++++++++ 14 files changed, 622 insertions(+), 22 deletions(-) create mode 100644 tests/routers/test_device_router_v0_9.py diff --git a/openscan_firmware/controllers/device.py b/openscan_firmware/controllers/device.py index 2491697..80542cc 100644 --- a/openscan_firmware/controllers/device.py +++ b/openscan_firmware/controllers/device.py @@ -144,6 +144,7 @@ def _runtime_to_persisted_config() -> ScannerDeviceConfig: name: PersistedEndstopConfig(settings=endstop.settings) for name, endstop in _scanner_device.endstops.items() }, + idle_timeout=_scanner_device.idle_timeout, motors_timeout=_scanner_device.motors_timeout, scan_radius_mm=_scanner_device.scan_radius_mm, startup_mode=_scanner_device.startup_mode.value if _scanner_device.startup_mode else None, @@ -270,6 +271,7 @@ def get_device_info(): "lights": {name: controller.get_status() for name, controller in get_all_light_controllers().items()}, "triggers": {name: controller.get_status() for name, controller in get_all_trigger_controllers().items()}, + "idle_timeout": _scanner_device.idle_timeout, "motors_timeout": _scanner_device.motors_timeout, "scan_radius_mm": _scanner_device.scan_radius_mm, "startup_mode": _scanner_device.startup_mode, @@ -696,7 +698,8 @@ async def _initialize_with_config(config: dict | ScannerDeviceConfig, detect_cam triggers=trigger_objects, endstops=endstop_objects, - # motors timeout in seconds - 0 to disable + # device idle timeout in seconds - 0 to disable + idle_timeout=config_dict["idle_timeout"], motors_timeout=config_dict["motors_timeout"], scan_radius_mm=config_dict["scan_radius_mm"], @@ -709,11 +712,11 @@ async def _initialize_with_config(config: dict | ScannerDeviceConfig, detect_cam _scanner_device._initialized=True # initialize inactivity timer - if _scanner_device.motors_timeout > 0: - inactivity_timer.set_timeout(_scanner_device.motors_timeout) + if _scanner_device.idle_timeout > 0: + inactivity_timer.set_timeout(_scanner_device.idle_timeout) inactivity_timer.on_timeout = go_to_idle inactivity_timer.enable() - logger.info(f"Inactivity timer set to {_scanner_device.motors_timeout} seconds.") + logger.info(f"Inactivity timer set to {_scanner_device.idle_timeout} seconds.") else: inactivity_timer.disable() logger.info("Inactivity timer disabled.") diff --git a/openscan_firmware/models/scanner.py b/openscan_firmware/models/scanner.py index 15177bd..50157c2 100644 --- a/openscan_firmware/models/scanner.py +++ b/openscan_firmware/models/scanner.py @@ -1,7 +1,7 @@ from enum import Enum from typing import Optional -from pydantic import BaseModel, PrivateAttr, ConfigDict, Field +from pydantic import BaseModel, PrivateAttr, ConfigDict, Field, model_validator from openscan_firmware.config.camera import CameraSettings from openscan_firmware.config.endstop import EndstopConfig @@ -47,7 +47,9 @@ class ScannerDevice(BaseModel): triggers: dict[str, Trigger] = Field(default_factory=dict) endstops: Optional[dict[str, Endstop]] - # motors timeout in seconds - 0 to disable + # device idle timeout in seconds - 0 to disable + idle_timeout: float = 0.0 + # backward compatibility alias for older code paths / payloads motors_timeout: float = 0.0 scan_radius_mm: float = Field( default=1.0, @@ -60,6 +62,14 @@ class ScannerDevice(BaseModel): _idle : bool = PrivateAttr(default=False) _initialized: bool = PrivateAttr(default=False) + @model_validator(mode="after") + def _sync_idle_timeout_fields(self) -> "ScannerDevice": + # Keep both names aligned to avoid breaking older callers. + if self.idle_timeout == 0.0 and self.motors_timeout != 0.0: + self.idle_timeout = self.motors_timeout + self.motors_timeout = self.idle_timeout + return self + class PersistedCameraConfig(BaseModel): type: CameraType | str @@ -84,7 +94,8 @@ class ScannerDeviceConfig(BaseModel): lights: dict[str, LightConfig] = Field(default_factory=dict) triggers: dict[str, TriggerConfig] = Field(default_factory=dict) endstops: dict[str, PersistedEndstopConfig] | None = None - motors_timeout: float = 0.0 + idle_timeout: float = 0.0 + motors_timeout: float | None = None scan_radius_mm: float = Field( default=1.0, gt=0.0, @@ -92,3 +103,19 @@ class ScannerDeviceConfig(BaseModel): ) startup_mode: ScannerStartupMode | str = ScannerStartupMode.STARTUP_ENABLED calibrate_mode: ScannerCalibrateMode | str = ScannerCalibrateMode.CALIBRATE_MANUAL + + @model_validator(mode="before") + @classmethod + def _accept_legacy_timeout_key(cls, data): + if isinstance(data, dict): + if "idle_timeout" not in data and "motors_timeout" in data: + data["idle_timeout"] = data["motors_timeout"] + return data + + @model_validator(mode="after") + def _mirror_timeout_keys(self) -> "ScannerDeviceConfig": + # Persist both names for backward compatibility with existing tooling. + if self.idle_timeout == 0.0 and self.motors_timeout is not None and self.motors_timeout != 0.0: + self.idle_timeout = self.motors_timeout + self.motors_timeout = self.idle_timeout + return self diff --git a/openscan_firmware/routers/next/device.py b/openscan_firmware/routers/next/device.py index a61a50f..d87ec17 100644 --- a/openscan_firmware/routers/next/device.py +++ b/openscan_firmware/routers/next/device.py @@ -37,6 +37,7 @@ class DeviceStatusResponse(BaseModel): motors: dict[str, MotorStatusResponse] lights: dict[str, LightStatusResponse] triggers: dict[str, TriggerStatusResponse] = Field(default_factory=dict) + idle_timeout: float motors_timeout: float scan_radius_mm: float = 1.0 startup_mode: ScannerStartupMode @@ -346,6 +347,31 @@ async def reinitialize_hardware(detect_cameras: bool = False): raise HTTPException(status_code=500, detail=f"Error reloading hardware: {str(e)}") +@router.post("/wakeup", response_model=DeviceControlResponse) +async def wakeup_device(): + """Wake up the device from idle mode. + + If the device is already awake, this endpoint is a no-op and still returns success. + """ + logger.info("Wakeup requested") + try: + was_idle = device.is_idle() + if was_idle: + await device.resume_from_idle() + if device._scanner_device.calibrate_mode == ScannerCalibrateMode.CALIBRATE_ON_WAKE: + await device.recalibrate_motors() + + status = _runtime_status_response() + return DeviceControlResponse( + success=True, + message="Device awakened successfully" if was_idle else "Device already awake", + status=status, + ) + except Exception as e: + logger.exception("Error waking device") + raise HTTPException(status_code=500, detail=f"Error waking device: {str(e)}") + + @router.post("/reboot", response_model=bool) def reboot(save_config: bool = False): """Reboot system and optionally save config. diff --git a/openscan_firmware/routers/v0_8/device.py b/openscan_firmware/routers/v0_8/device.py index bd1dd74..93262f8 100644 --- a/openscan_firmware/routers/v0_8/device.py +++ b/openscan_firmware/routers/v0_8/device.py @@ -255,6 +255,28 @@ async def reinitialize_hardware(detect_cameras: bool = False): raise HTTPException(status_code=500, detail=f"Error reloading hardware: {str(e)}") +@router.post("/wakeup", response_model=DeviceControlResponse) +async def wakeup_device(): + """Wake up the device from idle mode. + + If the device is already awake, this endpoint is a no-op and still returns success. + """ + try: + was_idle = device.is_idle() + if was_idle: + await device.resume_from_idle() + if device._scanner_device.calibrate_mode == ScannerCalibrateMode.CALIBRATE_ON_WAKE: + await device.recalibrate_motors() + + return DeviceControlResponse( + success=True, + message="Device awakened successfully" if was_idle else "Device already awake", + status=_runtime_status_response(), + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error waking device: {str(e)}") + + @router.post("/reboot", response_model=bool) def reboot(save_config: bool = False): """Reboot system and optionally save config. diff --git a/openscan_firmware/routers/v0_9/device.py b/openscan_firmware/routers/v0_9/device.py index 3f14eb5..df38a64 100644 --- a/openscan_firmware/routers/v0_9/device.py +++ b/openscan_firmware/routers/v0_9/device.py @@ -344,6 +344,31 @@ async def reinitialize_hardware(detect_cameras: bool = False): raise HTTPException(status_code=500, detail=f"Error reloading hardware: {str(e)}") +@router.post("/wakeup", response_model=DeviceControlResponse) +async def wakeup_device(): + """Wake up the device from idle mode. + + If the device is already awake, this endpoint is a no-op and still returns success. + """ + logger.info("Wakeup requested") + try: + was_idle = device.is_idle() + if was_idle: + await device.resume_from_idle() + if device._scanner_device.calibrate_mode == ScannerCalibrateMode.CALIBRATE_ON_WAKE: + await device.recalibrate_motors() + + status = _runtime_status_response() + return DeviceControlResponse( + success=True, + message="Device awakened successfully" if was_idle else "Device already awake", + status=status, + ) + except Exception as e: + logger.exception("Error waking device") + raise HTTPException(status_code=500, detail=f"Error waking device: {str(e)}") + + @router.post("/reboot", response_model=bool) def reboot(save_config: bool = False): """Reboot system and optionally save config. diff --git a/pyproject.toml b/pyproject.toml index 2dd09ad..498249e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "openscan-firmware" -version = "0.11.3" +version = "0.11.4" description = "OpenScan3 - Raspberry Pi based photogrammetry scanner (FastAPI-based application)" readme = "README.md" requires-python = ">=3.11" diff --git a/scripts/openapi/openapi_latest.json b/scripts/openapi/openapi_latest.json index db1711d..57e1166 100644 --- a/scripts/openapi/openapi_latest.json +++ b/scripts/openapi/openapi_latest.json @@ -3203,6 +3203,31 @@ } } }, + "/device/wakeup": { + "post": { + "tags": [ + "device" + ], + "summary": "Wakeup Device", + "description": "Wake up the device from idle mode.\n\nIf the device is already awake, this endpoint is a no-op and still returns success.", + "operationId": "wakeup_device", + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/DeviceControlResponse" + } + } + } + }, + "404": { + "description": "Not found" + } + } + } + }, "/device/reboot": { "post": { "tags": [ @@ -5490,6 +5515,30 @@ "title": "Pwm Support", "description": "Indicates whether this light hardware can handle PWM (otherwise only on/off).", "default": false + }, + "pwm_frequency": { + "type": "number", + "maximum": 100000.0, + "minimum": 50.0, + "title": "Pwm Frequency", + "description": "PWM frequency for led driver.", + "default": 10000.0 + }, + "pwm_min": { + "type": "number", + "maximum": 3.3, + "minimum": 0.0, + "title": "Pwm Min", + "description": "Minimum pwm voltage for led driver.", + "default": 0.0 + }, + "pwm_max": { + "type": "number", + "maximum": 3.3, + "minimum": 0.0, + "title": "Pwm Max", + "description": "Maximum pwm voltage for led driver.", + "default": 3.3 } }, "type": "object", @@ -6036,7 +6085,8 @@ } ], "title": "Min Phi", - "description": "Optional minimum phi angle in degrees for constrained paths." + "description": "Optional minimum phi angle in degrees for constrained paths.", + "default": 0 }, "max_phi": { "anyOf": [ @@ -6050,7 +6100,8 @@ } ], "title": "Max Phi", - "description": "Optional maximum phi angle in degrees for constrained paths." + "description": "Optional maximum phi angle in degrees for constrained paths.", + "default": 360.0 }, "optimize_path": { "type": "boolean", @@ -6186,11 +6237,22 @@ ], "title": "Endstops" }, - "motors_timeout": { + "idle_timeout": { "type": "number", - "title": "Motors Timeout", + "title": "Idle Timeout", "default": 0.0 }, + "motors_timeout": { + "anyOf": [ + { + "type": "number" + }, + { + "type": "null" + } + ], + "title": "Motors Timeout" + }, "scan_radius_mm": { "type": "number", "exclusiveMinimum": 0.0, diff --git a/scripts/openapi/openapi_next.json b/scripts/openapi/openapi_next.json index 4f70065..9a7c0a1 100644 --- a/scripts/openapi/openapi_next.json +++ b/scripts/openapi/openapi_next.json @@ -1256,6 +1256,64 @@ } } }, + "/lights/{light_name}/intensity": { + "put": { + "tags": [ + "lights" + ], + "summary": "Pwm Light", + "description": "Set light intensity\n\nArgs:\n light_name: The name of the light to toggle\n value: intensity of light, from 0% to 100%\n\nReturns:\n LightStatusResponse: A response object containing the status of the light after the toggle operation", + "operationId": "pwm_light", + "parameters": [ + { + "name": "light_name", + "in": "path", + "required": true, + "schema": { + "type": "string", + "title": "Light Name" + } + }, + { + "name": "value", + "in": "query", + "required": false, + "schema": { + "type": "number", + "description": "sets light intensity, from 0 to 100%", + "default": 100, + "title": "Value" + }, + "description": "sets light intensity, from 0 to 100%" + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/LightStatusResponse" + } + } + } + }, + "404": { + "description": "Not found" + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } + }, "/lights/{name}/settings": { "get": { "tags": [ @@ -3024,6 +3082,31 @@ } } }, + "/device/wakeup": { + "post": { + "tags": [ + "device" + ], + "summary": "Wakeup Device", + "description": "Wake up the device from idle mode.\n\nIf the device is already awake, this endpoint is a no-op and still returns success.", + "operationId": "wakeup_device", + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/DeviceControlResponse" + } + } + } + }, + "404": { + "description": "Not found" + } + } + } + }, "/device/reboot": { "post": { "tags": [ @@ -5818,6 +5901,10 @@ "type": "object", "title": "Triggers" }, + "idle_timeout": { + "type": "number", + "title": "Idle Timeout" + }, "motors_timeout": { "type": "number", "title": "Motors Timeout" @@ -5844,6 +5931,7 @@ "cameras", "motors", "lights", + "idle_timeout", "motors_timeout", "startup_mode", "calibrate_mode", @@ -6280,6 +6368,30 @@ "title": "Pwm Support", "description": "Indicates whether this light hardware can handle PWM (otherwise only on/off).", "default": false + }, + "pwm_frequency": { + "type": "number", + "maximum": 100000.0, + "minimum": 50.0, + "title": "Pwm Frequency", + "description": "PWM frequency for led driver.", + "default": 10000.0 + }, + "pwm_min": { + "type": "number", + "maximum": 3.3, + "minimum": 0.0, + "title": "Pwm Min", + "description": "Minimum pwm voltage for led driver.", + "default": 0.0 + }, + "pwm_max": { + "type": "number", + "maximum": 3.3, + "minimum": 0.0, + "title": "Pwm Max", + "description": "Maximum pwm voltage for led driver.", + "default": 3.3 } }, "type": "object", @@ -6295,6 +6407,10 @@ "type": "boolean", "title": "Is On" }, + "value": { + "type": "number", + "title": "Value" + }, "settings": { "$ref": "#/components/schemas/LightConfig" } @@ -6303,6 +6419,7 @@ "required": [ "name", "is_on", + "value", "settings" ], "title": "LightStatusResponse" @@ -6826,7 +6943,8 @@ } ], "title": "Min Phi", - "description": "Optional minimum phi angle in degrees for constrained paths." + "description": "Optional minimum phi angle in degrees for constrained paths.", + "default": 0 }, "max_phi": { "anyOf": [ @@ -6840,7 +6958,8 @@ } ], "title": "Max Phi", - "description": "Optional maximum phi angle in degrees for constrained paths." + "description": "Optional maximum phi angle in degrees for constrained paths.", + "default": 360.0 }, "optimize_path": { "type": "boolean", @@ -6976,11 +7095,22 @@ ], "title": "Endstops" }, - "motors_timeout": { + "idle_timeout": { "type": "number", - "title": "Motors Timeout", + "title": "Idle Timeout", "default": 0.0 }, + "motors_timeout": { + "anyOf": [ + { + "type": "number" + }, + { + "type": "null" + } + ], + "title": "Motors Timeout" + }, "scan_radius_mm": { "type": "number", "exclusiveMinimum": 0.0, diff --git a/scripts/openapi/openapi_v0.8.json b/scripts/openapi/openapi_v0.8.json index 386e214..8afa6ee 100644 --- a/scripts/openapi/openapi_v0.8.json +++ b/scripts/openapi/openapi_v0.8.json @@ -2932,6 +2932,31 @@ } } }, + "/device/wakeup": { + "post": { + "tags": [ + "device" + ], + "summary": "Wakeup Device", + "description": "Wake up the device from idle mode.\n\nIf the device is already awake, this endpoint is a no-op and still returns success.", + "operationId": "wakeup_device", + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/DeviceControlResponse" + } + } + } + }, + "404": { + "description": "Not found" + } + } + } + }, "/device/reboot": { "post": { "tags": [ @@ -5050,6 +5075,30 @@ "title": "Pwm Support", "description": "Indicates whether this light hardware can handle PWM (otherwise only on/off).", "default": false + }, + "pwm_frequency": { + "type": "number", + "maximum": 100000.0, + "minimum": 50.0, + "title": "Pwm Frequency", + "description": "PWM frequency for led driver.", + "default": 10000.0 + }, + "pwm_min": { + "type": "number", + "maximum": 3.3, + "minimum": 0.0, + "title": "Pwm Min", + "description": "Minimum pwm voltage for led driver.", + "default": 0.0 + }, + "pwm_max": { + "type": "number", + "maximum": 3.3, + "minimum": 0.0, + "title": "Pwm Max", + "description": "Maximum pwm voltage for led driver.", + "default": 3.3 } }, "type": "object", @@ -5587,7 +5636,8 @@ } ], "title": "Min Phi", - "description": "Optional minimum phi angle in degrees for constrained paths." + "description": "Optional minimum phi angle in degrees for constrained paths.", + "default": 0 }, "max_phi": { "anyOf": [ @@ -5601,7 +5651,8 @@ } ], "title": "Max Phi", - "description": "Optional maximum phi angle in degrees for constrained paths." + "description": "Optional maximum phi angle in degrees for constrained paths.", + "default": 360.0 }, "optimize_path": { "type": "boolean", @@ -5735,6 +5786,11 @@ ], "title": "Endstops" }, + "idle_timeout": { + "type": "number", + "title": "Idle Timeout", + "default": 0.0 + }, "motors_timeout": { "type": "number", "title": "Motors Timeout", diff --git a/scripts/openapi/openapi_v0.9.json b/scripts/openapi/openapi_v0.9.json index db1711d..57e1166 100644 --- a/scripts/openapi/openapi_v0.9.json +++ b/scripts/openapi/openapi_v0.9.json @@ -3203,6 +3203,31 @@ } } }, + "/device/wakeup": { + "post": { + "tags": [ + "device" + ], + "summary": "Wakeup Device", + "description": "Wake up the device from idle mode.\n\nIf the device is already awake, this endpoint is a no-op and still returns success.", + "operationId": "wakeup_device", + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/DeviceControlResponse" + } + } + } + }, + "404": { + "description": "Not found" + } + } + } + }, "/device/reboot": { "post": { "tags": [ @@ -5490,6 +5515,30 @@ "title": "Pwm Support", "description": "Indicates whether this light hardware can handle PWM (otherwise only on/off).", "default": false + }, + "pwm_frequency": { + "type": "number", + "maximum": 100000.0, + "minimum": 50.0, + "title": "Pwm Frequency", + "description": "PWM frequency for led driver.", + "default": 10000.0 + }, + "pwm_min": { + "type": "number", + "maximum": 3.3, + "minimum": 0.0, + "title": "Pwm Min", + "description": "Minimum pwm voltage for led driver.", + "default": 0.0 + }, + "pwm_max": { + "type": "number", + "maximum": 3.3, + "minimum": 0.0, + "title": "Pwm Max", + "description": "Maximum pwm voltage for led driver.", + "default": 3.3 } }, "type": "object", @@ -6036,7 +6085,8 @@ } ], "title": "Min Phi", - "description": "Optional minimum phi angle in degrees for constrained paths." + "description": "Optional minimum phi angle in degrees for constrained paths.", + "default": 0 }, "max_phi": { "anyOf": [ @@ -6050,7 +6100,8 @@ } ], "title": "Max Phi", - "description": "Optional maximum phi angle in degrees for constrained paths." + "description": "Optional maximum phi angle in degrees for constrained paths.", + "default": 360.0 }, "optimize_path": { "type": "boolean", @@ -6186,11 +6237,22 @@ ], "title": "Endstops" }, - "motors_timeout": { + "idle_timeout": { "type": "number", - "title": "Motors Timeout", + "title": "Idle Timeout", "default": 0.0 }, + "motors_timeout": { + "anyOf": [ + { + "type": "number" + }, + { + "type": "null" + } + ], + "title": "Motors Timeout" + }, "scan_radius_mm": { "type": "number", "exclusiveMinimum": 0.0, diff --git a/tests/controllers/services/test_scan_task.py b/tests/controllers/services/test_scan_task.py index e73d120..ed02196 100644 --- a/tests/controllers/services/test_scan_task.py +++ b/tests/controllers/services/test_scan_task.py @@ -202,6 +202,8 @@ def test_generate_scan_path_omits_optional_phi_constraints_when_unset() -> None: points=10, min_theta=10.0, max_theta=120.0, + min_phi=None, + max_phi=None, optimize_path=False, focus_stacks=1, focus_range=(10.0, 15.0), diff --git a/tests/routers/test_device_router.py b/tests/routers/test_device_router.py index 8498e92..6b8333c 100644 --- a/tests/routers/test_device_router.py +++ b/tests/routers/test_device_router.py @@ -75,6 +75,7 @@ async def fake_set_device_config(path: str): "cameras": {}, "motors": {}, "lights": {}, + "idle_timeout": 0.0, "motors_timeout": 0.0, "startup_mode": "startup_enabled", "calibrate_mode": "calibrate_manual", @@ -191,6 +192,7 @@ def test_config_roundtrip_flow(monkeypatch, tmp_path, device_client, device_rout "cameras": {}, "motors": {}, "lights": {}, + "idle_timeout": 0.0, "motors_timeout": 0.0, "startup_mode": "startup_enabled", "calibrate_mode": "calibrate_manual", @@ -281,9 +283,11 @@ def test_reinitialize_endpoint_calls_controller(monkeypatch, device_client, devi "ring": { "name": "ring", "is_on": False, + "value": 100.0, "settings": light_settings, } }, + "idle_timeout": 0.0, "motors_timeout": 0.0, "startup_mode": "startup_enabled", "calibrate_mode": "calibrate_manual", @@ -309,3 +313,50 @@ async def fake_initialize(*, detect_cameras: bool = False): assert payload["status"]["initialized"] is True assert set(payload["status"]["motors"].keys()) == {"rotor"} assert set(payload["status"]["lights"].keys()) == {"ring"} + + +def test_wakeup_endpoint_resumes_idle_device(monkeypatch, device_client, device_router_path): + module_path = device_router_path("device") + + status_payload = { + "name": "Preset", + "model": "mini", + "shield": "greenshield", + "cameras": {}, + "motors": {}, + "lights": {}, + "triggers": {}, + "idle_timeout": 0.0, + "motors_timeout": 0.0, + "scan_radius_mm": 1.0, + "startup_mode": "startup_enabled", + "calibrate_mode": "calibrate_manual", + "initialized": True, + } + monkeypatch.setattr(f"{module_path}.device.get_device_info", lambda: status_payload, raising=False) + + wake_calls = {"resume": 0, "recalibrate": 0} + + monkeypatch.setattr(f"{module_path}.device.is_idle", lambda: True, raising=False) + + async def fake_resume(): + wake_calls["resume"] += 1 + + async def fake_recalibrate(): + wake_calls["recalibrate"] += 1 + + monkeypatch.setattr(f"{module_path}.device.resume_from_idle", fake_resume, raising=False) + monkeypatch.setattr(f"{module_path}.device.recalibrate_motors", fake_recalibrate, raising=False) + + class _ScannerDevice: + calibrate_mode = "calibrate_manual" + + monkeypatch.setattr(f"{module_path}.device._scanner_device", _ScannerDevice(), raising=False) + + response = device_client.post("/latest/device/wakeup") + assert response.status_code == 200 + payload = response.json() + assert payload["success"] is True + assert payload["message"] == "Device awakened successfully" + assert wake_calls["resume"] == 1 + assert wake_calls["recalibrate"] == 0 diff --git a/tests/routers/test_device_router_v0_8.py b/tests/routers/test_device_router_v0_8.py index 1d73265..f659e65 100644 --- a/tests/routers/test_device_router_v0_8.py +++ b/tests/routers/test_device_router_v0_8.py @@ -138,6 +138,59 @@ async def fake_initialize(*, detect_cameras: bool = False): assert payload["message"] == "Hardware reinitialized successfully" +def test_v08_wakeup_endpoint_resumes_idle_device(monkeypatch, device_client_v08, device_router_path_v08): + module_path = device_router_path_v08("device") + + monkeypatch.setattr( + f"{module_path}.device.get_device_info", + lambda: { + "name": "Preset", + "model": "mini", + "shield": "greenshield", + "cameras": {}, + "motors": {}, + "lights": {}, + "motors_timeout": 0.0, + "startup_mode": "startup_enabled", + "calibrate_mode": "calibrate_manual", + "initialized": True, + }, + raising=False, + ) + + class _PassthroughStatus: + @staticmethod + def model_validate(payload): + return payload + + monkeypatch.setattr(f"{module_path}.DeviceStatusResponse", _PassthroughStatus, raising=False) + monkeypatch.setattr(f"{module_path}.device.is_idle", lambda: True, raising=False) + + wake_calls = {"resume": 0} + + async def fake_resume(): + wake_calls["resume"] += 1 + + async def fake_recalibrate(): + raise AssertionError("recalibrate_motors should not be called in this scenario") + + monkeypatch.setattr(f"{module_path}.device.resume_from_idle", fake_resume, raising=False) + monkeypatch.setattr(f"{module_path}.device.recalibrate_motors", fake_recalibrate, raising=False) + + class _ScannerDevice: + calibrate_mode = "calibrate_manual" + + monkeypatch.setattr(f"{module_path}.device._scanner_device", _ScannerDevice(), raising=False) + + response = device_client_v08.post("/v0.8/device/wakeup") + assert response.status_code == 200 + + payload = response.json() + assert payload["success"] is True + assert payload["message"] == "Device awakened successfully" + assert wake_calls["resume"] == 1 + + def test_v08_add_config_json_rejects_persisted_shape(device_client_v08): repo_root = Path(__file__).resolve().parents[2] default_config = repo_root / "settings" / "device" / "default_mini_greenshield.json" diff --git a/tests/routers/test_device_router_v0_9.py b/tests/routers/test_device_router_v0_9.py new file mode 100644 index 0000000..0277e65 --- /dev/null +++ b/tests/routers/test_device_router_v0_9.py @@ -0,0 +1,81 @@ +"""Baseline integration-style tests for the v0_9 device router contract.""" + +from __future__ import annotations + +from importlib import import_module +from typing import Callable + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + + +def _v09_router_module_path(name: str) -> str: + return f"openscan_firmware.routers.v0_9.{name}" + + +@pytest.fixture +def device_client_v09() -> TestClient: + app = FastAPI() + device_router = import_module(_v09_router_module_path("device")) + app.include_router(device_router.router, prefix="/v0.9") + with TestClient(app) as client: + yield client + + +@pytest.fixture +def device_router_path_v09() -> Callable[[str], str]: + return _v09_router_module_path + + +def test_v09_wakeup_endpoint_resumes_idle_device(monkeypatch, device_client_v09, device_router_path_v09): + module_path = device_router_path_v09("device") + + monkeypatch.setattr( + f"{module_path}.device.get_device_info", + lambda: { + "name": "Preset", + "model": "mini", + "shield": "greenshield", + "cameras": {}, + "motors": {}, + "lights": {}, + "motors_timeout": 0.0, + "startup_mode": "startup_enabled", + "calibrate_mode": "calibrate_manual", + "initialized": True, + }, + raising=False, + ) + + class _PassthroughStatus: + @staticmethod + def model_validate(payload): + return payload + + monkeypatch.setattr(f"{module_path}.DeviceStatusResponse", _PassthroughStatus, raising=False) + monkeypatch.setattr(f"{module_path}.device.is_idle", lambda: True, raising=False) + + wake_calls = {"resume": 0} + + async def fake_resume(): + wake_calls["resume"] += 1 + + async def fake_recalibrate(): + raise AssertionError("recalibrate_motors should not be called in this scenario") + + monkeypatch.setattr(f"{module_path}.device.resume_from_idle", fake_resume, raising=False) + monkeypatch.setattr(f"{module_path}.device.recalibrate_motors", fake_recalibrate, raising=False) + + class _ScannerDevice: + calibrate_mode = "calibrate_manual" + + monkeypatch.setattr(f"{module_path}.device._scanner_device", _ScannerDevice(), raising=False) + + response = device_client_v09.post("/v0.9/device/wakeup") + assert response.status_code == 200 + + payload = response.json() + assert payload["success"] is True + assert payload["message"] == "Device awakened successfully" + assert wake_calls["resume"] == 1