diff --git a/config/perfecto.py b/config/perfecto.py index 2f59883..af11bc1 100644 --- a/config/perfecto.py +++ b/config/perfecto.py @@ -71,5 +71,20 @@ def get_ai_scriptless_api_url(cloud_name: str) -> str: return f"https://{cloud_name}.app.perfectomobile.com/native-automation-webapp/rest/v1/native-automation" +def get_ai_scriptless_draft_api_url(cloud_name: str) -> str: + return f"https://{cloud_name}.app.perfectomobile.com/native-automation-webapp/rest/v1/draft-management/draft" + + def get_ai_scriptless_execution_api_url(cloud_name: str) -> str: return f"https://{cloud_name}.perfectomobile.com/scriptless-mobile-engine/script-executor/api/executions" + + +def get_ai_scriptless_command_repository_url(cloud_name: str) -> str: + return f"https://{cloud_name}.perfectomobile.com/scriptless-mobile-engine/command-repository/api" + + +def get_repository_management_api_url(cloud_name: str) -> str: + return ( + f"https://{cloud_name}.app.perfectomobile.com" + "/repository-management-webapp/rest/v2/repository-management" + ) diff --git a/dist/perfecto-mcp-amd64.app/Contents/MacOS/perfecto-mcp b/dist/perfecto-mcp-amd64.app/Contents/MacOS/perfecto-mcp index 8dd672b..dcd1263 100644 Binary files a/dist/perfecto-mcp-amd64.app/Contents/MacOS/perfecto-mcp and b/dist/perfecto-mcp-amd64.app/Contents/MacOS/perfecto-mcp differ diff --git a/dist/perfecto-mcp-arm64.app/Contents/MacOS/perfecto-mcp b/dist/perfecto-mcp-arm64.app/Contents/MacOS/perfecto-mcp index 50f8dc7..3be2fcf 100644 Binary files a/dist/perfecto-mcp-arm64.app/Contents/MacOS/perfecto-mcp and b/dist/perfecto-mcp-arm64.app/Contents/MacOS/perfecto-mcp differ diff --git a/dist/perfecto-mcp-linux-amd64 b/dist/perfecto-mcp-linux-amd64 index a5846b3..0d07776 100755 Binary files a/dist/perfecto-mcp-linux-amd64 and b/dist/perfecto-mcp-linux-amd64 differ diff --git a/dist/perfecto-mcp-linux-amd64.sha256 b/dist/perfecto-mcp-linux-amd64.sha256 index 702156d..d543211 100644 --- a/dist/perfecto-mcp-linux-amd64.sha256 +++ b/dist/perfecto-mcp-linux-amd64.sha256 @@ -1 +1 @@ -891b5868edb7db6eeed2412bae6051452520d66393f1f93799835f6609f71761 perfecto-mcp-linux-amd64 +f52839d8062d5b1602a40b1d7dd7a9e51a75fce60501e8c1e208a7af6e1022ab perfecto-mcp-linux-amd64 diff --git a/dist/perfecto-mcp-linux-arm64 b/dist/perfecto-mcp-linux-arm64 index 6e58bff..765fd2f 100755 Binary files a/dist/perfecto-mcp-linux-arm64 and b/dist/perfecto-mcp-linux-arm64 differ diff --git a/dist/perfecto-mcp-linux-arm64.sha256 b/dist/perfecto-mcp-linux-arm64.sha256 index f8029cd..ad2e3fe 100644 --- a/dist/perfecto-mcp-linux-arm64.sha256 +++ b/dist/perfecto-mcp-linux-arm64.sha256 @@ -1 +1 @@ -f8ddcb5a3b77b3926d244be06ae1aff1024ba4f9db5c2b837002a3378e157a50 perfecto-mcp-linux-arm64 +7271e3c6695e21d1164b97aa3a6aeb3fc7315f85912f80a97f19e36be9b92ea5 perfecto-mcp-linux-arm64 diff --git a/dist/perfecto-mcp-windows-amd64.exe b/dist/perfecto-mcp-windows-amd64.exe index eab8383..3301e8f 100644 Binary files a/dist/perfecto-mcp-windows-amd64.exe and b/dist/perfecto-mcp-windows-amd64.exe differ diff --git a/formatters/ai_scriptless.py b/formatters/ai_scriptless.py index 5f49d97..fd04b39 100644 --- a/formatters/ai_scriptless.py +++ b/formatters/ai_scriptless.py @@ -1,5 +1,40 @@ from typing import List, Any, Optional +from models.ai_scriptless import ( + CommandCatalogEntry, + CommandDefinitionSummary, + ScriptFlowElement, + ScriptParameter, + ScriptVariableSummary, + SnapshotListResult, + SnapshotSummary, + TestStructure, +) + +PRIMARY_AI_COMMAND_IDS = ( + "ai_user-action", + "ai_validation", + "ai_visual-comparison", +) + + +def command_selection_policy_info() -> List[str]: + """Context returned with list_commands so agents load policy when choosing command_ids.""" + return [ + "Command selection policy (when authoring tests with add_command / modify_command):", + "Default: use only these primary AI command_ids: " + + ", ".join(PRIMARY_AI_COMMAND_IDS) + ".", + " • ai_user-action — user interactions (open browser/app, navigate to URL, tap, type, dismiss overlays); " + "argument: action (natural language).", + " • ai_validation — checkpoints and assertions; argument: validation (natural language).", + " • ai_visual-comparison — visual/baseline comparison; argument: name.", + "Prefer ai_user-action for navigation (e.g. open browser and go to URL), not browser_goto / browser_open.", + "Do not use browser_*, touch_tap, webpage.element_*, checkpoint_text, etc. unless the user explicitly " + "requests a non-AI command or agreed that AI commands cannot meet a documented requirement.", + "Structural helpers (add_logical_step, add_loop, add_condition, comment, wait) are OK; " + "keep observable steps AI-driven when possible.", + "Call get_command_definitions only for the AI command_ids you will use.", + ] def format_ai_scriptless_tests_filter_values(tests: dict[str, Any], params: Optional[dict] = None) -> dict[str, Any]: filter_values = { @@ -57,3 +92,282 @@ def format_ai_scriptless_tests(tests: dict[str, Any], params: Optional[dict] = N offset = len(formatted_ai_scriptless_tests) - 1 return formatted_ai_scriptless_tests[skip:offset] + + +def _command_id(command: Optional[str], subcommand: Optional[str]) -> Optional[str]: + if not command: + return None + sub = subcommand or "" + if sub: + return f"{command}_{sub}".replace("/", "_") + return command.replace("/", "_") + + +def _definitions_map(command_definitions: Optional[list]) -> dict[str, dict[str, Any]]: + if not command_definitions: + return {} + return {item["commandId"]: item for item in command_definitions if "commandId" in item} + + +def _argument_value(element: dict[str, Any], argument_name: str) -> Optional[str]: + for argument in element.get("arguments", []): + if argument.get("name") == argument_name: + data = argument.get("data", {}) + return data.get("value") + return None + + +def _definition_display_name(command_id: Optional[str], definitions_map: dict[str, dict[str, Any]]) -> Optional[str]: + if not command_id or command_id not in definitions_map: + return None + definition = definitions_map[command_id] + display = definition.get("data", {}).get("display", {}) + return display.get("name") or definition.get("name") + + +def _step_display_name(element: dict[str, Any], definitions_map: dict[str, dict[str, Any]]) -> str: + element_type = element.get("@type", "") + command = element.get("command") + subcommand = element.get("subcommand") + command_id = _command_id(command, subcommand) + + if command == "ai" and subcommand == "user-action": + action_text = _argument_value(element, "action") + if action_text: + return action_text + + if element_type == "Loop": + iterator = element.get("iterator", {}) + count = iterator.get("count") + if count is not None: + return f"Loop ({count})" + return "Loop" + + if element_type == "IfStatement": + expression = element.get("expression") or element.get("label") + if expression: + return f"Condition ({expression})" + return "Condition" + + if element_type == "LogicalStep": + label = element.get("label") + if label: + return label + return "Step" + + if element_type == "Branch": + clause = element.get("clause", "") + return clause.title() if clause else "Branch" + + display_name = _definition_display_name(command_id, definitions_map) + if display_name: + return display_name + + if command: + if subcommand: + return f"{command}/{subcommand}" + return command + + return element_type or "Unknown" + + +def _format_flow_element( + element: dict[str, Any], + definitions_map: dict[str, dict[str, Any]], + step_path: str, +) -> ScriptFlowElement: + element_type = element.get("@type", "") + command = element.get("command") + subcommand = element.get("subcommand") + children: List[ScriptFlowElement] = [] + + if element_type == "IfStatement": + for branch_index, branch in enumerate(element.get("branches", [])): + branch_path = f"{step_path}.b{branch_index}" + branch_label = branch.get("clause", "Branch") + branch_children = [ + _format_flow_element(child, definitions_map, f"{branch_path}.{child_index}") + for child_index, child in enumerate(branch.get("flowElements", [])) + ] + children.append(ScriptFlowElement( + type="Branch", + name=branch_label.title() if branch_label else "Branch", + active=branch.get("active", True), + step_path=branch_path, + children=branch_children, + )) + else: + for child_index, child in enumerate(element.get("flowElements", [])): + children.append( + _format_flow_element(child, definitions_map, f"{step_path}.{child_index}") + ) + + return ScriptFlowElement( + type=element_type, + name=_step_display_name(element, definitions_map), + command=command, + subcommand=subcommand, + active=element.get("active", True), + step_path=step_path, + children=children, + ) + + +def _format_root_flow_elements( + flow_elements: list[dict[str, Any]], + definitions_map: dict[str, dict[str, Any]], +) -> List[ScriptFlowElement]: + return [ + _format_flow_element(element, definitions_map, str(index)) + for index, element in enumerate(flow_elements) + ] + + +def format_test_structure(payload: dict[str, Any], params: Optional[dict] = None) -> TestStructure: + item_key = params.get("item_key", "") if params else "" + script = payload.get("script", {}) + definitions_map = _definitions_map(payload.get("commandDefinitions")) + + parameters = [] + for parameter in script.get("parameters", []): + data = parameter.get("data", {}) + parameters.append(ScriptParameter( + name=data.get("name", parameter.get("name", "")), + type=data.get("@type", "Unknown"), + )) + + flow_elements = _format_root_flow_elements(script.get("flowElements", []), definitions_map) + + info = script.get("info", {}) + return TestStructure( + item_key=item_key, + parameters=parameters, + model_version=info.get("modelVersion"), + flow_elements=flow_elements, + ) + + +def _flatten_command_catalog(node: dict[str, Any], category: Optional[str] = None) -> List[CommandCatalogEntry]: + entries: List[CommandCatalogEntry] = [] + node_name = node.get("name") + node_category = node_name if node.get("children") is not None else category + + if "commandId" in node: + entries.append(CommandCatalogEntry( + command_id=node["commandId"], + name=node.get("name", node["commandId"]), + path=node.get("path", ""), + status=node.get("status"), + category=category, + )) + + for child in node.get("children", []): + entries.extend(_flatten_command_catalog(child, node_category)) + + return entries + + +def format_command_catalog(catalog: dict[str, Any], params: Optional[dict] = None) -> List[CommandCatalogEntry]: + return _flatten_command_catalog(catalog) + + +def _parameter_names(parameters: Optional[list]) -> List[str]: + if not parameters: + return [] + return [param.get("name", param.get("parameterName", "")) for param in parameters if param.get("name") or param.get("parameterName")] + + +def _variable_type_label(data: dict[str, Any]) -> str: + data_type = data.get("@type", "Unknown") + if data_type == "StringData": + return "secured_string" if data.get("secured") else "string" + mapping = { + "BooleanData": "boolean", + "IntegerData": "number", + "HandsetData": "device", + "MediaData": "media", + "TableData": "datatable", + } + return mapping.get(data_type, data_type) + + +def format_test_variables(variables: Any, params: Optional[dict] = None) -> List[ScriptVariableSummary]: + if not isinstance(variables, list): + return [] + + formatted: List[ScriptVariableSummary] = [] + for variable in variables: + if not isinstance(variable, dict): + continue + data = variable.get("data", {}) + value = data.get("value") + if data.get("secured") and value: + value = "" + formatted.append(ScriptVariableSummary( + name=data.get("name", ""), + type=_variable_type_label(data), + value=value, + secured=bool(data.get("secured")), + set_at_runtime=variable.get("@type") == "Parameter", + )) + return formatted + + +def format_snapshots_list(response: Any, params: Optional[dict] = None) -> SnapshotListResult: + snapshots = response.get("snapshots", []) if isinstance(response, dict) else response + if not isinstance(snapshots, list): + snapshots = [] + + formatted: List[SnapshotSummary] = [] + for snapshot in snapshots: + if not isinstance(snapshot, dict): + continue + key = snapshot.get("key", "") + creation = snapshot.get("creationTime") or snapshot.get("createdTime") or {} + created_time = creation.get("formatted") if isinstance(creation, dict) else creation + formatted.append(SnapshotSummary( + key=key, + version=snapshot.get("version"), + comment=snapshot.get("comment"), + created_by=snapshot.get("createdBy"), + created_time=created_time, + is_current=key == "", + )) + + formatted.sort(key=lambda entry: (0 if entry.is_current else 1, entry.key)) + + test_id = params.get("test_id") if params else None + return SnapshotListResult( + test_id=test_id, + count=len(formatted), + snapshots=formatted, + notes=[ + "Every POST script save (including save_test without comment) adds a new UUID entry to snapshot history.", + "The '' entry marks the live editable script; use view_test_structure with test_id for its structure.", + "Open historical versions with view_snapshot using a UUID key from this list, not ''.", + "The comment argument on save_test/save_test_as labels '' (UI: Save with comment); it does not skip version creation.", + ], + ) + + +def format_command_definitions(response: Any, params: Optional[dict] = None) -> List[CommandDefinitionSummary]: + definitions = response if isinstance(response, list) else response.get("definitions", response.get("items", [])) + if not isinstance(definitions, list): + definitions = [definitions] if definitions else [] + + summaries: List[CommandDefinitionSummary] = [] + for definition in definitions: + if not isinstance(definition, dict): + continue + command_id = definition.get("commandId", "") + data = definition.get("data", definition) + display = data.get("display", {}) + summaries.append(CommandDefinitionSummary( + command_id=command_id, + name=display.get("name") or definition.get("name") or command_id, + mandatory_parameters=_parameter_names(data.get("mandatoryParameters")), + optional_parameters=_parameter_names(data.get("optionalParameters")), + help_text=display.get("helpText") or data.get("helpText"), + raw=definition, + )) + return summaries diff --git a/models/ai_scriptless.py b/models/ai_scriptless.py new file mode 100644 index 0000000..97c4820 --- /dev/null +++ b/models/ai_scriptless.py @@ -0,0 +1,72 @@ +from typing import Any, List, Optional + +from pydantic import BaseModel, Field + + +class ScriptFlowElement(BaseModel): + type: str = Field(description="Perfecto @type (Action, Validation, Loop, etc.)") + name: str = Field(description="Display name for the step") + command: Optional[str] = Field(description="Command namespace", default=None) + subcommand: Optional[str] = Field(description="Command subcommand", default=None) + active: bool = Field(description="Whether the step is enabled (not excluded)", default=True) + step_path: Optional[str] = Field( + description="Dot-separated positional path (e.g. 0, 2.0, 5.b0.1); derived from tree position", + default=None, + ) + children: List["ScriptFlowElement"] = Field(description="Nested flow elements", default_factory=list) + + +class ScriptParameter(BaseModel): + name: str = Field(description="Parameter name") + type: str = Field(description="Parameter data type") + + +class ScriptVariableSummary(BaseModel): + name: str = Field(description="Variable name") + type: str = Field(description="Variable type (string, number, boolean, secured_string, etc.)") + value: Optional[Any] = Field(description="Variable value when readable", default=None) + secured: bool = Field(description="Whether the value is secured", default=False) + set_at_runtime: bool = Field(description="True when value is provided at execution time", default=False) + + +class TestStructure(BaseModel): + item_key: str = Field(description="Script itemKey") + parameters: List[ScriptParameter] = Field(description="Test parameters", default_factory=list) + model_version: Optional[str] = Field(description="Script model version", default=None) + flow_elements: List[ScriptFlowElement] = Field(description="Root flow elements", default_factory=list) + + +class CommandCatalogEntry(BaseModel): + command_id: str = Field(description="Command identifier for definitions API") + name: str = Field(description="Display name") + path: str = Field(description="Catalog path") + status: Optional[str] = Field(description="Command status (GA, DRAFT, etc.)", default=None) + category: Optional[str] = Field(description="Parent category name", default=None) + + +class SnapshotSummary(BaseModel): + key: str = Field(description="Snapshot identifier (UUID for history, or '' for the live script marker)") + version: Optional[str] = Field(description="Snapshot version label", default=None) + comment: Optional[str] = Field(description="User comment from save with comment (typically on '' only)", default=None) + created_by: Optional[str] = Field(description="User who created the snapshot", default=None) + created_time: Optional[str] = Field(description="Creation timestamp", default=None) + is_current: bool = Field(description="True when key is '' (live script marker, not openable via view_snapshot)", default=False) + + +class SnapshotListResult(BaseModel): + test_id: Optional[str] = Field(description="Test itemKey queried", default=None) + count: int = Field(description="Number of snapshot entries returned") + snapshots: List[SnapshotSummary] = Field(description="Snapshot entries including '' marker", default_factory=list) + notes: List[str] = Field(description="Behavior notes for interpreting snapshot history", default_factory=list) + + +class CommandDefinitionSummary(BaseModel): + command_id: str = Field(description="Command identifier") + name: str = Field(description="Display name") + mandatory_parameters: List[str] = Field(description="Required parameter names", default_factory=list) + optional_parameters: List[str] = Field(description="Optional parameter names", default_factory=list) + help_text: Optional[str] = Field(description="Help text", default=None) + raw: Optional[dict[str, Any]] = Field(description="Full definition payload", default=None) + + +ScriptFlowElement.model_rebuild() \ No newline at end of file diff --git a/tools/ai_scriptless_manager.py b/tools/ai_scriptless_manager.py index 05c613f..862c234 100644 --- a/tools/ai_scriptless_manager.py +++ b/tools/ai_scriptless_manager.py @@ -1,6 +1,7 @@ import json import traceback from typing import Optional, Any, Dict +from urllib.parse import quote import httpx from mcp.server.fastmcp import Context @@ -10,11 +11,56 @@ from config.perfecto import TOOLS_PREFIX, SUPPORT_MESSAGE from config.token import PerfectoToken, token_verify from formatters.ai_scriptless import format_ai_scriptless_tests, \ - format_ai_scriptless_tests_filter_values + format_ai_scriptless_tests_filter_values, command_selection_policy_info, \ + format_command_catalog, format_command_definitions, format_snapshots_list, \ + format_test_structure, format_test_variables from models.manager import Manager from models.result import BaseResult, PaginationResult +from tools.ai_scriptless_script import ( + add_script_variable, + build_flow_element, + build_if_statement, + build_item_key, + build_logical_step, + build_loop, + build_move_test_body, + build_snapshot_search_body, + delete_script_variable, + delete_element_by_path, + fetch_current_username, + fetch_script_payload, + find_element_by_path, + find_step_path_for_element, + insert_flow_element, + load_and_mutate, + modify_script_variable, + move_element_by_path, + new_empty_script, + persist_script, + script_write_lock, + set_condition_expression, + set_element_enabled, + split_item_key, + test_file_name, + update_element_arguments, +) from tools.utils import api_request +STEP_PATH_REFRESH_NOTES = [ + "step_path values are dot-separated positional paths (e.g. 0, 2.0, 5.b0.1); Perfecto does not persist them.", + "After this operation, step paths may have changed. Call view_test_structure before the next edit; " + "do not reuse step_path values from this response.", +] + +def _append_step_path_refresh_notes(result: BaseResult) -> BaseResult: + if result.error or not isinstance(result.result, dict): + return result + notes = result.result.setdefault("notes", []) + for note in STEP_PATH_REFRESH_NOTES: + if note not in notes: + notes.append(note) + return result + class AiScriptlessManager(Manager): def __init__(self, token: Optional[PerfectoToken], ctx: Context): @@ -143,6 +189,436 @@ async def execute_test(self, test_id: str, device_type: str, device_under_test: error="Invalid device_type or device_under_test value." ) + @token_verify + async def list_commands(self, checkpoint: bool = False) -> BaseResult: + commands_url = perfecto.get_ai_scriptless_command_repository_url(self.token.cloud_name) + commands_url = commands_url + "/commands" + if checkpoint: + commands_url = commands_url + "?checkpoint=true" + result = await api_request(self.token, "GET", endpoint=commands_url, + result_formatter=format_command_catalog) + if not result.error: + result.append_info(command_selection_policy_info()) + return result + + @token_verify + async def get_command_definitions(self, command_ids: list[str]) -> BaseResult: + if not command_ids: + return BaseResult(error="command_ids is required and must not be empty") + definitions_url = perfecto.get_ai_scriptless_command_repository_url(self.token.cloud_name) + definitions_url = definitions_url + "/commands/definitions" + return await api_request(self.token, "POST", endpoint=definitions_url, + json={"commandIds": command_ids}, + result_formatter=format_command_definitions) + + @token_verify + async def view_test_structure(self, test_id: str) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required (itemKey from list_tests)") + script_url = perfecto.get_ai_scriptless_api_url(self.token.cloud_name) + script_url = script_url + f"/script?itemKey={quote(test_id, safe='')}" + return await api_request(self.token, "GET", endpoint=script_url, + result_formatter=format_test_structure, + result_formatter_params={"item_key": test_id}) + + @token_verify + async def add_command( + self, + test_id: str, + command_id: str, + arguments: Optional[dict[str, Any]] = None, + after_path: Optional[str] = None, + parent_path: Optional[str] = None, + ) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + if not command_id: + return BaseResult(error="command_id is required (from list_commands)") + + element = build_flow_element(command_id, arguments) + inserted_path: dict[str, Optional[str]] = {"step_path": None} + + def mutator(script: dict[str, Any]) -> None: + insert_flow_element(script, element, after_path=after_path, parent_path=parent_path) + inserted_path["step_path"] = find_step_path_for_element(script, element) + + result = await load_and_mutate(self.token, test_id, mutator) + if result.error: + return result + result.result["step_path"] = inserted_path["step_path"] + result.result["command_id"] = command_id + return _append_step_path_refresh_notes(result) + + @token_verify + async def modify_command(self, test_id: str, step_path: str, arguments: dict[str, Any]) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + if not step_path: + return BaseResult(error="step_path is required (from view_test_structure)") + if not arguments: + return BaseResult(error="arguments is required") + + def mutator(script: dict[str, Any]) -> None: + located = find_element_by_path(script, step_path) + if located is None: + raise ValueError(f"step_path not found: {step_path}") + _, _, element = located + update_element_arguments(element, arguments) + + return _append_step_path_refresh_notes( + await load_and_mutate(self.token, test_id, mutator) + ) + + @token_verify + async def delete_command(self, test_id: str, step_path: str) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + if not step_path: + return BaseResult(error="step_path is required (from view_test_structure)") + + def mutator(script: dict[str, Any]) -> None: + delete_element_by_path(script, step_path) + + return _append_step_path_refresh_notes( + await load_and_mutate(self.token, test_id, mutator) + ) + + @token_verify + async def set_command_enabled(self, test_id: str, step_path: str, enabled: bool) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + if not step_path: + return BaseResult(error="step_path is required (from view_test_structure)") + + def mutator(script: dict[str, Any]) -> None: + set_element_enabled(script, step_path, enabled) + + result = await load_and_mutate(self.token, test_id, mutator) + if result.error: + return result + result.result["step_path"] = step_path + result.result["active"] = enabled + return _append_step_path_refresh_notes(result) + + @token_verify + async def save_test(self, test_id: str, comment: Optional[str] = None) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + + def mutator(_script: dict[str, Any]) -> None: + pass + + return await load_and_mutate( + self.token, + test_id, + mutator, + snapshot_comment=comment, + ) + + @token_verify + async def create_test(self, name: str, folder: str = "My Folder", visibility: str = "PRIVATE") -> BaseResult: + if not name: + return BaseResult(error="name is required") + item_key = build_item_key(visibility, folder, name) + script = new_empty_script() + return await persist_script(self.token, item_key, script) + + @token_verify + async def save_test_as( + self, + test_id: str, + name: str, + folder: str = "My Folder", + visibility: str = "PRIVATE", + comment: Optional[str] = None, + ) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + if not name: + return BaseResult(error="name is required") + async with script_write_lock(test_id): + payload_result = await fetch_script_payload(self.token, test_id) + if payload_result.error: + return payload_result + script = payload_result.result.get("script", {}) + item_key = build_item_key(visibility, folder, name) + return _append_step_path_refresh_notes( + await persist_script(self.token, item_key, script, snapshot_comment=comment) + ) + + async def _add_structure( + self, + test_id: str, + element: dict[str, Any], + structure_type: str, + after_path: Optional[str] = None, + parent_path: Optional[str] = None, + ) -> BaseResult: + inserted_path: dict[str, Optional[str]] = {"step_path": None} + + def mutator(script: dict[str, Any]) -> None: + insert_flow_element(script, element, after_path=after_path, parent_path=parent_path) + inserted_path["step_path"] = find_step_path_for_element(script, element) + + result = await load_and_mutate(self.token, test_id, mutator) + if result.error: + return result + result.result["step_path"] = inserted_path["step_path"] + result.result["structure_type"] = structure_type + return _append_step_path_refresh_notes(result) + + @token_verify + async def add_logical_step( + self, + test_id: str, + label: Optional[str] = None, + after_path: Optional[str] = None, + parent_path: Optional[str] = None, + ) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + element = build_logical_step(label) + return await self._add_structure(test_id, element, "LogicalStep", after_path, parent_path) + + @token_verify + async def add_loop( + self, + test_id: str, + count: int = 1, + after_path: Optional[str] = None, + parent_path: Optional[str] = None, + ) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + if count < 1: + return BaseResult(error="count must be at least 1") + element = build_loop(count) + result = await self._add_structure(test_id, element, "Loop", after_path, parent_path) + if not result.error: + result.result["count"] = count + return result + + @token_verify + async def add_condition( + self, + test_id: str, + expression: Optional[str] = None, + label: Optional[str] = None, + after_path: Optional[str] = None, + parent_path: Optional[str] = None, + ) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + element = build_if_statement(expression, label) + result = await self._add_structure(test_id, element, "IfStatement", after_path, parent_path) + if not result.error and expression: + result.result["expression"] = expression + return result + + @token_verify + async def set_condition_expression(self, test_id: str, step_path: str, expression: str) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + if not step_path: + return BaseResult(error="step_path is required (IfStatement path from view_test_structure)") + if not expression: + return BaseResult(error="expression is required") + + def mutator(script: dict[str, Any]) -> None: + set_condition_expression(script, step_path, expression) + + result = await load_and_mutate(self.token, test_id, mutator) + if result.error: + return result + result.result["step_path"] = step_path + result.result["expression"] = expression + return _append_step_path_refresh_notes(result) + + @token_verify + async def move_command( + self, + test_id: str, + step_path: str, + after_path: Optional[str] = None, + parent_path: Optional[str] = None, + ) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + if not step_path: + return BaseResult(error="step_path is required") + if not after_path and not parent_path: + return BaseResult(error="after_path or parent_path is required") + + def mutator(script: dict[str, Any]) -> None: + move_element_by_path(script, step_path, after_path=after_path, parent_path=parent_path) + + result = await load_and_mutate(self.token, test_id, mutator) + if result.error: + return result + result.result["step_path"] = step_path + return _append_step_path_refresh_notes(result) + + @token_verify + async def delete_test(self, test_id: str) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required (itemKey from list_tests)") + delete_url = perfecto.get_ai_scriptless_api_url(self.token.cloud_name) + "/repositories" + return await api_request( + self.token, + "DELETE", + endpoint=delete_url, + params={"itemKey": test_id}, + ) + + @token_verify + async def move_test( + self, + test_id: str, + folder: str, + visibility: Optional[str] = None, + ) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required (itemKey from list_tests)") + + try: + source_visibility, _ = split_item_key(test_id) + except ValueError as exc: + return BaseResult(error=str(exc)) + + target_visibility = visibility or source_visibility + move_url = perfecto.get_repository_management_api_url(self.token.cloud_name) + "/artifacts" + body = build_move_test_body(test_id, folder, target_visibility) + result = await api_request(self.token, "PATCH", endpoint=move_url, json=body) + if result.error: + return result + target_item_key = build_item_key( + target_visibility, + folder, + test_file_name(test_id).removesuffix(".xml"), + ) + result.result = { + "source_item_key": test_id, + "target_item_key": target_item_key, + "folder": folder, + "visibility": target_visibility, + } + return result + + @token_verify + async def list_snapshots(self, test_id: str) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required (itemKey from list_tests)") + try: + body = build_snapshot_search_body(test_id) + except ValueError as exc: + return BaseResult(error=str(exc)) + snapshots_url = ( + perfecto.get_repository_management_api_url(self.token.cloud_name) + "/snapshots/search" + ) + return await api_request( + self.token, + "POST", + endpoint=snapshots_url, + json=body, + result_formatter=format_snapshots_list, + result_formatter_params={"test_id": test_id}, + ) + + @token_verify + async def view_snapshot(self, snapshot_id: str) -> BaseResult: + if not snapshot_id: + return BaseResult(error="snapshot_id is required (key from list_snapshots)") + if snapshot_id == "": + return BaseResult( + error="snapshot_id '' is the live script marker, not a historical snapshot. " + "Use view_test_structure with test_id for the current editable script." + ) + return await api_request( + self.token, + "GET", + endpoint=perfecto.get_ai_scriptless_api_url(self.token.cloud_name) + + f"/snapshots?itemKey={quote(snapshot_id, safe='')}", + result_formatter=format_test_structure, + result_formatter_params={"item_key": snapshot_id}, + ) + + @token_verify + async def list_test_variables(self, test_id: str) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required (itemKey from list_tests)") + payload_result = await fetch_script_payload(self.token, test_id) + if payload_result.error: + return payload_result + script = payload_result.result.get("script", {}) + variables = format_test_variables(script.get("variables", [])) + return BaseResult(result=variables) + + @token_verify + async def add_test_variable( + self, + test_id: str, + name: str, + variable_type: str = "string", + value: Any = "", + set_at_runtime: bool = False, + ) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + if not name: + return BaseResult(error="name is required") + + def mutator(script: dict[str, Any]) -> None: + add_script_variable(script, name, variable_type, value, set_at_runtime) + + result = await load_and_mutate(self.token, test_id, mutator) + if result.error: + return result + result.result["name"] = name + result.result["type"] = variable_type + result.result["set_at_runtime"] = set_at_runtime + return result + + @token_verify + async def modify_test_variable( + self, + test_id: str, + name: str, + value: Optional[Any] = None, + variable_type: Optional[str] = None, + set_at_runtime: Optional[bool] = None, + ) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + if not name: + return BaseResult(error="name is required") + if value is None and variable_type is None and set_at_runtime is None: + return BaseResult(error="At least one of value, variable_type, or set_at_runtime is required") + + def mutator(script: dict[str, Any]) -> None: + modify_script_variable(script, name, value, variable_type, set_at_runtime) + + result = await load_and_mutate(self.token, test_id, mutator) + if result.error: + return result + result.result["name"] = name + return result + + @token_verify + async def delete_test_variable(self, test_id: str, name: str) -> BaseResult: + if not test_id: + return BaseResult(error="test_id is required") + if not name: + return BaseResult(error="name is required") + + def mutator(script: dict[str, Any]) -> None: + delete_script_variable(script, name) + + result = await load_and_mutate(self.token, test_id, mutator) + if result.error: + return result + result.result["name"] = name + return result + def register(mcp, token: Optional[PerfectoToken]): @mcp.tool( @@ -161,28 +637,149 @@ def register(mcp, token: Optional[PerfectoToken]): filter_names (list[str], values=['test_name', 'owner_list']): The filter name list. - execute_test: Execute a preconfigured AI Scriptless Test. args(dict): Dictionary with the following required parameters: - test_id (str): Test ID from list_tests() + test_id (str): Test ID from list_tests device_type (str, default='real', values=['real', 'virtual', 'desktop']: The device type. device_under_test (dict, required): Device configuration object. - When device_type='real': {device_id: str} (Get from list_real_devices()). - When device_type='virtual': {platform_name: str, manufacturer: str, model: str, platform_version: str} (Get from list_virtual_devices()). + When device_type='real': {device_id: str} (Get from list_real_devices). + When device_type='virtual': {platform_name: str, manufacturer: str, model: str, platform_version: str} (Get from list_virtual_devices). When device_type='desktop': {platform_name: str, platform_version: str, browser_name: str, - browser_version: str, resolution: str, location: str} (Get from list_desktop_devices()). + browser_version: str, resolution: str, location: str} (Get from list_desktop_devices). +- view_test_structure: View the hierarchical structure of an AI Scriptless test. Each step has step_path (dot-separated positional path, e.g. 0, 2.0, 5.b0.1). + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests (e.g. PRIVATE:My Folder/My Test.xml). +- list_commands: List available AI Scriptless commands from the command repository. + Returns the catalog in result and command selection policy in info (read info before add_command when authoring tests). + args(dict): Dictionary with the following optional parameters: + checkpoint (bool, default=false): If true, list checkpoint commands only. +- get_command_definitions: Get parameter definitions for one or more commands. + args(dict): Dictionary with the following required parameters: + command_ids (list[str]): Command IDs from list_commands (typically ai_user-action, ai_validation, ai_visual-comparison). +- add_command: Add a command to a test and persist it. + args(dict): Dictionary with the following parameters: + test_id (str, required): Test itemKey from list_tests. + command_id (str, required): Command ID from list_commands. + arguments (dict, optional): Command argument names to values. + after_path (str, optional): Insert after this step (step_path from view_test_structure). + parent_path (str, optional): Insert inside a container (step_path of LogicalStep, Loop, or Branch). +- modify_command: Update command arguments and persist. + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests. + step_path (str): Step path from view_test_structure (e.g. 0, 2.0, 5.b0.1). + arguments (dict): Argument names to new values. +- delete_command: Remove a command from a test and persist. + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests. + step_path (str): Step path from view_test_structure. +- set_command_enabled: Enable or disable (exclude/include) a command and persist. + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests. + step_path (str): Step path from view_test_structure. + enabled (bool): True to include, false to exclude. +- save_test: Persist the current test script. + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests. + comment (str, optional): Labels the current version on '' in list_snapshots (UI: Save with comment). Every save also adds a UUID history entry. +- create_test: Create a new empty test with a DUT parameter. + args(dict): Dictionary with the following required parameters: + name (str): Test name without .xml extension. + folder (str, default='My Folder'): Target folder. + visibility (str, default='PRIVATE', values=['PUBLIC', 'PRIVATE']): Test visibility. +- save_test_as: Copy a test to a new itemKey and persist. + args(dict): Dictionary with the following required parameters: + test_id (str): Source test itemKey from list_tests. + name (str): New test name without .xml extension. + folder (str, default='My Folder'): Target folder. + visibility (str, default='PRIVATE', values=['PUBLIC', 'PRIVATE']): Test visibility. + comment (str, optional): Labels the current version on '' in list_snapshots for the saved copy. +- add_logical_step: Add a LogicalStep group container and persist. + args(dict): Dictionary with the following parameters: + test_id (str, required): Test itemKey from list_tests. + label (str, optional): Group label. + after_path (str, optional): Insert after this step path. + parent_path (str, optional): Insert inside a container step path. +- add_loop: Add a Loop container and persist. + args(dict): Dictionary with the following parameters: + test_id (str, required): Test itemKey from list_tests. + count (int, default=1): RepeatIterator count. + after_path (str, optional): Insert after this step path. + parent_path (str, optional): Insert inside a container step path. +- add_condition: Add an IfStatement condition with Then/Else branches and persist. + args(dict): Dictionary with the following parameters: + test_id (str, required): Test itemKey from list_tests. + expression (str, optional): Condition expression. + label (str, optional): Condition label. + after_path (str, optional): Insert after this step path. + parent_path (str, optional): Insert inside a container step path. +- set_condition_expression: Set the expression on an IfStatement and persist. + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests. + step_path (str): IfStatement path from view_test_structure (e.g. 5). + expression (str): Condition expression. +- move_command: Move a step to a new position and persist. + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests. + step_path (str): Step path to move. + after_path (str, optional): Insert after this sibling step path. + parent_path (str, optional): Move into this container step path (LogicalStep, Loop, or Branch). +- delete_test: Delete an AI Scriptless test from the repository. + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests. +- move_test: Move a test to another folder (same or different visibility). + args(dict): Dictionary with the following required parameters: + test_id (str): Source test itemKey from list_tests. + folder (str): Target folder path without visibility prefix (e.g. 'My Folder', 'MCP Archive', or 'My Folder/SubFolder'). The test file keeps its name. If the path does not exist, the API creates the nested folder segments automatically (the new folder may not appear as a CONTAINER in list_tests until it contains tests). + visibility (str, optional): Target visibility; defaults to the source test visibility. + Returns source_item_key and target_item_key; use target_item_key for view_test_structure, execute_test, and other actions after the move. +- list_snapshots: List snapshot history for a test (includes '' marker plus UUID historical versions). + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests. + Returns notes explaining that every save adds history entries and how '' vs UUID keys work. +- view_snapshot: View the hierarchical structure of a historical snapshot (same format as view_test_structure). + args(dict): Dictionary with the following required parameters: + snapshot_id (str): UUID key from list_snapshots (not ''; use view_test_structure for the live script). +- list_test_variables: List script variables configured on a test (distinct from DUT parameters). + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests. +- add_test_variable: Add a script variable and persist. + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests. + name (str): Variable name (letters, numbers, underscore; cannot start with a number). + variable_type (str, default='string', values=['string', 'secured_string', 'number', 'boolean']): Variable type. + value (any, default=''): Variable value. + set_at_runtime (bool, default=false): When true, value is supplied at execution time. +- modify_test_variable: Update a script variable and persist. + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests. + name (str): Existing variable name. + value (any, optional): New value. + variable_type (str, optional): New type (string, secured_string, number, boolean). + set_at_runtime (bool, optional): Toggle runtime parameter behavior. +- delete_test_variable: Remove a script variable and persist. + args(dict): Dictionary with the following required parameters: + test_id (str): Test itemKey from list_tests. + name (str): Variable name to delete. Hints: +- When authoring or editing test steps, call list_commands first and follow the command selection policy in the info field. +- step_path is a dot-separated positional path without spaces (0-based indices; b0=Then branch, b1=Else). Example: root step 3 is "3"; first step inside Then of condition at 5 is "5.b0.0". Perfecto does not persist paths; they change when steps are inserted, moved, or deleted. Always call view_test_structure before the next structure edit; do not reuse step_path from a previous mutation response. +- Use parent_path on add_command with the step_path of a LogicalStep, Loop, or Branch from view_test_structure. +- Use add_logical_step, add_loop, and add_condition to build control-flow structures matching the UI toolbar Group, Loop, and Condition actions. +- Script variables (list_test_variables, add/modify/delete_test_variable) are stored in script.variables[] and are distinct from the DUT parameter in script.parameters[]. +- Snapshot behavior: every save creates a UUID history entry; comment on save_test labels ''. See list_snapshots notes for details. +- Edits persist immediately via the internal draft→script pipeline (each persist also adds snapshot history; save_test is available to re-persist unchanged content). - IMPORTANT: Always call list_filter_values first to get valid filter values before using any filters in list_tests. This ensures you're using the correct test name, list of owners users or other filter values that actually exist in the system. - If in any result has_next_page is true, ask the user if they want to see the next page or access all pages before making a subsequent call. - Before executing a test, follow this validation workflow: - 1. list_tests() (get and validate test_id). + 1. list_tests (get and validate test_id). 2. Get device configuration based on device_type: - - 'real': list_real_devices() (get device_id). - - 'virtual': list_virtual_devices() (get platform_name, manufacturer, model, platform_version). - - 'desktop': list_desktop_devices() (get platform_name, platform_version, browser_name, browser_version, resolution, location). - 3. On real device use read_real_device_info() (verify device is available and not in use). - 4. execute_test() (execute the test). - 5. list_report_executions() with report name equal to test name and list_live_executions() when the device it's in use (monitor execution progress). + - 'real': list_real_devices (get device_id). + - 'virtual': list_virtual_devices (get platform_name, manufacturer, model, platform_version). + - 'desktop': list_desktop_devices (get platform_name, platform_version, browser_name, browser_version, resolution, location). + 3. On real device use read_real_device_info (verify device is available and not in use). + 4. execute_test (execute the test). + 5. list_report_executions with report name equal to test name and list_live_executions when the device it's in use (monitor execution progress). - Always check before running a test_id if the device_type and device_under_test exist and is available (when it's a real device), not use device in use or malfunctioning. -- Always monitor a real device's operation while it's in use by checking the information with read_real_device_info(). +- Always monitor a real device's operation while it's in use by checking the information with read_real_device_info. - Always stop the execution by stopping the live execution (make sure it's the correct execution, such as the execution name or user ID). """ ) @@ -204,6 +801,126 @@ async def ai_scriptless( return await ai_scriptless_manager.execute_test(args.get("test_id", ""), args.get("device_type", ""), args.get("device_under_test", {})) + case "view_test_structure": + return await ai_scriptless_manager.view_test_structure(args.get("test_id", "")) + case "list_commands": + return await ai_scriptless_manager.list_commands(args.get("checkpoint", False)) + case "get_command_definitions": + return await ai_scriptless_manager.get_command_definitions(args.get("command_ids", [])) + case "add_command": + return await ai_scriptless_manager.add_command( + args.get("test_id", ""), + args.get("command_id", ""), + args.get("arguments"), + args.get("after_path"), + args.get("parent_path"), + ) + case "modify_command": + return await ai_scriptless_manager.modify_command( + args.get("test_id", ""), + args.get("step_path", ""), + args.get("arguments", {}), + ) + case "delete_command": + return await ai_scriptless_manager.delete_command( + args.get("test_id", ""), + args.get("step_path", ""), + ) + case "set_command_enabled": + return await ai_scriptless_manager.set_command_enabled( + args.get("test_id", ""), + args.get("step_path", ""), + args.get("enabled", True), + ) + case "save_test": + return await ai_scriptless_manager.save_test( + args.get("test_id", ""), + args.get("comment"), + ) + case "create_test": + return await ai_scriptless_manager.create_test( + args.get("name", ""), + args.get("folder", "My Folder"), + args.get("visibility", "PRIVATE"), + ) + case "save_test_as": + return await ai_scriptless_manager.save_test_as( + args.get("test_id", ""), + args.get("name", ""), + args.get("folder", "My Folder"), + args.get("visibility", "PRIVATE"), + args.get("comment"), + ) + case "add_logical_step": + return await ai_scriptless_manager.add_logical_step( + args.get("test_id", ""), + args.get("label"), + args.get("after_path"), + args.get("parent_path"), + ) + case "add_loop": + return await ai_scriptless_manager.add_loop( + args.get("test_id", ""), + args.get("count", 1), + args.get("after_path"), + args.get("parent_path"), + ) + case "add_condition": + return await ai_scriptless_manager.add_condition( + args.get("test_id", ""), + args.get("expression"), + args.get("label"), + args.get("after_path"), + args.get("parent_path"), + ) + case "set_condition_expression": + return await ai_scriptless_manager.set_condition_expression( + args.get("test_id", ""), + args.get("step_path", ""), + args.get("expression", ""), + ) + case "move_command": + return await ai_scriptless_manager.move_command( + args.get("test_id", ""), + args.get("step_path", ""), + args.get("after_path"), + args.get("parent_path"), + ) + case "delete_test": + return await ai_scriptless_manager.delete_test(args.get("test_id", "")) + case "move_test": + return await ai_scriptless_manager.move_test( + args.get("test_id", ""), + args.get("folder", ""), + args.get("visibility"), + ) + case "list_snapshots": + return await ai_scriptless_manager.list_snapshots(args.get("test_id", "")) + case "view_snapshot": + return await ai_scriptless_manager.view_snapshot(args.get("snapshot_id", "")) + case "list_test_variables": + return await ai_scriptless_manager.list_test_variables(args.get("test_id", "")) + case "add_test_variable": + return await ai_scriptless_manager.add_test_variable( + args.get("test_id", ""), + args.get("name", ""), + args.get("variable_type", "string"), + args.get("value", ""), + args.get("set_at_runtime", False), + ) + case "modify_test_variable": + return await ai_scriptless_manager.modify_test_variable( + args.get("test_id", ""), + args.get("name", ""), + args.get("value"), + args.get("variable_type"), + args.get("set_at_runtime"), + ) + case "delete_test_variable": + return await ai_scriptless_manager.delete_test_variable( + args.get("test_id", ""), + args.get("name", ""), + ) case _: return BaseResult( error=f"Action {action} not found in AI Scriptless manager tool" diff --git a/tools/ai_scriptless_script.py b/tools/ai_scriptless_script.py new file mode 100644 index 0000000..788ed50 --- /dev/null +++ b/tools/ai_scriptless_script.py @@ -0,0 +1,804 @@ +import asyncio +import copy +import json +import re +from contextlib import asynccontextmanager +from typing import Any, Optional +from urllib.parse import quote + +from config import perfecto +from config.token import PerfectoToken +from models.result import BaseResult +from tools.utils import api_request + + +def build_item_key(visibility: str, folder: str, name: str) -> str: + test_name = name if name.endswith(".xml") else f"{name}.xml" + folder = folder.strip("/") + return f"{visibility}:{folder}/{test_name}" + + +def split_item_key(item_key: str) -> tuple[str, str]: + visibility, _, path = item_key.partition(":") + if not path: + raise ValueError(f"Invalid itemKey format (expected VISIBILITY:path): {item_key}") + return visibility, path + + +def folder_type(visibility: str) -> str: + if visibility in ("PRIVATE", "GROUP"): + return visibility + return "PUBLIC" + + +def test_file_name(item_key: str) -> str: + _, path = split_item_key(item_key) + return path.rsplit("/", 1)[-1] + + +def build_snapshot_search_body(test_id: str) -> dict[str, Any]: + visibility, artifact_id = split_item_key(test_id) + return { + "repositoryType": "SCRIPTS", + "keyDetails": {"artifactId": artifact_id, "version": "v0"}, + "folderType": folder_type(visibility), + } + + +def build_move_test_body( + test_id: str, + folder: str, + visibility: str, +) -> dict[str, Any]: + src_visibility, src_path = split_item_key(test_id) + file_name = test_file_name(test_id) + target_folder = folder.strip("/") + target_artifact_id = f"{target_folder}/{file_name}" if target_folder else file_name + return { + "repositoryType": "SCRIPTS", + "keyDetails": {"artifactId": src_path, "version": "v0"}, + "folderType": folder_type(src_visibility), + "targetKeyDetails": {"artifactId": target_artifact_id, "version": "v0"}, + "targetFolderType": folder_type(visibility), + "copy": False, + } + + +def parse_command_id(command_id: str) -> tuple[str, str]: + if command_id.startswith("ai_"): + return "ai", command_id[3:] + if "_" in command_id: + command, subcommand = command_id.split("_", 1) + return command, subcommand + return command_id, "" + + +def element_type_for_command(command: str, subcommand: str) -> str: + if command == "ai" and subcommand == "validation": + return "Validation" + if command == "checkpoint": + return "Validation" + return "Action" + + +def default_error_policy(element_type: str) -> str: + return "IGNORE" if element_type == "Validation" else "ABORT" + + +def _make_argument(name: str, value: Any, data_source: str = "CONSTANT") -> dict[str, Any]: + if data_source == "VARIABLE": + data: dict[str, Any] = { + "@type": "VariableArgumentData", + "dataSource": "VARIABLE", + "value": value, + } + else: + data = { + "@type": "ConstantArgumentData", + "dataSource": "CONSTANT", + "secured": False, + "value": value, + } + return {"@type": "FunctionArgument", "name": name, "data": data} + + +ARGUMENT_NAME_ALIASES: dict[str, dict[str, str]] = { + "wait": {"waitDuration": "duration"}, +} + + +def command_id_from_element(element: dict[str, Any]) -> str: + command = element.get("command", "") + subcommand = element.get("subcommand") or "" + if subcommand: + return f"{command}_{subcommand}" + return command + + +def _normalize_argument_names(command_id: str, arguments: dict[str, Any]) -> dict[str, Any]: + aliases = ARGUMENT_NAME_ALIASES.get(command_id, {}) + normalized: dict[str, Any] = {} + for name, value in arguments.items(): + canonical = aliases.get(name, name) + normalized[canonical] = value + return normalized + + +def _drop_superseded_argument_aliases( + command_id: str, + arguments: dict[str, tuple[str, Any]], +) -> None: + for alias, canonical in ARGUMENT_NAME_ALIASES.get(command_id, {}).items(): + if alias != canonical and canonical in arguments: + arguments.pop(alias, None) + + +def _default_arguments(command_id: str) -> dict[str, tuple[str, Any]]: + defaults: dict[str, dict[str, tuple[str, Any]]] = { + "ai_user-action": { + "handsetId": ("VARIABLE", "DUT"), + "action": ("CONSTANT", ""), + }, + "ai_validation": { + "handsetId": ("VARIABLE", "DUT"), + "validation": ("CONSTANT", ""), + }, + "ai_visual-comparison": { + "handsetId": ("VARIABLE", "DUT"), + "name": ("CONSTANT", ""), + }, + "comment": { + "text": ("CONSTANT", ""), + }, + "wait": { + "duration": ("CONSTANT", "1"), + }, + "handset_ready": { + "handsetId": ("VARIABLE", "DUT"), + }, + "touch_tap": { + "handsetId": ("VARIABLE", "DUT"), + }, + "checkpoint_text": { + "handsetId": ("VARIABLE", "DUT"), + }, + "checkpoint_image": { + "handsetId": ("VARIABLE", "DUT"), + }, + } + return defaults.get(command_id, {"handsetId": ("VARIABLE", "DUT")}) + + +def build_arguments(command_id: str, arguments: Optional[dict[str, Any]]) -> list[dict[str, Any]]: + merged: dict[str, tuple[str, Any]] = _default_arguments(command_id) + if arguments: + for name, value in _normalize_argument_names(command_id, arguments).items(): + if isinstance(value, dict) and "data_source" in value: + merged[name] = (value["data_source"], value.get("value")) + else: + merged[name] = ("CONSTANT", value) + _drop_superseded_argument_aliases(command_id, merged) + return [_make_argument(name, value, source) for name, (source, value) in merged.items()] + + +def build_flow_element(command_id: str, arguments: Optional[dict[str, Any]] = None) -> dict[str, Any]: + command, subcommand = parse_command_id(command_id) + element_type = element_type_for_command(command, subcommand) + return { + "@type": element_type, + "validations": [], + "errorPolicy": default_error_policy(element_type), + "command": command, + "subcommand": subcommand, + "arguments": build_arguments(command_id, arguments), + "comment": None, + "status": None, + "active": True, + } + + +CONTAINER_TYPES = frozenset({"LogicalStep", "Loop", "Branch"}) + +# Dot-separated positional paths (no spaces). Examples: 0, 2.0, 5.b0, 5.b0.1 +# Segments are either a 0-based index (0, 1, 2) or a branch marker (b0=Then, b1=Else). +STEP_PATH_PATTERN = re.compile(r"^(?:\d+|b\d+)(?:\.(?:\d+|b\d+))*$") + +VARIABLE_TYPE_ALIASES = { + "string": "StringData", + "secured_string": "StringData", + "number": "IntegerData", + "boolean": "BooleanData", + "device": "HandsetData", + "media": "MediaData", + "datatable": "TableData", +} + +SUPPORTED_VARIABLE_TYPES = frozenset({"string", "secured_string", "number", "boolean"}) + + +def validate_variable_name(name: str) -> None: + if not name or not name.strip(): + raise ValueError("name is required") + if name[0].isdigit(): + raise ValueError("name cannot begin with a number") + if not all(char.isalnum() or char == "_" for char in name): + raise ValueError("name may contain only letters, numbers, and underscore") + + +def _coerce_variable_value(variable_type: str, value: Any) -> Any: + if variable_type == "boolean": + if isinstance(value, bool): + return value + normalized = str(value).lower() + if normalized not in ("true", "false"): + raise ValueError("boolean value must be true or false") + return normalized == "true" + if variable_type == "number": + try: + numeric = int(value) + except (TypeError, ValueError) as exc: + raise ValueError("number value must be an integer") from exc + return numeric + return "" if value is None else str(value) + + +def build_variable_data(variable_type: str, name: str, value: Any) -> dict[str, Any]: + if variable_type not in SUPPORTED_VARIABLE_TYPES: + raise ValueError( + f"Unsupported variable type: {variable_type}. " + f"Supported types: {', '.join(sorted(SUPPORTED_VARIABLE_TYPES))}" + ) + validate_variable_name(name) + coerced_value = _coerce_variable_value(variable_type, value) + data_type = VARIABLE_TYPE_ALIASES[variable_type] + data: dict[str, Any] = { + "@type": data_type, + "description": None, + "displayName": None, + "name": name, + "secured": variable_type == "secured_string", + "value": coerced_value, + } + if data_type == "HandsetData": + data["key"] = None + if data_type == "TableData": + data["columns"] = [] + return data + + +def build_variable_entry( + name: str, + variable_type: str, + value: Any, + set_at_runtime: bool = False, +) -> dict[str, Any]: + return { + "@type": "Parameter" if set_at_runtime else "Variable", + "data": build_variable_data(variable_type, name, value), + } + + +def find_variable(script: dict[str, Any], variable_name: str) -> Optional[tuple[int, dict[str, Any]]]: + for index, variable in enumerate(script.get("variables", [])): + data = variable.get("data", {}) + if data.get("name") == variable_name: + return index, variable + return None + + +def list_script_variables(script: dict[str, Any]) -> list[dict[str, Any]]: + return list(script.get("variables", [])) + + +def add_script_variable( + script: dict[str, Any], + name: str, + variable_type: str, + value: Any, + set_at_runtime: bool = False, +) -> dict[str, Any]: + validate_variable_name(name) + if find_variable(script, name): + raise ValueError(f"variable already exists: {name}") + if name == "DUT": + raise ValueError("DUT is a test parameter, not a script variable") + entry = build_variable_entry(name, variable_type, value, set_at_runtime) + script.setdefault("variables", []).append(entry) + return entry + + +def modify_script_variable( + script: dict[str, Any], + variable_name: str, + value: Optional[Any] = None, + variable_type: Optional[str] = None, + set_at_runtime: Optional[bool] = None, +) -> dict[str, Any]: + located = find_variable(script, variable_name) + if located is None: + raise ValueError(f"variable not found: {variable_name}") + _, variable = located + current_type = _variable_type_from_data(variable.get("data", {})) + target_type = variable_type or current_type + if target_type not in SUPPORTED_VARIABLE_TYPES: + raise ValueError( + f"Unsupported variable type: {target_type}. " + f"Supported types: {', '.join(sorted(SUPPORTED_VARIABLE_TYPES))}" + ) + current_value = variable.get("data", {}).get("value") + target_value = current_value if value is None else value + variable["data"] = build_variable_data(target_type, variable_name, target_value) + if set_at_runtime is not None: + variable["@type"] = "Parameter" if set_at_runtime else "Variable" + return variable + + +def delete_script_variable(script: dict[str, Any], variable_name: str) -> None: + located = find_variable(script, variable_name) + if located is None: + raise ValueError(f"variable not found: {variable_name}") + index, _ = located + script.get("variables", []).pop(index) + + +def _variable_type_from_data(data: dict[str, Any]) -> str: + data_type = data.get("@type", "") + if data_type == "StringData": + return "secured_string" if data.get("secured") else "string" + reverse = { + "BooleanData": "boolean", + "IntegerData": "number", + "HandsetData": "device", + "MediaData": "media", + "TableData": "datatable", + } + return reverse.get(data_type, "string") + + +def build_branch(clause: str) -> dict[str, Any]: + return { + "@type": "Branch", + "clause": clause, + "flowElements": [], + "numOfFlowElements": 0, + "empty": True, + "active": True, + "comment": None, + "status": None, + } + + +def build_logical_step(label: Optional[str] = None) -> dict[str, Any]: + return { + "@type": "LogicalStep", + "flowElements": [], + "active": True, + "label": label or "", + "comment": None, + "status": None, + } + + +def build_loop(count: int = 1) -> dict[str, Any]: + return { + "@type": "Loop", + "iterator": {"@type": "RepeatIterator", "count": count}, + "flowElements": [], + "active": True, + "comment": None, + "status": None, + } + + +def build_if_statement(expression: Optional[str] = None, label: Optional[str] = None) -> dict[str, Any]: + then_branch = build_branch("THEN") + else_branch = build_branch("ELSE") + statement: dict[str, Any] = { + "@type": "IfStatement", + "branches": [then_branch, else_branch], + "thenClause": build_branch("THEN"), + "elseClause": build_branch("ELSE"), + "label": label or "", + "numOfFlowElements": 3, + "comment": None, + "status": None, + "active": True, + } + if expression: + statement["expression"] = expression + return statement + + +def validate_step_path(step_path: str) -> None: + if not step_path or step_path.strip() != step_path or " " in step_path: + raise ValueError( + "step_path must be a dot-separated positional path without spaces " + "(e.g. 0, 2.0, 5.b0, 5.b0.1)" + ) + if not STEP_PATH_PATTERN.match(step_path): + raise ValueError( + f"invalid step_path: {step_path!r}. Use dot-separated indices, e.g. 0, 2.0, 5.b0.1" + ) + + +def find_step_path_for_element(script: dict[str, Any], target: dict[str, Any]) -> Optional[str]: + def walk(flow_elements: list[dict[str, Any]], prefix: str) -> Optional[str]: + for index, element in enumerate(flow_elements): + step_path = f"{prefix}{index}" + if element is target: + return step_path + nested = walk(element.get("flowElements", []), f"{step_path}.") + if nested: + return nested + if element.get("@type") == "IfStatement": + for branch_index, branch in enumerate(element.get("branches", [])): + branch_path = f"{step_path}.b{branch_index}" + if branch is target: + return branch_path + nested = walk(branch.get("flowElements", []), f"{branch_path}.") + if nested: + return nested + return None + + return walk(script.get("flowElements", []), "") + + +def find_element_by_path( + script: dict[str, Any], + step_path: str, +) -> Optional[tuple[list[dict[str, Any]], int, dict[str, Any]]]: + validate_step_path(step_path) + parts = step_path.split(".") + current_list = script.get("flowElements", []) + current_element: Optional[dict[str, Any]] = None + + for part_index, part in enumerate(parts): + if part.startswith("b"): + if current_element is None or current_element.get("@type") != "IfStatement": + return None + branch_index = int(part[1:]) + branches = current_element.get("branches", []) + if branch_index >= len(branches): + return None + current_element = branches[branch_index] + current_list = current_element.get("flowElements", []) + continue + + index = int(part) + if index >= len(current_list): + return None + current_element = current_list[index] + if part_index < len(parts) - 1: + next_part = parts[part_index + 1] + if not next_part.startswith("b"): + current_list = current_element.get("flowElements", []) + + if current_element is None: + return None + for elements, index, element in _iter_element_locations(script.get("flowElements", [])): + if element is current_element: + return elements, index, element + return None + + +def find_container_by_path(script: dict[str, Any], step_path: str) -> Optional[dict[str, Any]]: + located = find_element_by_path(script, step_path) + if located is None: + return None + _, _, element = located + return element + + +def strip_non_api_script_fields(script: dict[str, Any]) -> None: + def walk_element(element: dict[str, Any]) -> None: + element.pop("uuid", None) + for child in element.get("flowElements", []): + walk_element(child) + if element.get("@type") == "IfStatement": + for branch in element.get("branches", []): + walk_element(branch) + for clause_key in ("thenClause", "elseClause"): + clause = element.get(clause_key) + if clause: + walk_element(clause) + + for element in script.get("flowElements", []): + walk_element(element) + + +def new_empty_script() -> dict[str, Any]: + return { + "@type": "Script", + "parameters": [{ + "@type": "Parameter", + "data": { + "@type": "HandsetData", + "key": None, + "value": None, + "secured": False, + "description": None, + "displayName": None, + "name": "DUT", + }, + }], + "info": {"@type": "ScriptInfo", "description": "", "modelVersion": "1.0"}, + "options": {"@type": "ScriptOptions", "automaticAllocation": True}, + "variables": [], + "flowElements": [], + "numOfFlowElements": 0, + } + + +def _iter_element_locations(flow_elements: list[dict[str, Any]]): + for index, element in enumerate(flow_elements): + yield flow_elements, index, element + for child_list, child_index, child in _iter_element_locations(element.get("flowElements", [])): + yield child_list, child_index, child + if element.get("@type") == "IfStatement": + branches = element.get("branches", []) + for branch_index, branch in enumerate(branches): + yield branches, branch_index, branch + for child_list, child_index, child in _iter_element_locations(branch.get("flowElements", [])): + yield child_list, child_index, child + + +def update_flow_element_counts(script: dict[str, Any]) -> None: + script["numOfFlowElements"] = len(script.get("flowElements", [])) + + +def insert_flow_element( + script: dict[str, Any], + element: dict[str, Any], + after_path: Optional[str] = None, + parent_path: Optional[str] = None, +) -> None: + if parent_path: + parent = find_container_by_path(script, parent_path) + if parent is None: + raise ValueError(f"parent_path not found: {parent_path}") + if parent.get("@type") not in CONTAINER_TYPES: + raise ValueError( + f"parent_path must reference a container (LogicalStep, Loop, Branch): {parent_path}" + ) + parent.setdefault("flowElements", []).append(element) + return + + flow_elements = script.setdefault("flowElements", []) + if after_path: + located = find_element_by_path(script, after_path) + if located is None: + raise ValueError(f"after_path not found: {after_path}") + elements, index, _ = located + elements.insert(index + 1, element) + else: + flow_elements.append(element) + update_flow_element_counts(script) + + +def update_element_arguments(element: dict[str, Any], arguments: dict[str, Any]) -> None: + command_id = command_id_from_element(element) + existing = {argument["name"]: argument for argument in element.get("arguments", [])} + for name, value in _normalize_argument_names(command_id, arguments).items(): + if isinstance(value, dict) and "data_source" in value: + source = value["data_source"] + argument_value = value.get("value") + else: + source = "CONSTANT" + argument_value = value + existing[name] = _make_argument(name, argument_value, source) + for alias, canonical in ARGUMENT_NAME_ALIASES.get(command_id, {}).items(): + if alias != canonical and canonical in existing: + existing.pop(alias, None) + element["arguments"] = list(existing.values()) + + +def delete_element_by_path(script: dict[str, Any], step_path: str) -> None: + located = find_element_by_path(script, step_path) + if located is None: + raise ValueError(f"step_path not found: {step_path}") + elements, index, _ = located + elements.pop(index) + update_flow_element_counts(script) + + +def set_element_enabled(script: dict[str, Any], step_path: str, enabled: bool) -> None: + located = find_element_by_path(script, step_path) + if located is None: + raise ValueError(f"step_path not found: {step_path}") + _, _, element = located + element["active"] = enabled + + +def set_condition_expression(script: dict[str, Any], step_path: str, expression: str) -> None: + located = find_element_by_path(script, step_path) + if located is None: + raise ValueError(f"step_path not found: {step_path}") + _, _, element = located + if element.get("@type") != "IfStatement": + raise ValueError(f"step_path must reference an IfStatement: {step_path}") + element["expression"] = expression + + +def move_element_by_path( + script: dict[str, Any], + step_path: str, + after_path: Optional[str] = None, + parent_path: Optional[str] = None, +) -> None: + located = find_element_by_path(script, step_path) + if located is None: + raise ValueError(f"step_path not found: {step_path}") + source_list, source_index, element = located + source_list.pop(source_index) + + if parent_path: + parent = find_container_by_path(script, parent_path) + if parent is None: + raise ValueError(f"parent_path not found: {parent_path}") + if parent.get("@type") not in CONTAINER_TYPES: + raise ValueError( + f"parent_path must reference a container (LogicalStep, Loop, Branch): {parent_path}" + ) + parent.setdefault("flowElements", []).append(element) + elif after_path: + target = find_element_by_path(script, after_path) + if target is None: + raise ValueError(f"after_path not found: {after_path}") + target_list, target_index, _ = target + if target_list is source_list and source_index < target_index: + target_index -= 1 + target_list.insert(target_index + 1, element) + else: + script.setdefault("flowElements", []).append(element) + update_flow_element_counts(script) + + +async def fetch_script_payload(token: PerfectoToken, test_id: str) -> BaseResult: + script_url = perfecto.get_ai_scriptless_api_url(token.cloud_name) + script_url = script_url + f"/script?itemKey={quote(test_id, safe='')}" + return await api_request(token, "GET", endpoint=script_url) + + +async def fetch_current_username(token: PerfectoToken) -> Optional[str]: + user_url = perfecto.get_user_management_api_url(token.cloud_name) + "/current" + result = await api_request(token, "GET", endpoint=user_url) + if result.error or not isinstance(result.result, dict): + return None + return result.result.get("username") or result.result.get("userId") + + +_script_write_locks: dict[str, asyncio.Lock] = {} +_script_write_locks_guard = asyncio.Lock() + + +async def _get_script_write_lock(item_key: str) -> asyncio.Lock: + async with _script_write_locks_guard: + lock = _script_write_locks.get(item_key) + if lock is None: + lock = asyncio.Lock() + _script_write_locks[item_key] = lock + return lock + + +@asynccontextmanager +async def script_write_lock(item_key: str): + lock = await _get_script_write_lock(item_key) + async with lock: + yield + + +async def _persist_script( + token: PerfectoToken, + item_key: str, + script: dict[str, Any], + saved_script: Optional[dict[str, Any]] = None, + snapshot_comment: Optional[str] = None, +) -> BaseResult: + working_script = copy.deepcopy(script) + baseline_script = copy.deepcopy(saved_script or script) + strip_non_api_script_fields(working_script) + strip_non_api_script_fields(baseline_script) + update_flow_element_counts(working_script) + + draft_url = perfecto.get_ai_scriptless_draft_api_url(token.cloud_name) + draft_data = json.dumps({ + "unsavedScript": working_script, + "savedScript": baseline_script, + }) + draft_result = await api_request( + token, + "POST", + endpoint=draft_url, + json={ + "path": item_key, + "type": "MOBILE_IDE_SCRIPT", + "data": draft_data, + }, + ) + if draft_result.error: + return draft_result + draft_key = draft_result.result.get("key") + if not draft_key: + return BaseResult(error="Draft creation failed: missing draft key in response") + + script_url = perfecto.get_ai_scriptless_api_url(token.cloud_name) + "/script" + save_body: dict[str, Any] = { + "script": working_script, + "itemKey": item_key, + "draftKey": draft_key, + } + if snapshot_comment: + save_body["snapshotComment"] = snapshot_comment + save_result = await api_request( + token, + "POST", + endpoint=script_url, + json=save_body, + ) + if save_result.error: + return save_result + + result: dict[str, Any] = { + "item_key": item_key, + "draft_key": draft_key, + "status": save_result.result.get("status", "success") if isinstance(save_result.result, dict) else "success", + "flow_element_count": len(working_script.get("flowElements", [])), + } + if snapshot_comment: + result["snapshot_comment"] = snapshot_comment + result["notes"] = [ + "Perfecto adds a new snapshot history entry on every script save.", + "Use list_snapshots with test_id to see version history after saving.", + ] + if snapshot_comment: + result["notes"].append( + "The comment labels the '' entry in list_snapshots (UI: Save with comment)." + ) + else: + result["notes"].append( + "Saving without comment still creates a history entry; pass comment to label the current version." + ) + return BaseResult(result=result) + + +async def persist_script( + token: PerfectoToken, + item_key: str, + script: dict[str, Any], + saved_script: Optional[dict[str, Any]] = None, + snapshot_comment: Optional[str] = None, +) -> BaseResult: + async with script_write_lock(item_key): + return await _persist_script( + token, + item_key, + script, + saved_script, + snapshot_comment, + ) + + +async def load_and_mutate( + token: PerfectoToken, + test_id: str, + mutator, + snapshot_comment: Optional[str] = None, +) -> BaseResult: + async with script_write_lock(test_id): + payload_result = await fetch_script_payload(token, test_id) + if payload_result.error: + return payload_result + payload = payload_result.result + script = copy.deepcopy(payload.get("script", {})) + saved_script = copy.deepcopy(payload.get("script", {})) + try: + mutator(script) + except ValueError as exc: + return BaseResult(error=str(exc)) + return await _persist_script( + token, + test_id, + script, + saved_script, + snapshot_comment, + ) diff --git a/tools/utils.py b/tools/utils.py index 49402d1..dc9d79a 100644 --- a/tools/utils.py +++ b/tools/utils.py @@ -2,6 +2,7 @@ Simple utilities for Perfecto MCP tools. """ import base64 +import json import os import platform import sys @@ -52,7 +53,13 @@ async def api_request(token: Optional[PerfectoToken], method: str, endpoint: str try: resp = await client.request(method, endpoint, headers=headers, **kwargs) resp.raise_for_status() - result = resp.json() + if not resp.content or not resp.content.strip(): + result = None + else: + try: + result = resp.json() + except json.JSONDecodeError: + result = resp.text error = None if isinstance(result, list) and len(result) > 0 and "userMessage" in result[0]: # It's an error final_result = None