diff --git a/tests/translators_loggers/Dockerfile.snakemake b/tests/translators_loggers/Dockerfile.snakemake
new file mode 100644
index 00000000..25f033a6
--- /dev/null
+++ b/tests/translators_loggers/Dockerfile.snakemake
@@ -0,0 +1,61 @@
+# docker build --platform amd64 -t wfcommons-dev-snakemake -f Dockerfile.snakemake .
+# docker run -it --rm -v `pwd`:/home/wfcommons wfcommons-dev-snakemake /bin/bash
+
+FROM amd64/ubuntu:noble
+
+LABEL org.opencontainers.image.authors="henric@hawaii.edu"
+
+# update repositories
+RUN apt-get update
+
+# set timezone
+RUN echo "America/Los_Angeles" > /etc/timezone && export DEBIAN_FRONTEND=noninteractive && apt-get install -y tzdata
+
+# install useful stuff
+RUN apt-get -y install pkg-config
+RUN apt-get -y install git
+RUN apt-get -y install wget
+RUN apt-get -y install curl
+RUN apt-get -y install make
+RUN apt-get -y install cmake
+RUN apt-get -y install cmake-data
+RUN apt-get -y install sudo
+RUN apt-get -y install vim --fix-missing
+RUN apt-get -y install gcc
+RUN apt-get -y install gcc-multilib
+RUN apt-get -y install graphviz libgraphviz-dev
+
+
+# Python stuff
+RUN apt-get -y install python3 python3-pip
+RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1
+RUN python3 -m pip install --break-system-packages pathos pandas filelock
+RUN python3 -m pip install --break-system-packages networkx scipy matplotlib pygraphviz
+RUN python3 -m pip install --break-system-packages pyyaml jsonschema requests
+RUN python3 -m pip install --break-system-packages --upgrade setuptools
+
+# Stress-ng
+RUN apt-get -y install stress-ng
+
+# Add wfcommons user
+RUN useradd -ms /bin/bash wfcommons
+RUN adduser wfcommons sudo
+RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
+ENV PATH="$PATH:/home/wfcommons/.local/bin/"
+
+USER wfcommons
+WORKDIR /home/wfcommons
+# Making this directory world rwx to facilitate testing
+RUN chmod -R 777 /home/wfcommons
+
+
+# Install Pixi
+RUN wget -qO- https://pixi.sh/install.sh | sh
+ENV PATH="$PATH:/home/wfcommons/.pixi/bin"
+
+# Install snakemake
+RUN pixi global install snakemake conda -c conda-forge -c bioconda
+RUN pixi global install snakedeploy -c conda-forge -c bioconda
+RUN ~/.pixi/envs/snakemake/bin/python -m ensurepip && \
+    ~/.pixi/envs/snakemake/bin/python -m pip install snakemake-logger-plugin-snkmt
+
diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py
index 2eccca46..cd9b95db 100644
--- a/tests/translators_loggers/test_translators_loggers.py
+++ b/tests/translators_loggers/test_translators_loggers.py
@@ -34,6 +34,7 @@
 from wfcommons.wfbench import BashTranslator
 from wfcommons.wfbench import TaskVineTranslator
 from wfcommons.wfbench import MakeflowTranslator
+from wfcommons.wfbench import SnakemakeTranslator
 from wfcommons.wfbench import CWLTranslator
 from wfcommons.wfbench import StreamflowTranslator
 from wfcommons.wfbench import PegasusTranslator
@@ -43,6 +44,7 @@
 from wfcommons.wfinstances.logs import TaskVineLogsParser
 from wfcommons.wfinstances.logs import MakeflowLogsParser
 from wfcommons.wfinstances.logs import ROCrateLogsParser
+from wfcommons.wfinstances.logs import SnakemakeLogsParser
 
 
 def _create_workflow_benchmark() -> (WorkflowBenchmark, int):
@@ -116,6 +118,7 @@ def _additional_setup_swiftt(container):
     "bash": noop,
     "taskvine": _additional_setup_taskvine,
     "makeflow": noop,
+    "snakemake": noop,
     "cwl": noop,
     "streamflow": noop,
     "pegasus": _additional_setup_pegasus,
@@ -182,6 +185,15 @@ def run_workflow_makeflow(container, num_tasks, str_dirpath):
     num_completed_jobs = len(re.findall(r'job \d+ completed', output.decode()))
     assert (num_completed_jobs == num_tasks)
 
+def run_workflow_snakemake(container, num_tasks, str_dirpath):
+    # Run the workflow (with full logging)
+    exit_code, output = container.exec_run(cmd=["bash", "-c", "snakemake -s ./workflow.smk --cores 1 --logger snkmt --logger-snkmt-db ./snkmt.sqlite"],
+                                           user="wfcommons", stdout=True, stderr=True)
+    # Check sanity
+    assert (exit_code == 0)
+    num_completed_jobs = len(re.findall(r'Finished jobid: \d+', output.decode()))
+    assert (num_completed_jobs - 1 == num_tasks) # Discounting the "all_tasks" rule
+
 def run_workflow_cwl(container, num_tasks, str_dirpath):
     # Run the workflow!
     # Note that the input file is hardcoded and Blast-specific
@@ -243,6 +255,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
     "bash": run_workflow_bash,
     "taskvine": run_workflow_taskvine,
     "makeflow": run_workflow_makeflow,
+    "snakemake": run_workflow_snakemake,
     "cwl": run_workflow_cwl,
     "streamflow": run_workflow_streamflow,
     "pegasus": run_workflow_pegasus,
@@ -258,6 +271,7 @@
     "bash": BashTranslator,
     "taskvine": TaskVineTranslator,
     "makeflow": MakeflowTranslator,
+    "snakemake": SnakemakeTranslator,
     "cwl": CWLTranslator,
     "streamflow": StreamflowTranslator,
     "pegasus": PegasusTranslator,
@@ -279,6 +293,7 @@ class TestTranslators:
         "bash",
         "taskvine",
         "makeflow",
+        "snakemake",
         "cwl",
         "streamflow",
         "pegasus",
@@ -338,6 +353,8 @@ def test_translator(self, backend) -> None:
                                        steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"],
                                        file_extensions_to_ignore=[".out", ".err"],
                                        instruments_to_ignore=["shell.cwl"])
+        elif backend == "snakemake":
+            parser = SnakemakeLogsParser(dirpath, snkmt_db=dirpath / "snkmt.sqlite", rules_to_ignore=["all_wfbench_tasks"])
 
         if parser is not None:
             sys.stderr.write(f"[{backend}] Parsing the logs...\n")
diff --git a/wfcommons/wfbench/__init__.py b/wfcommons/wfbench/__init__.py
index 42d62c75..1c546c45 100644
--- a/wfcommons/wfbench/__init__.py
+++ b/wfcommons/wfbench/__init__.py
@@ -18,6 +18,7 @@
                          SwiftTTranslator,
                          TaskVineTranslator,
                          MakeflowTranslator,
+                         SnakemakeTranslator,
                          CWLTranslator,
                          StreamflowTranslator,
                          PyCompssTranslator)
diff --git a/wfcommons/wfbench/translator/__init__.py b/wfcommons/wfbench/translator/__init__.py
index 1e3b2a15..07f4e0bd 100644
--- a/wfcommons/wfbench/translator/__init__.py
+++ b/wfcommons/wfbench/translator/__init__.py
@@ -20,3 +20,4 @@
 from .swift_t import SwiftTTranslator
 from .taskvine import TaskVineTranslator
 from .makeflow import MakeflowTranslator
+from .snakemake import SnakemakeTranslator
diff --git a/wfcommons/wfbench/translator/snakemake.py b/wfcommons/wfbench/translator/snakemake.py
new file mode 100644
index 00000000..37280c96
--- /dev/null
+++ b/wfcommons/wfbench/translator/snakemake.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2024-2025 The WfCommons Team.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+import pathlib
+import shutil
+
+from logging import Logger
+from typing import Optional, Union
+
+from .abstract_translator import Translator
+from ...common import Workflow
+
+this_dir = pathlib.Path(__file__).resolve().parent
+
+class SnakemakeTranslator(Translator):
+    """
+    A WfFormat parser for creating Snakemake workflow applications.
+
+    :param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
+    :type workflow: Union[Workflow, pathlib.Path]
+    :param logger: The logger where to log information/warning or errors (optional).
+    :type logger: Logger
+    """
+    def __init__(self,
+                 workflow: Union[Workflow, pathlib.Path],
+                 logger: Optional[Logger] = None) -> None:
+        """Create an object of the translator."""
+        super().__init__(workflow, logger)
+        self._script = ""
+
+    def translate(self, output_folder: pathlib.Path) -> None:
+        """
+        Translate a workflow benchmark description (WfFormat) into an actual workflow application.
+
+        :param output_folder: The path to the folder in which the workflow benchmark will be generated.
+        :type output_folder: pathlib.Path
+        """
+
+        # Generate code
+        self._generate_code()
+
+        # write benchmark files
+        output_folder.mkdir(parents=True)
+        with open(output_folder.joinpath("workflow.smk"), "w") as fp:
+            fp.write(self._script)
+
+        # additional files
+        self._copy_binary_files(output_folder)
+        self._generate_input_files(output_folder)
+
+        # README file
+        self._write_readme_file(output_folder)
+
+    def _generate_code(self):
+        """
+        Generate the Snakemake code and store it in self._script.
+
+        :return: None (the generated code is stored in self._script)
+        :rtype: None
+        """
+        all_rule = ("# Rule to force all task executions\n"
+                    "rule all_wfbench_tasks:\n"
+                    "\tinput:\n")
+
+        self._script = "\n# WfBench task rules\n"
+        for task_name, task in self.workflow.tasks.items():
+            rule = f"rule {task_name}:\n"
+            # input files
+            rule += "\tinput:\n"
+            for input_file in task.input_files:
+                rule += f"\t\t\"data/{input_file.file_id}\",\n"
+            # output files
+            rule += "\toutput:\n"
+            for output_file in task.output_files:
+                all_rule += f"\t\t\"data/{output_file.file_id}\",\n"
+                rule += f"\t\t\"data/{output_file.file_id}\",\n"
+            # shell
+            rule += "\tshell:\n"
+            rule += "\t\t'" + task.program + " '\n"
+
+            input_spec = "\\'["
+            for file in task.input_files:
+                input_spec += f"\"data/{file.file_id}\","
+            input_spec = input_spec.rstrip(",") + "]\\'"
+
+            output_spec = "\\'{{"
+            for file in task.output_files:
+                output_spec += f"\"data/{file.file_id}\":{str(file.size)},"
+            output_spec = output_spec.rstrip(",") + "}}\\'"
+
+            args = []
+            for a in task.args:
+                if "--output-files" in a:
+                    args.append(f"--output-files {output_spec}")
+                elif "--input-files" in a:
+                    args.append(f"--input-files {input_spec}")
+                else:
+                    args.append(a)
+
+            for a in args:
+                rule += "\t\t'" + a + " '\n"
+
+            self._script += rule + "\n\n"
+
+        self._script = all_rule + self._script
+        return
+
+    def _write_readme_file(self, output_folder: pathlib.Path) -> None:
+        """
+        Write the README file.
+
+        :param output_folder: The path of the output folder.
+        :type output_folder: pathlib.Path
+        """
+        readme_file_path = output_folder.joinpath("README")
+        with open(readme_file_path, "w") as out:
+            out.write(f"In directory {str(output_folder)}:\n")
+            out.write("  - The Snakemake file: workflow.smk\n")
+            out.write("  - Run the workflow: snakemake -s workflow.smk --cores 1 [--logger snkmt --logger-snkmt-db ./snkmt.sqlite]\n")
diff --git a/wfcommons/wfinstances/logs/__init__.py b/wfcommons/wfinstances/logs/__init__.py
index 3613af2f..b5c3627d 100644
--- a/wfcommons/wfinstances/logs/__init__.py
+++ b/wfcommons/wfinstances/logs/__init__.py
@@ -14,3 +14,4 @@
 from .pegasus import PegasusLogsParser
 from .pegasusrec import HierarchicalPegasusLogsParser
 from .ro_crate import ROCrateLogsParser
+from .snakemake import SnakemakeLogsParser
diff --git a/wfcommons/wfinstances/logs/snakemake.py b/wfcommons/wfinstances/logs/snakemake.py
new file mode 100644
index 00000000..9ac7ce33
--- /dev/null
+++ b/wfcommons/wfinstances/logs/snakemake.py
@@ -0,0 +1,190 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2024-2025 The WfCommons Team.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+import json
+import itertools
+import math
+import os
+import pathlib
+
+from datetime import datetime, timezone
+from logging import Logger
+from typing import List, Optional
+
+
+from .abstract_logs_parser import LogsParser
+from ...common.file import File
+from ...common.machine import Machine
+from ...common.task import Task, TaskType
+from ...common.workflow import Workflow
+import sqlite3
+
+class SnakemakeLogsParser(LogsParser):
+    """
+    Parse a Snakemake execution directory to generate workflow instance. It requires the path of
+    a sqlite database file generated by the `snkmt` logger plugin when the workflow is executed
+    as `snakemake --logger snkmt --logger-snkmt-db ./snkmt.sqlite ...`
+
+    :param execution_dir: Snakemake execution dir (all workflow input/output file, and also typically the Snakefile).
+    :type execution_dir: pathlib.Path
+    :param snkmt_db: The snkmt sqlite database file
+    :type snkmt_db: pathlib.Path
+    :param description: Workflow instance description.
+    :type description: Optional[str]
+    :param logger: The logger where to log information/warning or errors (optional).
+    :type logger: Optional[Logger]
+    :param rules_to_ignore: Names of Snakemake rules that should be ignored in the translation
+    :type rules_to_ignore: Optional[list[str]]
+    :param path_prefix_rewrite: A tuple that specifies that a file path prefix (for the workflow data files)
+        should be replaced by another prefix (this is useful when the workflow execution was on a
+        different machine than the log parsing)
+    """
+
+    def __init__(self,
+                 execution_dir: pathlib.Path,
+                 snkmt_db: pathlib.Path,
+                 description: Optional[str] = None,
+                 logger: Optional[Logger] = None,
+                 rules_to_ignore: Optional[list[str]] = None,
+                 path_prefix_rewrite: Optional[tuple[str, str]] = None
+                 ) -> None:
+        """Create an object of the Snakemake parser."""
+
+        super().__init__('Snakemake', 'https://snakemake.readthedocs.io/', description, logger)
+
+        # Sanity checks
+        if not execution_dir.is_dir():
+            raise OSError(f'The provided path does not exist or is not a folder: {execution_dir}')
+        if not snkmt_db.is_file():
+            raise OSError(f'The provided path does not exist or is not a file: {snkmt_db}')
+
+        self.execution_dir : pathlib.Path = execution_dir
+        self.snkmt_db: pathlib.Path = snkmt_db
+
+        self.file_map = {}
+        self.file_objects = {}
+        self.task_map = {}
+        self.task_input_files = {}
+        self.task_output_files = {}
+        self.file_input_output = {}
+
+        self.rules_to_ignore : list[str] = rules_to_ignore or []
+        self.path_prefix_rewrite: Optional[tuple[str, str]] = path_prefix_rewrite
+
+
+    def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
+        """
+        Create workflow instance based on the workflow execution logs.
+
+        :param workflow_name: The workflow name.
+        :type workflow_name: Optional[str]
+
+        :return: A workflow instance object.
+        :rtype: Workflow
+        """
+        self.workflow_name = workflow_name
+
+        # Create base workflow instance object
+        self.workflow = Workflow(name=self.workflow_name,
+                                 description=self.description,
+                                 runtime_system_name=self.wms_name,
+                                 runtime_system_url=self.wms_url)
+
+        # Parse the sqlite db to identify the workflow's rules (tasks)
+        self._build_task_map()
+
+        # Parse the sqlite db to identify the workflow's data files
+        self._build_file_map()
+
+        # Parse the sqlite db to create the tasks (executed jobs)
+        self._create_tasks()
+
+        return self.workflow
+
+    def _build_task_map(self):
+        conn = sqlite3.connect(self.snkmt_db)
+        cursor = conn.cursor()
+        cursor.execute("SELECT * FROM rules")
+        rows = cursor.fetchall()
+        for row in rows:
+            idx = row[0]
+            task_name = row[1]
+            if task_name in self.rules_to_ignore:
+                continue
+            self.task_map[idx] = task_name
+            self.task_input_files[idx] = []
+            self.task_output_files[idx] = []
+
+    def _build_file_map(self):
+        conn = sqlite3.connect(self.snkmt_db)
+        cursor = conn.cursor()
+        cursor.execute("SELECT * FROM files")
+        rows = cursor.fetchall()
+        for row in rows:
+            task_idx = row[3]
+            if task_idx not in self.task_input_files and task_idx not in self.task_output_files:
+                continue
+            full_path = row[1]
+            if self.path_prefix_rewrite:
+                full_path = full_path.replace(self.path_prefix_rewrite[0], self.path_prefix_rewrite[1])
+            file_size = os.path.getsize(full_path)
+            input_or_output = row[2]
+            if input_or_output == 'INPUT':
+                self.task_input_files[task_idx].append(full_path)
+                if full_path not in self.file_input_output:
+                    self.file_input_output[full_path] = [[],[]]
+                self.file_input_output[full_path][0].append(task_idx)
+            elif input_or_output == 'OUTPUT':
+                self.task_output_files[task_idx].append(full_path)
+                if full_path not in self.file_input_output:
+                    self.file_input_output[full_path] = [[],[]]
+                self.file_input_output[full_path][1].append(task_idx)
+
+            if full_path not in self.file_objects:
+                self.file_objects[full_path] = File(file_id=full_path, size=file_size)
+
+    def _create_tasks(self):
+        conn = sqlite3.connect(self.snkmt_db)
+        cursor = conn.cursor()
+        cursor.execute("SELECT * FROM jobs")
+        rows = cursor.fetchall()
+        for row in rows:
+            idx = row[0]
+            if idx not in self.task_map: # Job belongs to one of the to-ignore rules
+                continue
+            # row[8] is the job's command line (currently unused)
+            start_date = row[12]
+            end_date = row[13]
+            fmt = "%Y-%m-%d %H:%M:%S.%f" # snkmt timestamp format
+            t1 = datetime.strptime(start_date, fmt)
+            t2 = datetime.strptime(end_date, fmt)
+            elapsed = (t2 - t1).total_seconds()
+
+            input_files = [self.file_objects[path] for path in self.task_input_files[idx]]
+            output_files = [self.file_objects[path] for path in self.task_output_files[idx]]
+
+            task = Task(name=self.task_map[idx],
+                        task_id=self.task_map[idx],
+                        task_type=TaskType.COMPUTE,
+                        runtime=elapsed,
+                        executed_at=start_date,
+                        input_files=input_files,
+                        output_files=output_files,
+                        logger=self.logger)
+            self.workflow.add_task(task)
+
+        # File dependencies
+        for file in self.file_input_output:
+            input_to = self.file_input_output[file][0]
+            output_from = self.file_input_output[file][1]
+            for input_idx in input_to:
+                for output_idx in output_from:
+                    # print(f"Adding dependency: {self.task_map[output_idx]} --> {self.task_map[input_idx]}")
+                    self.workflow.add_dependency(self.task_map[output_idx], self.task_map[input_idx])