Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ jobs:
pip install docker
pip install pygraphviz
pip install pydot
pip install igraph

- name: Check package install
run: |
Expand Down
19 changes: 18 additions & 1 deletion tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,24 @@ def _compare_workflows(workflow_1: Workflow, workflow_2: Workflow):
# Test the number of tasks
assert (len(workflow_1.tasks) == len(workflow_2.tasks))
# Test the task graph topology
assert (networkx.is_isomorphic(workflow_1, workflow_2))

# Slow (too slow for some DAGs) isomorphic check with networkx
# assert (networkx.is_isomorphic(workflow_1, workflow_2))

# Fast isomorphic check using igraph
import igraph as ig

def nx_to_igraph(G):
    # Convert a networkx directed graph to an igraph Graph so the faster
    # igraph isomorphism check can be used instead of networkx's
    # (see the commented-out networkx.is_isomorphic call above).
    # NOTE(review): vertices are added by node identifier and edges by
    # endpoint pair — assumes igraph accepts the workflow's node
    # identifiers as vertex names (e.g. strings); confirm this holds for
    # all workflow instances compared by this test helper.
    g = ig.Graph(directed=True)
    g.add_vertices(list(G.nodes()))
    g.add_edges(list(G.edges()))
    return g

g1 = nx_to_igraph(workflow_1)
g2 = nx_to_igraph(workflow_2)

assert(g1.isomorphic(g2))

# Test the total file size sum
workflow1_input_bytes, workflow2_input_bytes = 0, 0
workflow1_output_bytes, workflow2_output_bytes = 0, 0
Expand Down
27 changes: 22 additions & 5 deletions tests/translators_loggers/test_translators_loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
from tests.test_helpers import _shutdown_docker_container_and_remove_image
from tests.test_helpers import _compare_workflows

from wfcommons import BlastRecipe
from wfcommons import BlastRecipe, EpigenomicsRecipe, BwaRecipe, CyclesRecipe, GenomeRecipe, MontageRecipe, \
RnaseqRecipe, SeismologyRecipe, SoykbRecipe, SrasearchRecipe
from wfcommons.common import Workflow, Task
from wfcommons.wfbench import WorkflowBenchmark
from wfcommons.wfbench import DaskTranslator
Expand All @@ -48,11 +49,29 @@
def _create_workflow_benchmark() -> (WorkflowBenchmark, int):
    """Create a Blast workflow benchmark and return it with its task count.

    Generates benchmark specifications from ``BlastRecipe`` into ``/tmp/``,
    then reads the generated JSON back to count the tasks actually created
    (the recipe may not produce exactly the requested number of tasks).

    Returns:
        A ``(WorkflowBenchmark, int)`` tuple: the benchmark object and the
        number of tasks found in the generated JSON specification.
    """
    desired_num_tasks = 45
    # The path must be an f-string so the task count is interpolated, and it
    # must match the recipe selected below (recipe name is part of the file
    # name produced by create_benchmark).
    benchmark_full_path = f"/tmp/blast-benchmark-{desired_num_tasks}.json"
    # Alternate recipes kept for manual testing; uncomment a matching
    # path/recipe pair to exercise a different workflow type.
    # benchmark_full_path = f"/tmp/epigenomics-benchmark-{desired_num_tasks}.json"
    # benchmark_full_path = f"/tmp/bwa-benchmark-{desired_num_tasks}.json"
    # benchmark_full_path = f"/tmp/cycles-benchmark-{desired_num_tasks}.json"
    # benchmark_full_path = f"/tmp/genome-benchmark-{desired_num_tasks}.json"
    # benchmark_full_path = f"/tmp/montage-benchmark-{desired_num_tasks}.json"
    # benchmark_full_path = f"/tmp/rnaseq-benchmark-{desired_num_tasks}.json"
    # benchmark_full_path = f"/tmp/seismology-benchmark-{desired_num_tasks}.json"
    # benchmark_full_path = f"/tmp/soykb-benchmark-{desired_num_tasks}.json"
    # benchmark_full_path = f"/tmp/srasearch-benchmark-{desired_num_tasks}.json"
    # Remove any artifact left over from a previous run.
    shutil.rmtree(benchmark_full_path, ignore_errors=True)
    benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=desired_num_tasks)
    # benchmark = WorkflowBenchmark(recipe=EpigenomicsRecipe, num_tasks=desired_num_tasks)
    # benchmark = WorkflowBenchmark(recipe=BwaRecipe, num_tasks=desired_num_tasks)
    # benchmark = WorkflowBenchmark(recipe=CyclesRecipe, num_tasks=desired_num_tasks)
    # benchmark = WorkflowBenchmark(recipe=GenomeRecipe, num_tasks=desired_num_tasks)
    # benchmark = WorkflowBenchmark(recipe=MontageRecipe, num_tasks=desired_num_tasks)
    # benchmark = WorkflowBenchmark(recipe=RnaseqRecipe, num_tasks=desired_num_tasks)
    # benchmark = WorkflowBenchmark(recipe=SeismologyRecipe, num_tasks=desired_num_tasks)
    # benchmark = WorkflowBenchmark(recipe=SoykbRecipe, num_tasks=desired_num_tasks)
    # benchmark = WorkflowBenchmark(recipe=SrasearchRecipe, num_tasks=desired_num_tasks)
    benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=10, data=10, percent_cpu=0.6)
    # Re-read the generated spec: use benchmark_full_path (not a hard-coded
    # Blast path) so switching the recipe above keeps this in sync.
    with open(benchmark_full_path, "r") as f:
        generated_json = json.load(f)
    num_tasks = len(generated_json["workflow"]["specification"]["tasks"])
    return benchmark, num_tasks
Expand Down Expand Up @@ -196,7 +215,6 @@ def run_workflow_cwl(container, num_tasks, str_dirpath):

def run_workflow_streamflow(container, num_tasks, str_dirpath):
# Run the workflow!
# Note that the input file is hardcoded and Blast-specific
exit_code, output = container.exec_run(cmd="streamflow run ./streamflow.yml",
user="wfcommons", stdout=True, stderr=True)
# print(output.decode())
Expand Down Expand Up @@ -343,7 +361,6 @@ def test_translator(self, backend) -> None:
sys.stderr.write(f"[{backend}] Parsing the logs...\n")
reconstructed_workflow : Workflow = parser.build_workflow(f"reconstructed_workflow_{backend}")
reconstructed_workflow.write_json(pathlib.Path("/tmp/reconstructed_workflow.json"))

original_workflow : Workflow = benchmark.workflow

_compare_workflows(original_workflow, reconstructed_workflow)
Expand Down
2 changes: 1 addition & 1 deletion wfcommons/wfbench/translator/streamflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def _generate_inputs_file(self, output_folder: pathlib.Path) -> None:
out.write(f"{task.task_id}_input:\n")
for f in task.input_files:
out.write(f" - class: File\n")
out.write(f" path: ./data/{f}")
out.write(f" path: ./data/{f}\n")



Expand Down
1 change: 0 additions & 1 deletion wfcommons/wfinstances/logs/ro_crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def _create_tasks(self, create_actions, main_workflow_id):
instruments = {}

for create_action in create_actions:

# Handle overall workflow create_action then skip
if create_action["name"] == f"Run of workflow/{main_workflow_id}":
self._process_main_workflow(create_action)
Expand Down
Loading