From 9e9f7cb3b72d5ae6b329dc0d7f70085072f9d0a3 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Tue, 14 Apr 2026 17:34:13 -1000 Subject: [PATCH 1/2] Bug-- in RO-Crate translator Using igraph to determine graph isomorphism in tests (WAY faster than networkx) --- .github/workflows/build.yml | 1 + tests/test_helpers.py | 19 +++++++++++++- .../test_translators_loggers.py | 25 ++++++++++++++++--- wfcommons/wfbench/translator/streamflow.py | 2 +- wfcommons/wfinstances/logs/ro_crate.py | 1 - 5 files changed, 41 insertions(+), 7 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 9633b36d..86c0c764 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -32,6 +32,7 @@ jobs: pip install docker pip install pygraphviz pip install pydot + pip install igraph - name: Check package install run: | diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 724425f8..c2bab93d 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -152,7 +152,24 @@ def _compare_workflows(workflow_1: Workflow, workflow_2: Workflow): # Test the number of tasks assert (len(workflow_1.tasks) == len(workflow_2.tasks)) # Test the task graph topology - assert (networkx.is_isomorphic(workflow_1, workflow_2)) + + # Slow (too slow for some DAGs) isomorphic check with networkx + # assert (networkx.is_isomorphic(workflow_1, workflow_2)) + + # Fast isomorphic check using igraph + import igraph as ig + + def nx_to_igraph(G): + g = ig.Graph(directed=True) + g.add_vertices(list(G.nodes())) + g.add_edges(list(G.edges())) + return g + + g1 = nx_to_igraph(workflow_1) + g2 = nx_to_igraph(workflow_2) + + assert(g1.isomorphic(g2)) + # Test the total file size sum workflow1_input_bytes, workflow2_input_bytes = 0, 0 workflow1_output_bytes, workflow2_output_bytes = 0, 0 diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 2eccca46..586ad9a7 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -24,7 +24,8 @@ from tests.test_helpers import _shutdown_docker_container_and_remove_image from tests.test_helpers import _compare_workflows -from wfcommons import BlastRecipe +from wfcommons import BlastRecipe, EpigenomicsRecipe, BwaRecipe, CyclesRecipe, GenomeRecipe, MontageRecipe, \ + RnaseqRecipe, SeismologyRecipe, SoykbRecipe, SrasearchRecipe from wfcommons.common import Workflow, Task from wfcommons.wfbench import WorkflowBenchmark from wfcommons.wfbench import DaskTranslator @@ -49,10 +50,28 @@ def _create_workflow_benchmark() -> (WorkflowBenchmark, int): # Create a workflow benchmark object to generate specifications based on a recipe (in /tmp/, whatever) desired_num_tasks = 45 benchmark_full_path = "/tmp/blast-benchmark-{desired_num_tasks}.json" + # benchmark_full_path = f"/tmp/epigenomics-benchmark-{desired_num_tasks}.json" + # benchmark_full_path = f"/tmp/bwa-benchmark-{desired_num_tasks}.json" + # benchmark_full_path = f"/tmp/cycles-benchmark-{desired_num_tasks}.json" + # benchmark_full_path = f"/tmp/genome-benchmark-{desired_num_tasks}.json" + # benchmark_full_path = f"/tmp/montage-benchmark-{desired_num_tasks}.json" + # benchmark_full_path = f"/tmp/rnaseq-benchmark-{desired_num_tasks}.json" + # benchmark_full_path = f"/tmp/seismology-benchmark-{desired_num_tasks}.json" + # benchmark_full_path = f"/tmp/soykb-benchmark-{desired_num_tasks}.json" + # benchmark_full_path = f"/tmp/srasearch-benchmark-{desired_num_tasks}.json" shutil.rmtree(benchmark_full_path, ignore_errors=True) benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=desired_num_tasks) + # benchmark = WorkflowBenchmark(recipe=EpigenomicsRecipe, num_tasks=desired_num_tasks) + # benchmark = WorkflowBenchmark(recipe=BwaRecipe, num_tasks=desired_num_tasks) + # benchmark = WorkflowBenchmark(recipe=CyclesRecipe, num_tasks=desired_num_tasks) + # benchmark = WorkflowBenchmark(recipe=GenomeRecipe, num_tasks=desired_num_tasks) + # benchmark = WorkflowBenchmark(recipe=MontageRecipe, num_tasks=desired_num_tasks) + # benchmark = WorkflowBenchmark(recipe=RnaseqRecipe, num_tasks=desired_num_tasks) + # benchmark = WorkflowBenchmark(recipe=SeismologyRecipe, num_tasks=desired_num_tasks) + # benchmark = WorkflowBenchmark(recipe=SoykbRecipe, num_tasks=desired_num_tasks) + # benchmark = WorkflowBenchmark(recipe=SrasearchRecipe, num_tasks=desired_num_tasks) benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=10, data=10, percent_cpu=0.6) - with open(f"/tmp/blast-benchmark-{desired_num_tasks}.json", "r") as f: + with open(benchmark_full_path, "r") as f: generated_json = json.load(f) num_tasks = len(generated_json["workflow"]["specification"]["tasks"]) return benchmark, num_tasks @@ -196,7 +215,6 @@ def run_workflow_cwl(container, num_tasks, str_dirpath): def run_workflow_streamflow(container, num_tasks, str_dirpath): # Run the workflow! - # Note that the input file is hardcoded and Blast-specific exit_code, output = container.exec_run(cmd="streamflow run ./streamflow.yml", user="wfcommons", stdout=True, stderr=True) # print(output.decode()) @@ -343,7 +361,6 @@ def test_translator(self, backend) -> None: sys.stderr.write(f"[{backend}] Parsing the logs...\n") reconstructed_workflow : Workflow = parser.build_workflow(f"reconstructed_workflow_{backend}") reconstructed_workflow.write_json(pathlib.Path("/tmp/reconstructed_workflow.json")) - original_workflow : Workflow = benchmark.workflow _compare_workflows(original_workflow, reconstructed_workflow) diff --git a/wfcommons/wfbench/translator/streamflow.py b/wfcommons/wfbench/translator/streamflow.py index d4d3d026..c165ddf1 100644 --- a/wfcommons/wfbench/translator/streamflow.py +++ b/wfcommons/wfbench/translator/streamflow.py @@ -80,7 +80,7 @@ def _generate_inputs_file(self, output_folder: pathlib.Path) -> None: out.write(f"{task.task_id}_input:\n") for f in task.input_files: out.write(f" - class: File\n") - out.write(f" path: ./data/{f}") + out.write(f" path: ./data/{f}\n") diff --git a/wfcommons/wfinstances/logs/ro_crate.py b/wfcommons/wfinstances/logs/ro_crate.py index 4eecda91..0d9b6a90 100644 --- a/wfcommons/wfinstances/logs/ro_crate.py +++ b/wfcommons/wfinstances/logs/ro_crate.py @@ -136,7 +136,6 @@ def _create_tasks(self, create_actions, main_workflow_id): instruments = {} for create_action in create_actions: - # Handle overall workflow create_action then skip if create_action["name"] == f"Run of workflow/{main_workflow_id}": self._process_main_workflow(create_action) From 0cbea629ad95e846dce79773c6342867de03dec6 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Tue, 14 Apr 2026 18:22:05 -1000 Subject: [PATCH 2/2] typo-- --- tests/translators_loggers/test_translators_loggers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 586ad9a7..b8187e77 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -49,7 +49,7 @@ def _create_workflow_benchmark() -> (WorkflowBenchmark, int): # Create a workflow benchmark object to generate specifications based on a recipe (in /tmp/, whatever) desired_num_tasks = 45 - benchmark_full_path = "/tmp/blast-benchmark-{desired_num_tasks}.json" + benchmark_full_path = f"/tmp/blast-benchmark-{desired_num_tasks}.json" # benchmark_full_path = f"/tmp/epigenomics-benchmark-{desired_num_tasks}.json" # benchmark_full_path = f"/tmp/bwa-benchmark-{desired_num_tasks}.json" # benchmark_full_path = f"/tmp/cycles-benchmark-{desired_num_tasks}.json"