From cd52a19040278ca8ddefd2c077c877a479300916 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 10 Apr 2026 11:56:41 -1000 Subject: [PATCH 1/5] new Dockerfile --- .../translators_loggers/Dockerfile.snakemake | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 tests/translators_loggers/Dockerfile.snakemake diff --git a/tests/translators_loggers/Dockerfile.snakemake b/tests/translators_loggers/Dockerfile.snakemake new file mode 100644 index 00000000..f44202b8 --- /dev/null +++ b/tests/translators_loggers/Dockerfile.snakemake @@ -0,0 +1,60 @@ +# docker build --platform amd64 -t wfcommons-dev-snakemake -f Dockerfile.snakemake . +# docker run -it --rm -v `pwd`:/home/wfcommons wfcommons-dev-snakemake /bin/bash + +FROM amd64/ubuntu:noble + +LABEL org.containers.image.authors="henric@hawaii.edu" + +# update repositories +RUN apt-get update + +# set timezone +RUN echo "America/Los_Angeles" > /etc/timezone && export DEBIAN_FRONTEND=noninteractive && apt-get install -y tzdata + +# install useful stuff +RUN apt-get -y install pkg-config +RUN apt-get -y install git +RUN apt-get -y install wget +RUN apt-get -y install curl +RUN apt-get -y install make +RUN apt-get -y install cmake +RUN apt-get -y install cmake-data +RUN apt-get -y install sudo +RUN apt-get -y install vim --fix-missing +RUN apt-get -y install gcc +RUN apt-get -y install gcc-multilib +RUN apt-get -y install graphviz libgraphviz-dev + + +# Python stuff +RUN apt-get -y install python3 python3-pip +RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 +RUN python3 -m pip install --break-system-packages pathos pandas filelock +RUN python3 -m pip install --break-system-packages networkx scipy matplotlib pygraphviz +RUN python3 -m pip install --break-system-packages pyyaml jsonschema requests +RUN python3 -m pip install --break-system-packages --upgrade setuptools + +# Stress-ng +RUN apt-get -y install stress-ng + +# Add wfcommons user +RUN useradd -ms /bin/bash 
wfcommons +RUN adduser wfcommons sudo +RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers +ENV PATH="$PATH:/home/wfcommons/.local/bin/" + +USER wfcommons +WORKDIR /home/wfcommons +# Making this directory world rwx to facilitate testing +RUN chmod -R 777 /home/wfcommons + + +# Install Pixi +RUN wget -qO- https://pixi.sh/install.sh | sh +ENV PATH="$PATH:/home/wfcommons/.pixi/bin" + +# Install snakemake +RUN pixi global install snakemake conda -c conda-forge -c bioconda +RUN pixi global install snakedeploy -c conda-forge -c bioconda + + From fa150bf662bbf5c77293567eeeba9d2950ad1d9d Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Mon, 27 Apr 2026 11:54:11 -1000 Subject: [PATCH 2/5] Implemented a working Snakemake translator --- .../test_translators_loggers.py | 38 ++++-- wfcommons/wfbench/__init__.py | 1 + wfcommons/wfbench/translator/__init__.py | 1 + wfcommons/wfbench/translator/snakemake.py | 126 ++++++++++++++++++ 4 files changed, 154 insertions(+), 12 deletions(-) create mode 100644 wfcommons/wfbench/translator/snakemake.py diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 2eccca46..be869ce1 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -34,6 +34,7 @@ from wfcommons.wfbench import BashTranslator from wfcommons.wfbench import TaskVineTranslator from wfcommons.wfbench import MakeflowTranslator +from wfcommons.wfbench import SnakemakeTranslator from wfcommons.wfbench import CWLTranslator from wfcommons.wfbench import StreamflowTranslator from wfcommons.wfbench import PegasusTranslator @@ -116,6 +117,7 @@ def _additional_setup_swiftt(container): "bash": noop, "taskvine": _additional_setup_taskvine, "makeflow": noop, + "snakemake": noop, "cwl": noop, "streamflow": noop, "pegasus": _additional_setup_pegasus, @@ -182,6 +184,15 @@ def run_workflow_makeflow(container, num_tasks, str_dirpath): 
num_completed_jobs = len(re.findall(r'job \d+ completed', output.decode())) assert (num_completed_jobs == num_tasks) +def run_workflow_snakemake(container, num_tasks, str_dirpath): + # Run the workflow (with full logging) + exit_code, output = container.exec_run(cmd=["bash", "-c", "snakemake -s ./workflow.smk --cores 1"], + user="wfcommons", stdout=True, stderr=True) + # Check sanity + assert (exit_code == 0) + num_completed_jobs = len(re.findall(r'Finished jobid: \d+', output.decode())) + assert (num_completed_jobs - 1 == num_tasks) # Discounting the "all_tasks" rule + def run_workflow_cwl(container, num_tasks, str_dirpath): # Run the workflow! # Note that the input file is hardcoded and Blast-specific @@ -243,6 +254,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath): "bash": run_workflow_bash, "taskvine": run_workflow_taskvine, "makeflow": run_workflow_makeflow, + "snakemake": run_workflow_snakemake, "cwl": run_workflow_cwl, "streamflow": run_workflow_streamflow, "pegasus": run_workflow_pegasus, @@ -258,6 +270,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath): "bash": BashTranslator, "taskvine": TaskVineTranslator, "makeflow": MakeflowTranslator, + "snakemake": SnakemakeTranslator, "cwl": CWLTranslator, "streamflow": StreamflowTranslator, "pegasus": PegasusTranslator, @@ -270,18 +283,19 @@ class TestTranslators: @pytest.mark.parametrize( "backend", [ - "swiftt", - "dask", - "parsl", - "nextflow", - "nextflow_subworkflow", - "airflow", - "bash", - "taskvine", - "makeflow", - "cwl", - "streamflow", - "pegasus", + # "swiftt", + # "dask", + # "parsl", + # "nextflow", + # "nextflow_subworkflow", + # "airflow", + # "bash", + # "taskvine", + # "makeflow", + "snakemake", + # "cwl", + # "streamflow", + # "pegasus", ]) @pytest.mark.unit # @pytest.mark.skip(reason="tmp") diff --git a/wfcommons/wfbench/__init__.py b/wfcommons/wfbench/__init__.py index 42d62c75..1c546c45 100644 --- a/wfcommons/wfbench/__init__.py +++ b/wfcommons/wfbench/__init__.py 
@@ -18,6 +18,7 @@ SwiftTTranslator, TaskVineTranslator, MakeflowTranslator, + SnakemakeTranslator, CWLTranslator, StreamflowTranslator, PyCompssTranslator) diff --git a/wfcommons/wfbench/translator/__init__.py b/wfcommons/wfbench/translator/__init__.py index 1e3b2a15..07f4e0bd 100644 --- a/wfcommons/wfbench/translator/__init__.py +++ b/wfcommons/wfbench/translator/__init__.py @@ -20,3 +20,4 @@ from .swift_t import SwiftTTranslator from .taskvine import TaskVineTranslator from .makeflow import MakeflowTranslator +from .snakemake import SnakemakeTranslator diff --git a/wfcommons/wfbench/translator/snakemake.py b/wfcommons/wfbench/translator/snakemake.py new file mode 100644 index 00000000..bc91d494 --- /dev/null +++ b/wfcommons/wfbench/translator/snakemake.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2024-2025 The WfCommons Team. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +import pathlib +import shutil + +from logging import Logger +from typing import Optional, Union + +from .abstract_translator import Translator +from ...common import Workflow + +this_dir = pathlib.Path(__file__).resolve().parent + +class SnakemakeTranslator(Translator): + """ + A WfFormat parser for creating Snakemake workflow applications. + + :param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance. + :type workflow: Union[Workflow, pathlib.Path], + :param logger: The logger where to log information/warning or errors (optional). 
+ :type logger: Logger + """ + def __init__(self, + workflow: Union[Workflow, pathlib.Path], + logger: Optional[Logger] = None) -> None: + """Create an object of the translator.""" + super().__init__(workflow, logger) + self._script = "" + + def translate(self, output_folder: pathlib.Path) -> None: + """ + Translate a workflow benchmark description (WfFormat) into an actual workflow application. + + :param output_folder: The path to the folder in which the workflow benchmark will be generated. + :type output_folder: pathlib.Path + """ + + # Generate code + self._generate_code() + + # write benchmark files + output_folder.mkdir(parents=True) + with open(output_folder.joinpath("workflow.smk"), "w") as fp: + fp.write(self._script) + + # additional files + self._copy_binary_files(output_folder) + self._generate_input_files(output_folder) + + # README file + self._write_readme_file(output_folder) + + def _generate_code(self): + """ + Generate the Snakemake code + + :return: the code + :rtype: str + """ + all_rule = ("# Rule to force all task executions\n" + "rule all_tasks:\n" + "\tinput:\n") + + self._script = "\n# WfBench task rules\n" + for task_name, task in self.workflow.tasks.items(): + rule = f"rule {task_name}:\n" + # input files + rule += "\tinput:\n" + for input_file in task.input_files: + rule += f"\t\t\"data/{input_file.file_id}\",\n" + # output files + rule += "\toutput:\n" + for output_file in task.output_files: + all_rule += f"\t\t\"data/{output_file.file_id}\",\n" + rule += f"\t\t\"data/{output_file.file_id}\",\n" + # shell + rule += "\tshell:\n" + rule += "\t\t'" + task.program + " '\n" + + input_spec = "\\'[" + for file in task.input_files: + input_spec += f"\"data/{file.file_id}\"," + input_spec = input_spec[:-1] + "]\\'" + + output_spec = "\\'{{" + for file in task.output_files: + output_spec += f"\"data/{file.file_id}\":{str(file.size)}," + output_spec = output_spec[:-1] + "}}\\'" + + args = [] + for a in task.args: + if "--output-files" in a: + 
args.append(f"--output-files {output_spec}") + elif "--input-files" in a: + args.append(f"--input-files {input_spec}") + else: + args.append(a) + + for a in args: + rule += "\t\t'" + a + " '\n" + + self._script += rule + "\n\n" + + self._script = all_rule + self._script + return + + def _write_readme_file(self, output_folder: pathlib.Path) -> None: + """ + Write the README file. + + :param output_folder: The path of the output folder. + :type output_folder: pathlib.Path + """ + readme_file_path = output_folder.joinpath("README") + with open(readme_file_path, "w") as out: + out.write(f"In directory {str(output_folder)}:\n") + out.write(f" - The Snakemake file: workflow.smk\n") + out.write(f" - Run the workflow: snakemake -s workflow.smk --cores 1\n") From c2404a30f335ce75ffc4936b7bd1d6d8987ca073 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Mon, 27 Apr 2026 20:53:50 -1000 Subject: [PATCH 3/5] Implemented a Snakemake log parser --- .../translators_loggers/Dockerfile.snakemake | 3 +- .../test_translators_loggers.py | 5 +- wfcommons/wfbench/translator/snakemake.py | 4 +- wfcommons/wfinstances/logs/__init__.py | 1 + wfcommons/wfinstances/logs/snakemake.py | 185 ++++++++++++++++++ 5 files changed, 194 insertions(+), 4 deletions(-) create mode 100644 wfcommons/wfinstances/logs/snakemake.py diff --git a/tests/translators_loggers/Dockerfile.snakemake b/tests/translators_loggers/Dockerfile.snakemake index f44202b8..25f033a6 100644 --- a/tests/translators_loggers/Dockerfile.snakemake +++ b/tests/translators_loggers/Dockerfile.snakemake @@ -56,5 +56,6 @@ ENV PATH="$PATH:/home/wfcommons/.pixi/bin" # Install snakemake RUN pixi global install snakemake conda -c conda-forge -c bioconda RUN pixi global install snakedeploy -c conda-forge -c bioconda - +RUN ~/.pixi/envs/snakemake/bin/python -m ensurepip && \ + ~/.pixi/envs/snakemake/bin/python -m pip install snakemake-logger-plugin-snkmt diff --git a/tests/translators_loggers/test_translators_loggers.py 
b/tests/translators_loggers/test_translators_loggers.py index be869ce1..b7539d22 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -44,6 +44,7 @@ from wfcommons.wfinstances.logs import TaskVineLogsParser from wfcommons.wfinstances.logs import MakeflowLogsParser from wfcommons.wfinstances.logs import ROCrateLogsParser +from wfcommons.wfinstances.logs import SnakemakeLogsParser def _create_workflow_benchmark() -> (WorkflowBenchmark, int): @@ -186,7 +187,7 @@ def run_workflow_makeflow(container, num_tasks, str_dirpath): def run_workflow_snakemake(container, num_tasks, str_dirpath): # Run the workflow (with full logging) - exit_code, output = container.exec_run(cmd=["bash", "-c", "snakemake -s ./workflow.smk --cores 1"], + exit_code, output = container.exec_run(cmd=["bash", "-c", "snakemake -s ./workflow.smk --cores 1 --logger snkmt --logger-snkmt-db ./snkmt.sqlite"], user="wfcommons", stdout=True, stderr=True) # Check sanity assert (exit_code == 0) @@ -352,6 +353,8 @@ def test_translator(self, backend) -> None: steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"], file_extensions_to_ignore=[".out", ".err"], instruments_to_ignore=["shell.cwl"]) + elif backend == "snakemake": + parser = SnakemakeLogsParser(dirpath, snkmt_db=dirpath / "snkmt.sqlite", rules_to_ignore=["all_wfbench_tasks"]) if parser is not None: sys.stderr.write(f"[{backend}] Parsing the logs...\n") diff --git a/wfcommons/wfbench/translator/snakemake.py b/wfcommons/wfbench/translator/snakemake.py index bc91d494..37280c96 100644 --- a/wfcommons/wfbench/translator/snakemake.py +++ b/wfcommons/wfbench/translator/snakemake.py @@ -66,7 +66,7 @@ def _generate_code(self): :rtype: str """ all_rule = ("# Rule to force all task executions\n" - "rule all_tasks:\n" + "rule all_wfbench_tasks:\n" "\tinput:\n") self._script = "\n# WfBench task rules\n" @@ -123,4 +123,4 @@ def _write_readme_file(self, output_folder: 
pathlib.Path) -> None: with open(readme_file_path, "w") as out: out.write(f"In directory {str(output_folder)}:\n") out.write(f" - The Snakemake file: workflow.smk\n") - out.write(f" - Run the workflow: snakemake -s workflow.smk --cores 1\n") + out.write(f" - Run the workflow: snakemake -s workflow.smk --cores 1 [--logger snkmt --logger-snkmt-db ./snkmt.sqlite]\n") diff --git a/wfcommons/wfinstances/logs/__init__.py b/wfcommons/wfinstances/logs/__init__.py index 3613af2f..b5c3627d 100644 --- a/wfcommons/wfinstances/logs/__init__.py +++ b/wfcommons/wfinstances/logs/__init__.py @@ -14,3 +14,4 @@ from .pegasus import PegasusLogsParser from .pegasusrec import HierarchicalPegasusLogsParser from .ro_crate import ROCrateLogsParser +from .snakemake import SnakemakeLogsParser diff --git a/wfcommons/wfinstances/logs/snakemake.py b/wfcommons/wfinstances/logs/snakemake.py new file mode 100644 index 00000000..284ef760 --- /dev/null +++ b/wfcommons/wfinstances/logs/snakemake.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2021 The WfCommons Team. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +import json +import itertools +import math +import os +import pathlib + +from datetime import datetime, timezone +from logging import Logger +from typing import List, Optional +from datetime import datetime + +from .abstract_logs_parser import LogsParser +from ...common.file import File +from ...common.machine import Machine +from ...common.task import Task, TaskType +from ...common.workflow import Workflow +import sqlite3 + +class SnakemakeLogsParser(LogsParser): + """ + Parse a Snakemake execution directory to generate workflow instance. 
It requires the path of + a sqlite database file generated by the `snkmt` logger plugin when the workflow is executed + as `snakemake --logger snkmt --logger-snkmt-db ./snkmt.sqlite ...` + + :param execution_dir: Snakemake execution dir (all workflow input/output files, and also typically the Snakefile). + :type execution_dir: pathlib.Path + :param snkmt_db: The snkmt sqlite database file + :type snkmt_db: pathlib.Path + :param description: Workflow instance description. + :type description: Optional[str] + :param logger: The logger where to log information/warning or errors (optional). + :type logger: Optional[Logger] + :param rules_to_ignore: Names of Snakemake rules that should be ignored in the translation + :type rules_to_ignore: Optional[list[str]] + """ + + def __init__(self, + execution_dir: pathlib.Path, + snkmt_db: pathlib.Path, + description: Optional[str] = None, + logger: Optional[Logger] = None, + rules_to_ignore: Optional[list[str]] = None, + ) -> None: + """Create an object of the Snakemake parser.""" + + super().__init__('Snakemake', 'https://snakemake.readthedocs.io/', description, logger) + + # Sanity checks + if not execution_dir.is_dir(): + raise OSError(f'The provided path does not exist or is not a folder: {execution_dir}') + if not snkmt_db.is_file(): + raise OSError(f'The provided path does not exist or is not a file: {snkmt_db}') + + self.execution_dir : pathlib.Path = execution_dir + self.snkmt_db: pathlib.Path = snkmt_db + + self.file_map = {} + self.file_objects = {} + self.task_map = {} + self.task_input_files = {} + self.task_output_files = {} + self.file_input_output = {} + + self.rules_to_ignore : list[str] = rules_to_ignore or [] + + + def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow: + """ + Create workflow instance based on the workflow execution logs. + + :param workflow_name: The workflow name. + :type workflow_name: Optional[str] + + :return: A workflow instance object.
+ :rtype: Workflow + """ + self.workflow_name = workflow_name + + # Create base workflow instance object + self.workflow = Workflow(name=self.workflow_name, + description=self.description, + runtime_system_name=self.wms_name, + runtime_system_url=self.wms_url) + + # Parse the sqlite db to identify tasks + self._build_task_map() + + # Parse the sqlite db to identify files + self._build_file_map() + + # Parse the sqlite db to create the tasks + self._create_tasks() + + return self.workflow + + def _build_task_map(self): + conn = sqlite3.connect(self.snkmt_db) + cursor = conn.cursor() + cursor.execute("SELECT * FROM rules") + rows = cursor.fetchall() + for row in rows: + idx = row[0] + task_name = row[1] + if task_name in self.rules_to_ignore: + continue + self.task_map[idx] = task_name + self.task_input_files[idx] = [] + self.task_output_files[idx] = [] + + def _build_file_map(self): + conn = sqlite3.connect(self.snkmt_db) + cursor = conn.cursor() + cursor.execute("SELECT * FROM files") + rows = cursor.fetchall() + for row in rows: + task_idx = row[3] + if task_idx not in self.task_input_files and task_idx not in self.task_output_files: + continue + full_path = row[1] + relative_path = row[1].removeprefix(self.execution_dir.as_posix()) + file_size = os.path.getsize(f"{full_path}") + input_or_output = row[2] + if input_or_output == 'INPUT': + self.task_input_files[task_idx].append(full_path) + if full_path not in self.file_input_output: + self.file_input_output[full_path] = [[],[]] + self.file_input_output[full_path][0].append(task_idx) + elif input_or_output == 'OUTPUT': + self.task_output_files[task_idx].append(full_path) + if full_path not in self.file_input_output: + self.file_input_output[full_path] = [[],[]] + self.file_input_output[full_path][1].append(task_idx) + + if full_path not in self.file_map: + self.file_objects[full_path] = File(file_id=relative_path, size=file_size) + + def _create_tasks(self): + conn = sqlite3.connect(self.snkmt_db) + cursor
= conn.cursor() + cursor.execute("SELECT * FROM jobs") + rows = cursor.fetchall() + for row in rows: + idx = row[0] + if idx not in self.task_map: # One of the to-ignore ones + continue + cmd_line = row[8] + start_date = row[12] + end_date = row[13] + fmt = "%Y-%m-%d %H:%M:%S.%f" # adjust format to match your strings + t1 = datetime.strptime(start_date, fmt) + t2 = datetime.strptime(end_date, fmt) + elapsed = (t2 - t1).total_seconds() + + input_files = [self.file_objects[path] for path in self.task_input_files[idx]] + output_files = [self.file_objects[path] for path in self.task_output_files[idx]] + + + task = Task(name=self.task_map[idx], + task_id=self.task_map[idx], + task_type=TaskType.COMPUTE, + runtime=elapsed, + executed_at=start_date, + input_files=input_files, + output_files=output_files, + logger=self.logger) + self.workflow.add_task(task) + + # File dependencies + for file in self.file_input_output: + input_to = self.file_input_output[file][0] + output_from = self.file_input_output[file][1] + for input_idx in input_to: + for output_idx in output_from: + # print(f"ADDING DEPENDENCY: {self.task_map[output_idx]} --> {self.task_map[input_idx]}") + self.workflow.add_dependency(self.task_map[output_idx], self.task_map[input_idx]) \ No newline at end of file From 5cc9ea6d0c1638be49a28eb6805668e51ff6a9b7 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Mon, 27 Apr 2026 21:20:59 -1000 Subject: [PATCH 4/5] re-enabled all tests --- .../test_translators_loggers.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index b7539d22..cd9b95db 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -284,19 +284,19 @@ class TestTranslators: @pytest.mark.parametrize( "backend", [ - # "swiftt", - # "dask", - # "parsl", - # "nextflow", - # 
"nextflow_subworkflow", - # "airflow", - # "bash", - # "taskvine", - # "makeflow", + "swiftt", + "dask", + "parsl", + "nextflow", + "nextflow_subworkflow", + "airflow", + "bash", + "taskvine", + "makeflow", "snakemake", - # "cwl", - # "streamflow", - # "pegasus", + "cwl", + "streamflow", + "pegasus", ]) @pytest.mark.unit # @pytest.mark.skip(reason="tmp") From da45fd8bce07ea0b43739081725a15effbf8f85a Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Tue, 28 Apr 2026 10:22:53 -1000 Subject: [PATCH 5/5] Bug-- in Snakemake log parser Tested on a non-WfCommons workflow --- wfcommons/wfinstances/logs/snakemake.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/wfcommons/wfinstances/logs/snakemake.py b/wfcommons/wfinstances/logs/snakemake.py index 284ef760..9ac7ce33 100644 --- a/wfcommons/wfinstances/logs/snakemake.py +++ b/wfcommons/wfinstances/logs/snakemake.py @@ -42,6 +42,9 @@ class SnakemakeLogsParser(LogsParser): :type logger: Optional[Logger] :param rules_to_ignore: Names of Snakemake rules that should be ignored in the translation :type rules_to_ignore: Optional[list[str]] + :param path_prefix_rewrite: A tuple that specifies that a file path prefix(for the workflow data files) + should be replaced by another prefix (this is useful when the workflow execution was on a + different machine than the log parsing) """ def __init__(self, @@ -50,6 +53,7 @@ def __init__(self, description: Optional[str] = None, logger: Optional[Logger] = None, rules_to_ignore: Optional[list[str]] = None, + path_prefix_rewrite: Optional[tuple[str, str]] = None ) -> None: """Create an object of the Snakemake parser.""" @@ -72,6 +76,7 @@ def __init__(self, self.file_input_output = {} self.rules_to_ignore : list[str] = rules_to_ignore or [] + self.path_prefix_rewrite : tuple[str, str] = path_prefix_rewrite or None def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow: @@ -127,7 +132,8 @@ def _build_file_map(self): if task_idx not in 
self.task_input_files and task_idx not in self.task_output_files: continue full_path = row[1] - relative_path = row[1].removeprefix(self.execution_dir.as_posix()) + if self.path_prefix_rewrite: + full_path = full_path.replace(self.path_prefix_rewrite[0], self.path_prefix_rewrite[1]) file_size = os.path.getsize(f"{full_path}") input_or_output = row[2] if input_or_output == 'INPUT': @@ -142,7 +148,7 @@ def _build_file_map(self): self.file_input_output[full_path][1].append(task_idx) if full_path not in self.file_map: - self.file_objects[full_path] = File(file_id=relative_path, size=file_size) + self.file_objects[full_path] = File(file_id=full_path, size=file_size) def _create_tasks(self): conn = sqlite3.connect(self.snkmt_db) @@ -164,7 +170,6 @@ def _create_tasks(self): input_files = [self.file_objects[path] for path in self.task_input_files[idx]] output_files = [self.file_objects[path] for path in self.task_output_files[idx]] - task = Task(name=self.task_map[idx], task_id=self.task_map[idx], task_type=TaskType.COMPUTE, @@ -181,5 +186,5 @@ def _create_tasks(self): output_from = self.file_input_output[file][1] for input_idx in input_to: for output_idx in output_from: - # print(f"ADDING DEPENDENCY: {self.task_map[output_idx]} --> {self.task_map[input_idx]}") + # print(f"Adding dependency: {self.task_map[output_idx]} --> {self.task_map[input_idx]}") self.workflow.add_dependency(self.task_map[output_idx], self.task_map[input_idx]) \ No newline at end of file