diff --git a/requirements-runners.txt b/requirements-runners.txt index 7923650..d1c31a8 100644 --- a/requirements-runners.txt +++ b/requirements-runners.txt @@ -1,6 +1,4 @@ ARS_Test_Runner==0.2.4 # benchmarks-runner==0.1.3 # ui-test-runner==0.0.2 -graph-validation-test-runners==0.1.5 -# we need to manually pin the right reasoner-validator version -reasoner-validator==4.2.5 +# graph-validation-test-runners==0.1.5 diff --git a/requirements.txt b/requirements.txt index 7dbe719..59f0bac 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ pydantic==2.7.1 setproctitle==1.3.3 slack_sdk==3.27.2 tqdm==4.66.4 -translator-testing-model==0.3.2 +translator-testing-model==0.4.1 +reasoner-validator==4.2.5 diff --git a/test_harness/download.py b/test_harness/download.py index e84295c..58d2dd8 100644 --- a/test_harness/download.py +++ b/test_harness/download.py @@ -10,14 +10,18 @@ from typing import List, Union, Dict import zipfile -from translator_testing_model.datamodel.pydanticmodel import TestCase, TestSuite +from translator_testing_model.datamodel.pydanticmodel import ( + TestCase, + PathfinderTestCase, + TestSuite, +) def download_tests( suite: Union[str, List[str]], url: Path, logger: logging.Logger, -) -> Dict[str, TestCase]: +) -> Dict[str, Union[TestCase, PathfinderTestCase]]: """Download tests from specified location.""" assert Path(url).suffix == ".zip" logger.info(f"Downloading tests from {url}...") diff --git a/test_harness/pathfinder_test_runner.py b/test_harness/pathfinder_test_runner.py new file mode 100644 index 0000000..0799d31 --- /dev/null +++ b/test_harness/pathfinder_test_runner.py @@ -0,0 +1,44 @@ +from typing import Dict, Union, List + + +async def pathfinder_pass_fail_analysis( + report: Dict[str, any], + agent: str, + message: Dict[str, any], + path_nodes: List[List[str]], + minimum_required_path_nodes: int, +) -> Dict[str, any]: + found_path_nodes = set() + unmatched_paths = set() + for analysis in message["results"][0]["analyses"]: + for path_bindings in analysis["path_bindings"].values(): + for path_binding in path_bindings: + path_id = path_binding["id"] + matching_path_nodes = set() + for edge_id in message["auxiliary_graphs"][path_id]["edges"]: + edge = message["knowledge_graph"]["edges"][edge_id] + for node_curies in path_nodes: + unhit_node = True + for curie in node_curies: + if curie in matching_path_nodes: + unhit_node = False + if unhit_node: + if edge["subject"] in node_curies: + matching_path_nodes.add(edge["subject"]) + if edge["object"] in node_curies: + matching_path_nodes.add(edge["object"]) + if len(matching_path_nodes) >= minimum_required_path_nodes: + found_path_nodes.add(",".join(matching_path_nodes)) + elif len(matching_path_nodes) > 0: + unmatched_paths.add(",".join(matching_path_nodes)) + + if len(found_path_nodes) > 0: + report[agent]["status"] = "PASSED" + report[agent]["expected_nodes_found"] = "; ".join(found_path_nodes) + elif len(unmatched_paths) > 0: + report[agent]["status"] = "FAILED" + report[agent]["expected_nodes_found"] = "; ".join(unmatched_paths) + else: + report[agent]["status"] = "FAILED" + + return report diff --git a/test_harness/reporter.py b/test_harness/reporter.py index 51b1778..5a953fb 100644 --- a/test_harness/reporter.py +++ b/test_harness/reporter.py @@ -4,9 +4,14 @@ import httpx import logging import os -from typing import List +from typing import List, Union -from translator_testing_model.datamodel.pydanticmodel import TestCase, TestAsset +from translator_testing_model.datamodel.pydanticmodel import ( + TestCase, + PathfinderTestCase, + TestAsset, + PathfinderTestAsset, +) class Reporter: @@ -63,25 +68,36 @@ async def create_test_run(self, test_env, suite_name): self.test_run_id = res_json["id"] return self.test_run_id - async def create_test(self, test: TestCase, asset: TestAsset): + async def create_test( + self, + test: Union[TestCase, PathfinderTestCase], + asset: Union[TestAsset, PathfinderTestAsset], + ): """Create a test in the IR.""" name = asset.name if asset.name else asset.description - res = await self.authenticated_client.post( - url=f"{self.base_path}/api/reporting/v1/test-runs/{self.test_run_id}/tests", - json={ - "name": name, - "className": test.name, - "methodName": asset.name, - "startedAt": datetime.now().astimezone().isoformat(), - "labels": [ - { - "key": "TestCase", - "value": test.id, - }, - { - "key": "TestAsset", - "value": asset.id, - }, + test_json = { + "name": name, + "className": test.name, + "methodName": asset.name, + "startedAt": datetime.now().astimezone().isoformat(), + "labels": [ + { + "key": "TestCase", + "value": test.id, + }, + { + "key": "TestAsset", + "value": asset.id, + }, + { + "key": "ExpectedOutput", + "value": asset.expected_output, + }, + ], + } + if isinstance(test, TestCase) and isinstance(asset, TestAsset): + test_json["labels"].extend( + [ { "key": "InputCurie", "value": asset.input_id, @@ -90,12 +106,28 @@ async def create_test(self, test: TestCase, asset: TestAsset): "key": "OutputCurie", "value": asset.output_id, }, + ] + ) + elif isinstance(test, PathfinderTestCase) and isinstance( + asset, PathfinderTestAsset + ): + test_json["labels"].extend( + [ + { + "key": "SourceInputCurie", + "value": asset.source_input_id, + }, { - "key": "ExpectedOutput", - "value": asset.expected_output, + "key": "TargetInputCurie", + "value": asset.target_input_id, }, - ], - }, + ] + ) + else: + raise Exception + res = await self.authenticated_client.post( + url=f"{self.base_path}/api/reporting/v1/test-runs/{self.test_run_id}/tests", + json=test_json, ) res.raise_for_status() res_json = res.json() diff --git a/test_harness/result_collector.py b/test_harness/result_collector.py index d075e09..622ba29 100644 --- a/test_harness/result_collector.py +++ b/test_harness/result_collector.py @@ -2,7 +2,12 @@ import logging from typing import Union -from translator_testing_model.datamodel.pydanticmodel import TestAsset, TestCase +from translator_testing_model.datamodel.pydanticmodel import ( + TestAsset, + PathfinderTestAsset, + TestCase, + PathfinderTestCase, +) from test_harness.utils import get_tag @@ -43,8 +48,8 @@ def __init__(self, logger: logging.Logger): def collect_result( self, - test: TestCase, - asset: TestAsset, + test: Union[TestCase, PathfinderTestCase], + asset: Union[TestAsset, PathfinderTestAsset], report: dict, parent_pk: Union[str, None], url: str, diff --git a/test_harness/run.py b/test_harness/run.py index fb85d8d..0384931 100644 --- a/test_harness/run.py +++ b/test_harness/run.py @@ -5,26 +5,31 @@ import time from tqdm import tqdm import traceback -from typing import Dict +from typing import Dict, Union from ARS_Test_Runner.semantic_test import pass_fail_analysis -from standards_validation_test_runner import StandardsValidationTest + +# from standards_validation_test_runner import StandardsValidationTest # from benchmarks_runner import run_benchmarks -from translator_testing_model.datamodel.pydanticmodel import TestCase +from translator_testing_model.datamodel.pydanticmodel import ( + TestCase, + PathfinderTestCase, +) from test_harness.runner.query_runner import QueryRunner from test_harness.reporter import Reporter from test_harness.slacker import Slacker from test_harness.result_collector import ResultCollector from test_harness.utils import get_tag, hash_test_asset +from test_harness.pathfinder_test_runner import pathfinder_pass_fail_analysis async def run_tests( reporter: Reporter, slacker: Slacker, - tests: Dict[str, TestCase], + tests: Dict[str, Union[TestCase, PathfinderTestCase]], logger: logging.Logger = logging.getLogger(__name__), args: Dict[str, any] = {}, ) -> Dict: @@ -71,7 +76,7 @@ async def run_tests( try: test_id = await reporter.create_test(test, asset) test_ids.append(test_id) - except Exception: + except Exception as e: logger.error(f"Failed to create test: {test.id}") continue @@ -91,9 +96,24 @@ async def run_tests( "pks": test_query["pks"], "result": {}, } + if isinstance(test, PathfinderTestCase): + report["test_details"] = { + "minimum_required_path_nodes": asset.minimum_required_path_nodes, + "expected_path_nodes": "; ".join( + [ + ",".join( + [ + normalized_curies[path_node_id] + for path_node_id in path_node.ids + ] + ) + for path_node in asset.path_nodes + ] + ), + } for agent, response in test_query["responses"].items(): report["result"][agent] = { - "trapi_validation": "NA", + # "trapi_validation": "NA", } agent_report = report["result"][agent] try: @@ -117,28 +137,28 @@ async def run_tests( logger.warning( f"Failed to parse basic response fields from {agent}: {e}" ) - try: - svt = StandardsValidationTest( - test_asset=asset, - environment=test.test_env, - component=agent, - trapi_version=args["trapi_version"], - biolink_version="suppress", - runner_settings="Inferred", - ) - results = svt.test_case_processor( - trapi_response=response["response"] - ) - agent_report["trapi_validation"] = results[ - next(iter(results.keys())) - ][agent]["status"] - if agent_report["trapi_validation"] == "FAILED": - agent_report["status"] = "FAILED" - agent_report["message"] = "TRAPI Validation Error" - continue - except Exception as e: - logger.warning(f"Failed to run TRAPI validation with {e}") - agent_report["trapi_validation"] = "ERROR" + # try: + # svt = StandardsValidationTest( + # test_asset=asset, + # environment=test.test_env, + # component=agent, + # trapi_version=args["trapi_version"], + # biolink_version="suppress", + # runner_settings="Inferred", + # ) + # results = svt.test_case_processor( + # trapi_response=response["response"] + # ) + # agent_report["trapi_validation"] = results[ + # next(iter(results.keys())) + # ][agent]["status"] + # if agent_report["trapi_validation"] == "FAILED": + # agent_report["status"] = "FAILED" + # agent_report["message"] = "TRAPI Validation Error" + # continue + # except Exception as e: + # logger.warning(f"Failed to run TRAPI validation with {e}") + # agent_report["trapi_validation"] = "ERROR" try: if ( response["response"]["message"].get("results") is None @@ -147,13 +167,28 @@ async def run_tests( agent_report["status"] = "DONE" agent_report["message"] = "No results" continue - await pass_fail_analysis( - report["result"], - agent, - response["response"]["message"]["results"], - normalized_curies[asset.output_id], - asset.expected_output, - ) + if isinstance(test, PathfinderTestCase): + await pathfinder_pass_fail_analysis( + report["result"], + agent, + response["response"]["message"], + [ + [ + normalized_curies[path_node_id] + for path_node_id in path_node.ids + ] + for path_node in asset.path_nodes + ], + asset.minimum_required_path_nodes, + ) + else: + await pass_fail_analysis( + report["result"], + agent, + response["response"]["message"]["results"], + normalized_curies[asset.output_id], + asset.expected_output, + ) except Exception as e: logger.error( f"Failed to run acceptance test analysis on {agent}: {e}" diff --git a/test_harness/runner/generate_query.py b/test_harness/runner/generate_query.py index 2a585c4..8da5d5d 100644 --- a/test_harness/runner/generate_query.py +++ b/test_harness/runner/generate_query.py @@ -1,7 +1,11 @@ """Given a Test Asset, generate a TRAPI query.""" import copy -from translator_testing_model.datamodel.pydanticmodel import TestAsset +from typing import Union +from translator_testing_model.datamodel.pydanticmodel import ( + TestAsset, + PathfinderTestAsset, +) from test_harness.utils import get_qualifier_constraints @@ -56,11 +60,43 @@ } } +PATHFINDER = { + "message": { + "query_graph": { + "nodes": { + "SN": { + "set_interpretation": "BATCH", + "constraints": [], + "member_ids": [], + }, + "ON": { + "set_interpretation": "BATCH", + "constraints": [], + "member_ids": [], + }, + }, + "paths": {"p0": {"subject": "SN", "object": "ON"}}, + } + } +} + -def generate_query(test_asset: TestAsset) -> dict: +def generate_query(test_asset: Union[TestAsset, PathfinderTestAsset]) -> dict: """Generate a TRAPI query.""" query = {} - if test_asset.predicate_id == "biolink:treats": + if isinstance(test_asset, PathfinderTestAsset): + source_id = test_asset.source_input_id + target_id = test_asset.target_input_id + query = copy.deepcopy(PATHFINDER) + query["message"]["query_graph"]["nodes"]["SN"] = { + "ids": [source_id], + "categories": [test_asset.source_input_category], + } + query["message"]["query_graph"]["nodes"]["ON"] = { + "ids": [target_id], + "categories": [test_asset.target_input_category], + } + elif test_asset.predicate_id == "biolink:treats": # MVP1 query = copy.deepcopy(MVP1) # add id to node diff --git a/test_harness/runner/query_runner.py b/test_harness/runner/query_runner.py index 07c2882..6f9470c 100644 --- a/test_harness/runner/query_runner.py +++ b/test_harness/runner/query_runner.py @@ -4,9 +4,12 @@ import httpx import logging import time -from typing import Tuple, Dict +from typing import Tuple, Dict, Union -from translator_testing_model.datamodel.pydanticmodel import TestCase +from translator_testing_model.datamodel.pydanticmodel import ( + TestCase, + PathfinderTestCase, +) from test_harness.runner.smart_api_registry import retrieve_registry_from_smartapi from test_harness.runner.generate_query import generate_query from test_harness.utils import hash_test_asset, normalize_curies @@ -79,64 +82,6 @@ async def run_query( return query_hash, responses, pks - async def run_queries( - self, - test_case: TestCase, - concurrency: int = 1, # for performance testing - ) -> Tuple[Dict[int, dict], Dict[str, str]]: - """Run all queries specified in a Test Case.""" - # normalize all the curies in a test case - normalized_curies = await normalize_curies(test_case, self.logger) - # TODO: figure out the right way to handle input category wrt normalization - - queries: Dict[int, dict] = {} - for test_asset in test_case.test_assets: - test_asset.input_id = normalized_curies[test_asset.input_id] - # TODO: make this better - asset_hash = hash_test_asset(test_asset) - if asset_hash not in queries: - # generate query - try: - query = generate_query(test_asset) - queries[asset_hash] = { - "query": query, - "responses": {}, - "pks": {}, - } - except Exception as e: - self.logger.warning(e) - - # send queries to a single type of component at a time - for component in test_case.components: - # component = "ara" - # loop over all specified components, i.e. ars, ara, kp, utilities - semaphore = asyncio.Semaphore(concurrency) - self.logger.info( - f"Sending queries to {self.registry[env_map[test_case.test_env]][component]}" - ) - tasks = [ - asyncio.create_task( - self.run_query( - query_hash, - semaphore, - query["query"], - service["url"], - service["infores"], - ) - ) - for service in self.registry[env_map[test_case.test_env]][component] - for query_hash, query in queries.items() - ] - try: - all_responses = await asyncio.gather(*tasks, return_exceptions=True) - for query_hash, responses, pks in all_responses: - queries[query_hash]["responses"].update(responses) - queries[query_hash]["pks"].update(pks) - except Exception as e: - self.logger.error(f"Something went wrong with the queries: {e}") - - return queries, normalized_curies - async def get_ars_child_response( self, child_pk: str, @@ -306,3 +251,69 @@ async def get_ars_responses( } return responses, pks + + async def run_queries( + self, + test_case: Union[TestCase, PathfinderTestCase], + concurrency: int = 1, # for performance testing + ) -> Tuple[Dict[int, dict], Dict[str, str]]: + """Run all queries specified in a Test Case.""" + # normalize all the curies in a test case + normalized_curies = await normalize_curies(test_case, self.logger) + # TODO: figure out the right way to handle input category wrt normalization + + queries: Dict[int, dict] = {} + for test_asset in test_case.test_assets: + if isinstance(test_case, PathfinderTestCase): + test_asset.source_input_id = normalized_curies[ + test_asset.source_input_id + ] + test_asset.target_input_id = normalized_curies[ + test_asset.target_input_id + ] + else: + test_asset.input_id = normalized_curies[test_asset.input_id] + # TODO: make this better + asset_hash = hash_test_asset(test_asset) + if asset_hash not in queries: + # generate query + try: + query = generate_query(test_asset) + queries[asset_hash] = { + "query": query, + "responses": {}, + "pks": {}, + } + except Exception as e: + self.logger.warning(e) + + # send queries to a single type of component at a time + for component in test_case.components: + # component = "ara" + # loop over all specified components, i.e. ars, ara, kp, utilities + semaphore = asyncio.Semaphore(concurrency) + self.logger.info( + f"Sending queries to {self.registry[env_map[test_case.test_env]][component]}" + ) + tasks = [ + asyncio.create_task( + self.run_query( + query_hash, + semaphore, + query["query"], + service["url"], + service["infores"], + ) + ) + for service in self.registry[env_map[test_case.test_env]][component] + for query_hash, query in queries.items() + ] + try: + all_responses = await asyncio.gather(*tasks, return_exceptions=True) + for query_hash, responses, pks in all_responses: + queries[query_hash]["responses"].update(responses) + queries[query_hash]["pks"].update(pks) + except Exception as e: + self.logger.error(f"Something went wrong with the queries: {e}") + + return queries, normalized_curies diff --git a/test_harness/utils.py b/test_harness/utils.py index a50b2c1..e78041a 100644 --- a/test_harness/utils.py +++ b/test_harness/utils.py @@ -4,7 +4,12 @@ import logging from typing import Dict, Union, List, Tuple -from translator_testing_model.datamodel.pydanticmodel import TestCase, TestAsset +from translator_testing_model.datamodel.pydanticmodel import ( + TestCase, + PathfinderTestCase, + TestAsset, + PathfinderTestAsset, +) NODE_NORM_URL = { "dev": "https://nodenormalization-sri.renci.org/1.4", @@ -15,15 +20,27 @@ async def normalize_curies( - test: TestCase, + test: Union[TestCase, PathfinderTestCase], logger: logging.Logger = logging.getLogger(__name__), ) -> Dict[str, Dict[str, Union[Dict[str, str], List[str]]]]: """Normalize a list of curies.""" node_norm = NODE_NORM_URL.get(test.test_env) # collect all curies from test - curies = set([asset.output_id for asset in test.test_assets]) - curies.update([asset.input_id for asset in test.test_assets]) - curies.add(test.test_case_input_id) + if isinstance(test, PathfinderTestCase): + curies = set([asset.source_input_id for asset in test.test_assets]) + curies.update([asset.target_input_id for asset in test.test_assets]) + curies.update( + [ + path_node_id + for asset in test.test_assets + for path_node in asset.path_nodes + for path_node_id in path_node.ids + ] + ) + else: + curies = set([asset.output_id for asset in test.test_assets]) + curies.update([asset.input_id for asset in test.test_assets]) + curies.add(test.test_case_input_id) normalized_curies = {} async with httpx.AsyncClient() as client: @@ -63,15 +80,25 @@ def get_tag(result): return tag -def hash_test_asset(test_asset: TestAsset) -> int: +def hash_test_asset(test_asset: Union[TestAsset, PathfinderTestAsset]) -> int: """Given a test asset, return its unique hash.""" - asset_hash = hash( - ( - test_asset.input_id, - test_asset.predicate_id, - *[qualifier.value for qualifier in test_asset.qualifiers], + if isinstance(test_asset, PathfinderTestAsset): + asset_hash = hash( + ( + test_asset.source_input_id, + test_asset.target_input_id, + test_asset.predicate_id, + *[qualifier.value for qualifier in (test_asset.qualifiers or [])], + ) + ) + else: + asset_hash = hash( + ( + test_asset.input_id, + test_asset.predicate_id, + *[qualifier.value for qualifier in test_asset.qualifiers], + ) ) - ) return asset_hash diff --git a/tests/helpers/example_tests.py b/tests/helpers/example_tests.py index 672fcd1..40eb295 100644 --- a/tests/helpers/example_tests.py +++ b/tests/helpers/example_tests.py @@ -103,7 +103,67 @@ "test_case_predicate_id": "biolink:treats", "test_case_input_id": "MONDO:0010794", "test_runner_settings": ["inferred"], - } + }, + "TestCase_2": { + "id": "TestCase_2", + "name": "imatinib to asthma", + "description": "imatinib to asthma", + "tags": [], + "test_runner_settings": ["pathfinder"], + "query_type": None, + "test_assets": [ + { + "id": "PTFQ_1", + "name": "Imatinib to Asthma", + "description": "Imatinib to Asthma", + "tags": [], + "test_runner_settings": ["pathfinder"], + "source_input_id": "CHEBI:31690", + "source_input_name": "Imatinib", + "source_input_category": "biolink:Drug", + "target_input_id": "MONDO:0004979", + "target_input_name": "Asthma", + "target_input_category": "biolink:Disease", + "predicate_id": "biolink:related_to", + "predicate_name": "related to", + "minimum_required_path_nodes": 2, + "path_nodes": [ + {"ids": ["NCBIGene:3815"], "name": "KIT"}, + { + "ids": ["CHEBI:18295", "PR:000049994"], + "name": "Histamine", + }, + {"ids": ["NCBIGene:4254"], "name": "SCF-1"}, + {"ids": ["CL:0000097"], "name": "Mast Cell"}, + ], + "association": None, + "qualifiers": None, + "expected_output": "TopAnswer", + "test_issue": None, + "semantic_severity": None, + "in_v1": None, + "well_known": False, + "test_reference": None, + "test_metadata": { + "id": "1", + "name": None, + "description": None, + "tags": [], + "test_runner_settings": [], + "test_source": "SMURF", + "test_reference": None, + "test_objective": "AcceptanceTest", + "test_annotations": [], + }, + } + ], + "preconditions": [], + "trapi_template": None, + "test_case_objective": "AcceptanceTest", + "test_case_source": None, + "components": ["ars"], + "test_env": "ci", + }, }, } ).test_cases diff --git a/tests/test_run.py b/tests/test_run.py index d4c4ae1..517eaa9 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -35,6 +35,13 @@ async def test_run_tests(mocker, httpx_mock: HTTPXMock): "MONDO:0010794": None, "DRUGBANK:DB00313": None, "MESH:D001463": None, + "CHEBI:18295": None, + "CHEBI:31690": None, + "CL:0000097": None, + "MONDO:0004979": None, + "NCBIGene:3815": None, + "NCBIGene:4254": None, + "PR:000049994": None, }, ) full_report = await run_tests( @@ -46,7 +53,7 @@ async def test_run_tests(mocker, httpx_mock: HTTPXMock): logger=logger, args={ "suite": "testing", - "trapi_version": "1.5.0", + "trapi_version": "1.6.0", }, ) - assert full_report["SKIPPED"] == 2 + assert full_report["SKIPPED"] == 3