Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions tests/translators_loggers/Dockerfile.snakemake
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Development/testing image for the WfCommons Snakemake translator and logs parser.
#
# Build and run (from the directory containing this file):
# docker build --platform amd64 -t wfcommons-dev-snakemake -f Dockerfile.snakemake .
# docker run -it --rm -v `pwd`:/home/wfcommons wfcommons-dev-snakemake /bin/bash

FROM amd64/ubuntu:noble

# OCI-standard annotation key for the image maintainer contact
LABEL org.opencontainers.image.authors="henric@hawaii.edu"

# update repositories
RUN apt-get update

# set timezone (DEBIAN_FRONTEND=noninteractive keeps tzdata from prompting during the build)
RUN echo "America/Los_Angeles" > /etc/timezone && export DEBIAN_FRONTEND=noninteractive && apt-get install -y tzdata

# install useful stuff
RUN apt-get -y install pkg-config
RUN apt-get -y install git
RUN apt-get -y install wget
RUN apt-get -y install curl
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install cmake-data
RUN apt-get -y install sudo
RUN apt-get -y install vim --fix-missing
RUN apt-get -y install gcc
RUN apt-get -y install gcc-multilib
# graphviz headers are needed to build pygraphviz below
RUN apt-get -y install graphviz libgraphviz-dev


# Python stuff
RUN apt-get -y install python3 python3-pip
# make "python" resolve to python3
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1
# --break-system-packages: install into the distribution-managed Python on purpose,
# since this throwaway image does not use a virtual environment
RUN python3 -m pip install --break-system-packages pathos pandas filelock
RUN python3 -m pip install --break-system-packages networkx scipy matplotlib pygraphviz
RUN python3 -m pip install --break-system-packages pyyaml jsonschema requests
RUN python3 -m pip install --break-system-packages --upgrade setuptools

# Stress-ng
RUN apt-get -y install stress-ng

# Add wfcommons user (member of sudo, with password-less sudo)
RUN useradd -ms /bin/bash wfcommons
RUN adduser wfcommons sudo
RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
ENV PATH="$PATH:/home/wfcommons/.local/bin/"

USER wfcommons
WORKDIR /home/wfcommons
# Making this directory world rwx to facilitate testing
RUN chmod -R 777 /home/wfcommons


# Install Pixi (installed for the wfcommons user, under ~/.pixi)
RUN wget -qO- https://pixi.sh/install.sh | sh
ENV PATH="$PATH:/home/wfcommons/.pixi/bin"

# Install snakemake
RUN pixi global install snakemake conda -c conda-forge -c bioconda
RUN pixi global install snakedeploy -c conda-forge -c bioconda
# Install the snkmt logger plugin into the same environment as snakemake so that
# "snakemake --logger snkmt" can find it
RUN ~/.pixi/envs/snakemake/bin/python -m ensurepip && \
    ~/.pixi/envs/snakemake/bin/python -m pip install snakemake-logger-plugin-snkmt

17 changes: 17 additions & 0 deletions tests/translators_loggers/test_translators_loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from wfcommons.wfbench import BashTranslator
from wfcommons.wfbench import TaskVineTranslator
from wfcommons.wfbench import MakeflowTranslator
from wfcommons.wfbench import SnakemakeTranslator
from wfcommons.wfbench import CWLTranslator
from wfcommons.wfbench import StreamflowTranslator
from wfcommons.wfbench import PegasusTranslator
Expand All @@ -43,6 +44,7 @@
from wfcommons.wfinstances.logs import TaskVineLogsParser
from wfcommons.wfinstances.logs import MakeflowLogsParser
from wfcommons.wfinstances.logs import ROCrateLogsParser
from wfcommons.wfinstances.logs import SnakemakeLogsParser


def _create_workflow_benchmark() -> (WorkflowBenchmark, int):
Expand Down Expand Up @@ -116,6 +118,7 @@ def _additional_setup_swiftt(container):
"bash": noop,
"taskvine": _additional_setup_taskvine,
"makeflow": noop,
"snakemake": noop,
"cwl": noop,
"streamflow": noop,
"pegasus": _additional_setup_pegasus,
Expand Down Expand Up @@ -182,6 +185,15 @@ def run_workflow_makeflow(container, num_tasks, str_dirpath):
num_completed_jobs = len(re.findall(r'job \d+ completed', output.decode()))
assert (num_completed_jobs == num_tasks)

def run_workflow_snakemake(container, num_tasks, str_dirpath):
    """
    Run the generated Snakemake workflow inside the container and sanity-check it.

    :param container: Object exposing ``exec_run(cmd=..., user=..., stdout=..., stderr=...)``
                      that returns an ``(exit_code, output_bytes)`` pair.
    :param num_tasks: Expected number of workflow tasks.
    :param str_dirpath: Unused by this backend; kept so all runners share one signature.
    """
    # Execute the workflow with the snkmt logger enabled (full logging)
    snakemake_command = "snakemake -s ./workflow.smk --cores 1 --logger snkmt --logger-snkmt-db ./snkmt.sqlite"
    exit_code, output = container.exec_run(cmd=["bash", "-c", snakemake_command],
                                           user="wfcommons", stdout=True, stderr=True)

    # Sanity check: the run must have succeeded
    assert exit_code == 0

    # Sanity check: one finished job is the aggregating "all" rule rather than
    # a workflow task, so it is discounted from the count
    finished_jobs = re.findall(r'Finished jobid: \d+', output.decode())
    assert len(finished_jobs) - 1 == num_tasks

def run_workflow_cwl(container, num_tasks, str_dirpath):
# Run the workflow!
# Note that the input file is hardcoded and Blast-specific
Expand Down Expand Up @@ -243,6 +255,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
"bash": run_workflow_bash,
"taskvine": run_workflow_taskvine,
"makeflow": run_workflow_makeflow,
"snakemake": run_workflow_snakemake,
"cwl": run_workflow_cwl,
"streamflow": run_workflow_streamflow,
"pegasus": run_workflow_pegasus,
Expand All @@ -258,6 +271,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
"bash": BashTranslator,
"taskvine": TaskVineTranslator,
"makeflow": MakeflowTranslator,
"snakemake": SnakemakeTranslator,
"cwl": CWLTranslator,
"streamflow": StreamflowTranslator,
"pegasus": PegasusTranslator,
Expand All @@ -279,6 +293,7 @@ class TestTranslators:
"bash",
"taskvine",
"makeflow",
"snakemake",
"cwl",
"streamflow",
"pegasus",
Expand Down Expand Up @@ -338,6 +353,8 @@ def test_translator(self, backend) -> None:
steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"],
file_extensions_to_ignore=[".out", ".err"],
instruments_to_ignore=["shell.cwl"])
elif backend == "snakemake":
parser = SnakemakeLogsParser(dirpath, snkmt_db=dirpath / "snkmt.sqlite", rules_to_ignore=["all_wfbench_tasks"])

if parser is not None:
sys.stderr.write(f"[{backend}] Parsing the logs...\n")
Expand Down
1 change: 1 addition & 0 deletions wfcommons/wfbench/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
SwiftTTranslator,
TaskVineTranslator,
MakeflowTranslator,
SnakemakeTranslator,
CWLTranslator,
StreamflowTranslator,
PyCompssTranslator)
1 change: 1 addition & 0 deletions wfcommons/wfbench/translator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
from .swift_t import SwiftTTranslator
from .taskvine import TaskVineTranslator
from .makeflow import MakeflowTranslator
from .snakemake import SnakemakeTranslator
126 changes: 126 additions & 0 deletions wfcommons/wfbench/translator/snakemake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024-2025 The WfCommons Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

import pathlib
import shutil

from logging import Logger
from typing import Optional, Union

from .abstract_translator import Translator
from ...common import Workflow

this_dir = pathlib.Path(__file__).resolve().parent

class SnakemakeTranslator(Translator):
    """
    A WfFormat parser for creating Snakemake workflow applications.

    :param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
    :type workflow: Union[Workflow, pathlib.Path],
    :param logger: The logger where to log information/warning or errors (optional).
    :type logger: Logger
    """
    def __init__(self,
                 workflow: Union[Workflow, pathlib.Path],
                 logger: Optional[Logger] = None) -> None:
        """Create an object of the translator."""
        super().__init__(workflow, logger)
        # Text of the generated Snakefile; filled in by _generate_code()
        self._script = ""

    def translate(self, output_folder: pathlib.Path) -> None:
        """
        Translate a workflow benchmark description (WfFormat) into an actual workflow application.

        :param output_folder: The path to the folder in which the workflow benchmark will be generated.
        :type output_folder: pathlib.Path
        """

        # Generate code
        self._generate_code()

        # write benchmark files
        # NOTE: mkdir() is called without exist_ok, so an already existing
        # output folder raises FileExistsError.
        output_folder.mkdir(parents=True)
        with open(output_folder.joinpath("workflow.smk"), "w") as fp:
            fp.write(self._script)

        # additional files (helpers not defined in this class; presumably
        # provided by the Translator base class)
        self._copy_binary_files(output_folder)
        self._generate_input_files(output_folder)

        # README file
        self._write_readme_file(output_folder)

    @staticmethod
    def _snakemake_input_spec(files) -> str:
        """
        Build the quoted JSON-like list passed to ``--input-files``.

        :param files: Iterable of file objects exposing a ``file_id`` attribute.
        :return: e.g. ``\\'["data/a","data/b"]\\'``; an empty iterable gives ``\\'[]\\'``.
        :rtype: str
        """
        # Joining (rather than appending a comma and slicing off the last
        # character) keeps the opening bracket intact when there are no files.
        entries = ",".join(f'"data/{file.file_id}"' for file in files)
        return "\\'[" + entries + "]\\'"

    @staticmethod
    def _snakemake_output_spec(files) -> str:
        """
        Build the quoted JSON-like mapping passed to ``--output-files``.

        :param files: Iterable of file objects exposing ``file_id`` and ``size`` attributes.
        :return: e.g. ``\\'{{"data/a":10}}\\'``; an empty iterable gives ``\\'{{}}\\'``.
        :rtype: str
        """
        # The doubled braces are emitted literally: Snakemake's shell strings
        # use {} for formatting, so literal braces have to be written as {{ }}.
        entries = ",".join(f'"data/{file.file_id}":{file.size!s}' for file in files)
        return "\\'{{" + entries + "}}\\'"

    def _generate_code(self) -> None:
        """
        Generate the Snakemake code.

        The result is stored in ``self._script``: first an ``all_wfbench_tasks``
        rule whose inputs are the output files of every task (forcing all tasks
        to execute), then one rule per workflow task.

        :return: None (the code is stored in ``self._script``)
        """
        all_rule_parts = ["# Rule to force all task executions\n"
                          "rule all_wfbench_tasks:\n"
                          "\tinput:\n"]
        task_rules = []

        for task_name, task in self.workflow.tasks.items():
            lines = [f"rule {task_name}:\n"]

            # input files
            lines.append("\tinput:\n")
            lines.extend(f'\t\t"data/{input_file.file_id}",\n' for input_file in task.input_files)

            # output files (also listed as inputs of the "all" rule)
            output_entries = [f'\t\t"data/{output_file.file_id}",\n' for output_file in task.output_files]
            lines.append("\toutput:\n")
            lines.extend(output_entries)
            all_rule_parts.extend(output_entries)

            # shell: the command is emitted as adjacent single-quoted string
            # literals, one per line, each ending with a separating space
            lines.append("\tshell:\n")
            lines.append("\t\t'" + task.program + " '\n")

            input_spec = self._snakemake_input_spec(task.input_files)
            output_spec = self._snakemake_output_spec(task.output_files)

            for arg in task.args:
                # The file-list arguments are rewritten so that they point to
                # the data/ directory; all other arguments pass through as is
                if "--output-files" in arg:
                    arg = f"--output-files {output_spec}"
                elif "--input-files" in arg:
                    arg = f"--input-files {input_spec}"
                lines.append("\t\t'" + arg + " '\n")

            task_rules.append("".join(lines) + "\n\n")

        self._script = "".join(all_rule_parts) + "\n# WfBench task rules\n" + "".join(task_rules)

    def _write_readme_file(self, output_folder: pathlib.Path) -> None:
        """
        Write the README file.

        :param output_folder: The path of the output folder.
        :type output_folder: pathlib.Path
        """
        readme_file_path = output_folder.joinpath("README")
        with open(readme_file_path, "w") as out:
            out.write(f"In directory {str(output_folder)}:\n")
            out.write("  - The Snakemake file: workflow.smk\n")
            out.write("  - Run the workflow: snakemake -s workflow.smk --cores 1 [--logger snkmt --logger-snkmt-db ./snkmt.sqlite]\n")
1 change: 1 addition & 0 deletions wfcommons/wfinstances/logs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
from .pegasus import PegasusLogsParser
from .pegasusrec import HierarchicalPegasusLogsParser
from .ro_crate import ROCrateLogsParser
from .snakemake import SnakemakeLogsParser
Loading
Loading