diff --git a/src/aignostics/application/_cli.py b/src/aignostics/application/_cli.py index 9ee52785..84e8a5a0 100644 --- a/src/aignostics/application/_cli.py +++ b/src/aignostics/application/_cli.py @@ -1213,10 +1213,22 @@ def run_update_metadata( metadata_json: Annotated[ str, typer.Argument(..., help='Custom metadata as JSON string (e.g., \'{"key": "value"}\')') ], + checksum: Annotated[ + str | None, + typer.Option( + "--checksum", + help=( + "Optional checksum for optimistic concurrency control. " + "The server rejects the update with HTTP 412 if the metadata was modified since this checksum was read." + ), + ), + ] = None, ) -> None: """Update custom metadata for a run.""" import json # noqa: PLC0415 + from aignostics.platform import ConcurrencyConflictError # noqa: PLC0415 + logger.trace("Updating custom metadata for run with ID '{}'", run_id) try: @@ -1230,13 +1242,17 @@ def run_update_metadata( console.print(f"[error]Error:[/error] Invalid JSON: {e}") sys.exit(1) - Service().application_run_update_custom_metadata(run_id, custom_metadata) + Service().application_run_update_custom_metadata(run_id, custom_metadata, custom_metadata_checksum=checksum) logger.debug("Updated custom metadata for run with ID '{}'.", run_id) console.print(f"Successfully updated custom metadata for run with ID '{run_id}'.") except NotFoundException: logger.warning(f"Run with ID '{run_id}' not found.") console.print(f"[warning]Warning:[/warning] Run with ID '{run_id}' not found.") sys.exit(2) + except ConcurrencyConflictError as e: + logger.warning(f"Concurrency conflict updating metadata for run '{run_id}': {e}") + console.print(f"[warning]Warning:[/warning] Metadata was modified by another process. Re-read and retry: {e}") + sys.exit(3) except ValueError as e: logger.warning(f"Run ID '{run_id}' invalid or metadata invalid: {e}") console.print(f"[warning]Warning:[/warning] Run ID '{run_id}' invalid or metadata invalid: {e}") @@ -1254,10 +1270,22 @@ def run_update_item_metadata( metadata_json: Annotated[ str, typer.Argument(..., help='Custom metadata as JSON string (e.g., \'{"key": "value"}\')') ], + checksum: Annotated[ + str | None, + typer.Option( + "--checksum", + help=( + "Optional checksum for optimistic concurrency control. " + "The server rejects the update with HTTP 412 if the metadata was modified since this checksum was read." + ), + ), + ] = None, ) -> None: """Update custom metadata for an item in a run.""" import json # noqa: PLC0415 + from aignostics.platform import ConcurrencyConflictError # noqa: PLC0415 + logger.trace("Updating custom metadata for item '{}' in run with ID '{}'", external_id, run_id) try: @@ -1271,13 +1299,19 @@ def run_update_item_metadata( console.print(f"[error]Error:[/error] Invalid JSON: {e}") sys.exit(1) - Service().application_run_update_item_custom_metadata(run_id, external_id, custom_metadata) + Service().application_run_update_item_custom_metadata( + run_id, external_id, custom_metadata, custom_metadata_checksum=checksum + ) logger.debug("Updated custom metadata for item '{}' in run with ID '{}'.", external_id, run_id) console.print(f"Successfully updated custom metadata for item '{external_id}' in run with ID '{run_id}'.") except NotFoundException: logger.warning(f"Run with ID '{run_id}' or item '{external_id}' not found.") console.print(f"[warning]Warning:[/warning] Run with ID '{run_id}' or item '{external_id}' not found.") sys.exit(2) + except ConcurrencyConflictError as e: + logger.warning("Concurrency conflict updating metadata for item '{}' in run '{}': {}", external_id, run_id, e) + console.print(f"[warning]Warning:[/warning] Metadata was modified by another process. Re-read and retry: {e}") + sys.exit(3) except ValueError as e: logger.warning( "Run ID '{}' or item external ID '{}' invalid or metadata invalid: {}", diff --git a/src/aignostics/application/_gui/_page_application_run_describe.py b/src/aignostics/application/_gui/_page_application_run_describe.py index 926aa767..fae8726f 100644 --- a/src/aignostics/application/_gui/_page_application_run_describe.py +++ b/src/aignostics/application/_gui/_page_application_run_describe.py @@ -19,7 +19,15 @@ ) from nicegui import run as nicegui_run -from aignostics.platform import ArtifactOutput, ItemOutput, ItemResult, ItemState, Run, RunState +from aignostics.platform import ( + ArtifactOutput, + ConcurrencyConflictError, + ItemOutput, + ItemResult, + ItemState, + Run, + RunState, +) from aignostics.third_party.showinfm.showinfm import show_in_file_manager from aignostics.utils import GUILocalFilePicker, get_user_data_directory @@ -747,6 +755,11 @@ async def handle_metadata_change(e: Any) -> None: # noqa: ANN401 ) ui.notify("Custom metadata updated successfully!", type="positive") ui.navigate.reload() + except ConcurrencyConflictError: + ui.notify( + "Metadata was modified by another process — reload the page and retry.", + type="warning", + ) except Exception as ex: ui.notify(f"Failed to update custom metadata: {ex!s}", type="negative") diff --git a/src/aignostics/application/_service.py b/src/aignostics/application/_service.py index 315c674f..0a2423a2 100644 --- a/src/aignostics/application/_service.py +++ b/src/aignostics/application/_service.py @@ -23,6 +23,7 @@ ApplicationSummary, ApplicationVersion, Client, + ConcurrencyConflictError, ForbiddenException, InputArtifact, InputItem, @@ -1120,21 +1121,31 @@ def application_run_update_custom_metadata( self, run_id: str, custom_metadata: dict[str, Any], + *, + custom_metadata_checksum: str | None = None, ) -> None: """Update custom metadata for an existing application run. Args: run_id (str): The ID of the run to update custom_metadata (dict[str, Any]): The new custom metadata to attach to the run. + custom_metadata_checksum (str | None): Optional checksum for optimistic concurrency + control. When provided, the server returns HTTP 412 (and rejects the update) if + the metadata was modified since the checksum was read. Pass ``None`` to skip + the precondition check. Raises: NotFoundException: If the application run with the given ID is not found. + ConcurrencyConflictError: If the checksum precondition failed (HTTP 412). ValueError: If the run ID is invalid. RuntimeError: If updating the run metadata fails unexpectedly. """ try: logger.trace("Updating custom metadata for run with ID '{}'", run_id) - self._get_platform_client().run(run_id).update_custom_metadata(custom_metadata) + self._get_platform_client().run(run_id).update_custom_metadata( + custom_metadata, + custom_metadata_checksum=custom_metadata_checksum, + ) logger.trace("Updated custom metadata for run with ID '{}'", run_id) except ValueError as e: message = f"Failed to update custom metadata for run with ID '{run_id}': ValueError {e}" @@ -1145,6 +1156,13 @@ def application_run_update_custom_metadata( logger.warning(message) raise NotFoundException(message) from e except ApiException as e: + if e.status == HTTPStatus.PRECONDITION_FAILED: + message = ( + f"Custom metadata for run '{run_id}' was modified since the checksum was read " + f"(optimistic concurrency conflict): {e!s}." + ) + logger.warning(message) + raise ConcurrencyConflictError(message) from e if e.status == HTTPStatus.UNPROCESSABLE_ENTITY: message = f"Run ID '{run_id}' invalid: {e!s}." logger.warning(message) @@ -1161,25 +1179,36 @@ def application_run_update_custom_metadata( def application_run_update_custom_metadata_static( run_id: str, custom_metadata: dict[str, Any], + *, + custom_metadata_checksum: str | None = None, ) -> None: """Static wrapper for updating custom metadata for an application run. Args: run_id (str): The ID of the run to update custom_metadata (dict[str, Any]): The new custom metadata to attach to the run. + custom_metadata_checksum (str | None): Optional checksum for optimistic concurrency + control. When provided, the server returns HTTP 412 (and rejects the update) if + the metadata was modified since the checksum was read. Pass ``None`` to skip + the precondition check. Raises: NotFoundException: If the application run with the given ID is not found. + ConcurrencyConflictError: If the checksum precondition failed (HTTP 412). ValueError: If the run ID is invalid. RuntimeError: If updating the run metadata fails unexpectedly. """ - Service().application_run_update_custom_metadata(run_id, custom_metadata) + Service().application_run_update_custom_metadata( + run_id, custom_metadata, custom_metadata_checksum=custom_metadata_checksum + ) def application_run_update_item_custom_metadata( self, run_id: str, external_id: str, custom_metadata: dict[str, Any], + *, + custom_metadata_checksum: str | None = None, ) -> None: """Update custom metadata for an existing item in an application run. @@ -1187,9 +1216,14 @@ def application_run_update_item_custom_metadata( run_id (str): The ID of the run containing the item external_id (str): The external ID of the item to update custom_metadata (dict[str, Any]): The new custom metadata to attach to the item. + custom_metadata_checksum (str | None): Optional checksum for optimistic concurrency + control. When provided, the server returns HTTP 412 (and rejects the update) if + the metadata was modified since the checksum was read. Pass ``None`` to skip + the precondition check. Raises: NotFoundException: If the application run or item with the given IDs is not found. + ConcurrencyConflictError: If the checksum precondition failed (HTTP 412). ValueError: If the run ID or item external ID is invalid. RuntimeError: If updating the item metadata fails unexpectedly. """ @@ -1202,6 +1236,7 @@ def application_run_update_item_custom_metadata( self._get_platform_client().run(run_id).update_item_custom_metadata( external_id, custom_metadata, + custom_metadata_checksum=custom_metadata_checksum, ) logger.trace( "Updated custom metadata for item '{}' in run with ID '{}'", @@ -1219,6 +1254,13 @@ def application_run_update_item_custom_metadata( logger.warning(message) raise NotFoundException(message) from e except ApiException as e: + if e.status == HTTPStatus.PRECONDITION_FAILED: + message = ( + f"Custom metadata for item '{external_id}' in run '{run_id}' was modified since " + f"the checksum was read (optimistic concurrency conflict): {e!s}." + ) + logger.warning(message) + raise ConcurrencyConflictError(message) from e if e.status == HTTPStatus.UNPROCESSABLE_ENTITY: message = f"Run ID '{run_id}' or item external ID '{external_id}' invalid: {e!s}." logger.warning(message) @@ -1236,6 +1278,8 @@ def application_run_update_item_custom_metadata_static( run_id: str, external_id: str, custom_metadata: dict[str, Any], + *, + custom_metadata_checksum: str | None = None, ) -> None: """Static wrapper for updating custom metadata for an item in an application run. @@ -1243,13 +1287,20 @@ def application_run_update_item_custom_metadata_static( run_id (str): The ID of the run containing the item external_id (str): The external ID of the item to update custom_metadata (dict[str, Any]): The new custom metadata to attach to the item. + custom_metadata_checksum (str | None): Optional checksum for optimistic concurrency + control. When provided, the server returns HTTP 412 (and rejects the update) if + the metadata was modified since the checksum was read. Pass ``None`` to skip + the precondition check. Raises: NotFoundException: If the application run or item with the given IDs is not found. + ConcurrencyConflictError: If the checksum precondition failed (HTTP 412). ValueError: If the run ID or item external ID is invalid. RuntimeError: If updating the item metadata fails unexpectedly. """ - Service().application_run_update_item_custom_metadata(run_id, external_id, custom_metadata) + Service().application_run_update_item_custom_metadata( + run_id, external_id, custom_metadata, custom_metadata_checksum=custom_metadata_checksum + ) def application_run_cancel(self, run_id: str) -> None: """Cancel a run by its ID. diff --git a/src/aignostics/platform/__init__.py b/src/aignostics/platform/__init__.py index fea6f14f..5de50f2e 100644 --- a/src/aignostics/platform/__init__.py +++ b/src/aignostics/platform/__init__.py @@ -81,6 +81,7 @@ TOKEN_URL_STAGING, TOKEN_URL_TEST, ) +from ._exceptions import ConcurrencyConflictError from ._messages import AUTHENTICATION_FAILED, NOT_YET_IMPLEMENTED, UNKNOWN_ENDPOINT_URL from ._sdk_metadata import ( PipelineConfig, @@ -155,6 +156,7 @@ "Artifact", "ArtifactOutput", "Client", + "ConcurrencyConflictError", "Documents", "ForbiddenException", "InputArtifact", diff --git a/src/aignostics/platform/_exceptions.py b/src/aignostics/platform/_exceptions.py new file mode 100644 index 00000000..abab5bbf --- /dev/null +++ b/src/aignostics/platform/_exceptions.py @@ -0,0 +1,10 @@ +"""Exceptions of platform module.""" + + +class ConcurrencyConflictError(ValueError): + """Raised when an optimistic concurrency precondition (HTTP 412) fails. + + Subclasses ValueError so existing ``except ValueError`` callers still catch it, + while callers that need to distinguish a conflict from a bad-ID error can use + ``except ConcurrencyConflictError``. + """ diff --git a/src/aignostics/platform/resources/runs.py b/src/aignostics/platform/resources/runs.py index 7a861c4b..a95623ad 100644 --- a/src/aignostics/platform/resources/runs.py +++ b/src/aignostics/platform/resources/runs.py @@ -596,11 +596,17 @@ def ensure_artifacts_downloaded( def update_custom_metadata( self, custom_metadata: dict[str, Any], + *, + custom_metadata_checksum: str | None = None, ) -> None: """Update custom metadata for this application run. Args: custom_metadata (dict[str, Any]): The new custom metadata to attach to the run. + custom_metadata_checksum (str | None): Optional checksum for optimistic concurrency + control. When provided, the server returns HTTP 412 (and rejects the update) if + the metadata was modified since the checksum was read. Pass ``None`` to skip + the precondition check. Raises: Exception: If the API request fails. @@ -615,7 +621,8 @@ def update_custom_metadata( self._api.put_run_custom_metadata_v1_runs_run_id_custom_metadata_put( self.run_id, custom_metadata_update_request=CustomMetadataUpdateRequest( - custom_metadata=cast("dict[str, Any]", convert_to_json_serializable(custom_metadata)) + custom_metadata=cast("dict[str, Any]", convert_to_json_serializable(custom_metadata)), + custom_metadata_checksum=custom_metadata_checksum, ), _request_timeout=settings().run_submit_timeout, _headers={"User-Agent": user_agent()}, @@ -626,12 +633,18 @@ def update_item_custom_metadata( self, external_id: str, custom_metadata: dict[str, Any], + *, + custom_metadata_checksum: str | None = None, ) -> None: """Update custom metadata for an item in this application run. Args: external_id (str): The external ID of the item. custom_metadata (dict[str, Any]): The new custom metadata to attach to the item. + custom_metadata_checksum (str | None): Optional checksum for optimistic concurrency + control. When provided, the server returns HTTP 412 (and rejects the update) if + the metadata was modified since the checksum was read. Pass ``None`` to skip + the precondition check. Raises: Exception: If the API request fails. @@ -647,7 +660,8 @@ def update_item_custom_metadata( self.run_id, external_id, custom_metadata_update_request=CustomMetadataUpdateRequest( - custom_metadata=cast("dict[str, Any]", convert_to_json_serializable(custom_metadata)) + custom_metadata=cast("dict[str, Any]", convert_to_json_serializable(custom_metadata)), + custom_metadata_checksum=custom_metadata_checksum, ), _request_timeout=settings().run_submit_timeout, _headers={"User-Agent": user_agent()}, diff --git a/tests/aignostics/application/cli_test.py b/tests/aignostics/application/cli_test.py index 4c5b9aeb..8544536e 100644 --- a/tests/aignostics/application/cli_test.py +++ b/tests/aignostics/application/cli_test.py @@ -53,6 +53,8 @@ DOCUMENT_MODEL_CARD_PDF = "model_card.pdf" DOCUMENT_MISSING_PDF = "missing.pdf" APPLICATION_CLI_CLIENT_PATCH_TARGET = "aignostics.application._cli.Client" +APPLICATION_CLI_SERVICE_PATCH_TARGET = "aignostics.application._cli.Service" +_TEST_METADATA_JSON = '{"key": "value"}' # Stub values reused across the document CLI tests. DOCUMENT_TEST_FAILURE_MESSAGE = "kaboom" # canonical exception body for unexpected-failure paths @@ -949,7 +951,7 @@ def test_cli_run_describe_json_includes_items(runner: CliRunner) -> None: with ( patch("aignostics.application._cli.PlatformService.get_user_info", return_value=mock_user_info), - patch("aignostics.application._cli.Service") as mock_service_cls, + patch(APPLICATION_CLI_SERVICE_PATCH_TARGET) as mock_service_cls, ): mock_service_cls.return_value.application_run.return_value = mock_run_handle @@ -1211,6 +1213,96 @@ def test_cli_run_update_item_metadata_not_dict(runner: CliRunner) -> None: assert "Metadata must be a JSON object" in result.output +@pytest.mark.unit +def test_cli_run_update_metadata_success_with_checksum(runner: CliRunner) -> None: + """Check run update-metadata command succeeds and forwards --checksum to the service.""" + with patch(APPLICATION_CLI_SERVICE_PATCH_TARGET) as mock_service_cls: + result = runner.invoke( + cli, + [ + "application", + "run", + "update-metadata", + "run-123", + _TEST_METADATA_JSON, + "--checksum", + "abc123", + ], + ) + assert result.exit_code == 0 + assert "Successfully updated" in result.output + mock_service_cls.return_value.application_run_update_custom_metadata.assert_called_once_with( + "run-123", {"key": "value"}, custom_metadata_checksum="abc123" + ) + + +@pytest.mark.unit +def test_cli_run_update_metadata_concurrency_conflict(runner: CliRunner) -> None: + """Check run update-metadata exits 3 with a clear message on ConcurrencyConflictError.""" + from aignostics.platform import ConcurrencyConflictError + + with patch(APPLICATION_CLI_SERVICE_PATCH_TARGET) as mock_service_cls: + mock_service_cls.return_value.application_run_update_custom_metadata.side_effect = ConcurrencyConflictError( + "stale checksum" + ) + result = runner.invoke( + cli, + ["application", "run", "update-metadata", "run-123", _TEST_METADATA_JSON, "--checksum", "old"], + ) + assert result.exit_code == 3 + assert "modified by another process" in result.output + + +@pytest.mark.unit +def test_cli_run_update_item_metadata_success_with_checksum(runner: CliRunner) -> None: + """Check run update-item-metadata command succeeds and forwards --checksum to the service.""" + with patch(APPLICATION_CLI_SERVICE_PATCH_TARGET) as mock_service_cls: + result = runner.invoke( + cli, + [ + "application", + "run", + "update-item-metadata", + "run-123", + "item-ext-id", + _TEST_METADATA_JSON, + "--checksum", + "abc123", + ], + ) + assert result.exit_code == 0 + assert "Successfully updated" in result.output + mock_service_cls.return_value.application_run_update_item_custom_metadata.assert_called_once_with( + "run-123", "item-ext-id", {"key": "value"}, custom_metadata_checksum="abc123" + ) + + +@pytest.mark.unit +def test_cli_run_update_item_metadata_concurrency_conflict(runner: CliRunner) -> None: + """Check run update-item-metadata exits 3 with a clear message on ConcurrencyConflictError.""" + from aignostics.platform import ConcurrencyConflictError + + with patch(APPLICATION_CLI_SERVICE_PATCH_TARGET) as mock_service_cls: + mock_service_cls.return_value.application_run_update_item_custom_metadata.side_effect = ( + ConcurrencyConflictError("stale checksum") + ) + result = runner.invoke( + cli, + [ + "application", + "run", + "update-item-metadata", + "run-123", + "item-ext-id", + _TEST_METADATA_JSON, + "--checksum", + "old", + ], + ) + assert result.exit_code == 3 + assert "modified by another process" in result.output + + @pytest.mark.e2e @pytest.mark.timeout(timeout=180) @pytest.mark.sequential diff --git a/tests/aignostics/application/service_test.py b/tests/aignostics/application/service_test.py index ea023455..90bf87c1 100644 --- a/tests/aignostics/application/service_test.py +++ b/tests/aignostics/application/service_test.py @@ -1,13 +1,14 @@ """Tests to verify the service functionality of the application module.""" from datetime import UTC, datetime, timedelta +from http import HTTPStatus from unittest.mock import MagicMock, patch import pytest from typer.testing import CliRunner from aignostics.application import Service as ApplicationService -from aignostics.platform import NotFoundException, RunData, RunOutput +from aignostics.platform import ApiException, ConcurrencyConflictError, NotFoundException, RunData, RunOutput from tests.constants_test import ( HETA_APPLICATION_ID, HETA_APPLICATION_VERSION, @@ -511,7 +512,7 @@ def test_application_run_update_custom_metadata_success(mock_get_client: MagicMo # Verify the run() method was called with correct run_id mock_client.run.assert_called_once_with("run-123") # Verify the update_custom_metadata method was called with correct arguments - mock_run.update_custom_metadata.assert_called_once_with(custom_metadata) + mock_run.update_custom_metadata.assert_called_once_with(custom_metadata, custom_metadata_checksum=None) @pytest.mark.unit @@ -548,7 +549,9 @@ def test_application_run_update_item_custom_metadata_success(mock_get_client: Ma # Verify the run() method was called with correct run_id mock_client.run.assert_called_once_with("run-123") # Verify the update_item_custom_metadata method was called with correct arguments - mock_run.update_item_custom_metadata.assert_called_once_with("item-ext-id", custom_metadata) + mock_run.update_item_custom_metadata.assert_called_once_with( + "item-ext-id", custom_metadata, custom_metadata_checksum=None + ) @pytest.mark.unit @@ -565,3 +568,71 @@ def test_application_run_update_item_custom_metadata_not_found(mock_get_client: with pytest.raises(NotFoundException, match="not found"): service.application_run_update_item_custom_metadata("run-123", "invalid-item-id", {"key": "value"}) + + +@pytest.mark.unit +@patch("aignostics.application._service.Service._get_platform_client") +def test_application_run_update_custom_metadata_with_checksum(mock_get_client: MagicMock) -> None: + """Checksum is forwarded through the service layer to Run.update_custom_metadata.""" + mock_client = MagicMock() + mock_run = MagicMock() + mock_client.run.return_value = mock_run + mock_get_client.return_value = mock_client + + service = ApplicationService() + service.application_run_update_custom_metadata("run-123", {"key": "value"}, custom_metadata_checksum="abc123") + + mock_run.update_custom_metadata.assert_called_once_with({"key": "value"}, custom_metadata_checksum="abc123") + + +@pytest.mark.unit +@patch("aignostics.application._service.Service._get_platform_client") +def test_application_run_update_custom_metadata_412_raises_concurrency_error(mock_get_client: MagicMock) -> None: + """HTTP 412 from server (optimistic concurrency conflict) raises ConcurrencyConflictError.""" + mock_client = MagicMock() + mock_run = MagicMock() + mock_run.update_custom_metadata.side_effect = ApiException(status=HTTPStatus.PRECONDITION_FAILED) + mock_client.run.return_value = mock_run + mock_get_client.return_value = mock_client + + service = ApplicationService() + + with pytest.raises(ConcurrencyConflictError, match="optimistic concurrency conflict"): + service.application_run_update_custom_metadata("run-123", {"key": "value"}, custom_metadata_checksum="stale") + + +@pytest.mark.unit +@patch("aignostics.application._service.Service._get_platform_client") +def test_application_run_update_item_custom_metadata_with_checksum(mock_get_client: MagicMock) -> None: + """Checksum is forwarded through the service layer to Run.update_item_custom_metadata.""" + mock_client = MagicMock() + mock_run = MagicMock() + mock_client.run.return_value = mock_run + mock_get_client.return_value = mock_client + + service = ApplicationService() + service.application_run_update_item_custom_metadata( + "run-123", "item-ext-id", {"key": "value"}, custom_metadata_checksum="abc123" + ) + + mock_run.update_item_custom_metadata.assert_called_once_with( + "item-ext-id", {"key": "value"}, custom_metadata_checksum="abc123" + ) + + +@pytest.mark.unit +@patch("aignostics.application._service.Service._get_platform_client") +def test_application_run_update_item_custom_metadata_412_raises_concurrency_error(mock_get_client: MagicMock) -> None: + """HTTP 412 from server raises ConcurrencyConflictError for item metadata update.""" + mock_client = MagicMock() + mock_run = MagicMock() + mock_run.update_item_custom_metadata.side_effect = ApiException(status=HTTPStatus.PRECONDITION_FAILED) + mock_client.run.return_value = mock_run + mock_get_client.return_value = mock_client + + service = ApplicationService() + + with pytest.raises(ConcurrencyConflictError, match="optimistic concurrency conflict"): + service.application_run_update_item_custom_metadata( + "run-123", "item-ext-id", {"key": "value"}, custom_metadata_checksum="stale" + )