Skip to content

Commit

Permalink
Evaluate hints and resources at the workflow level, using the correct…
Browse files Browse the repository at this point in the history
… inputs.
  • Loading branch information
kinow committed Aug 21, 2022
1 parent 0e2ced5 commit 710e414
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 0 deletions.
68 changes: 68 additions & 0 deletions cwltool/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
aslist,
)
from .workflow_job import WorkflowJob
from cwl_utils import expression


def default_make_tool(
Expand Down Expand Up @@ -156,6 +157,73 @@ def job(
output_callbacks: Optional[OutputCallbackType],
runtimeContext: RuntimeContext,
) -> JobsGeneratorType:
# TODO: This is not very efficient and could be improved.
#
# See issue #1330. We needed to evaluate the requirements
# at the workflow level, so that it was not re-evaluated at
# each step level (since a command-line tool, for instance,
# won't have the inputs from args, but instead will have the
# empty/default inputs of a workflow step).
#
# The solution below evaluates the requirements and hints for
# the workflow (parent), keeping track of the name of the
# requirements and hints. For each step of the workflow and of
# the embedded tool (command-line or expression tools) it will
# then evaluate the requirements or hints that have the same
# name - even though they may be re-evaluated at the step
# level (e.g. a workflow defines a requirement resource that
# uses inputs.threads_max, and a command-line tool of the same
# workflow also defines a requirement with the same name, but
# using the command-line tool input values).
#
# This prevents re-evaluation at the step level, because the expressions
# have already been replaced by their evaluated values.
def _fix_hints_and_requirements(
    hints_or_requirements: List[CWLObjectType],
    requirements_or_hints_to_evaluate: List[str],
) -> None:
    """Evaluate, in place, the entries that the parent process also declares.

    Every mapping in ``hints_or_requirements`` is scanned; whenever one of
    its keys is listed in ``requirements_or_hints_to_evaluate`` the stored
    value is replaced by the result of ``expression.do_eval`` run against
    the enclosing ``job_order``. Keys that are not listed are left as they
    are. Nothing is returned: the mappings themselves are mutated.
    """
    for entry in hints_or_requirements:
        for name, raw_value in entry.items():
            if name not in requirements_or_hints_to_evaluate:
                # Not declared by the parent process: leave untouched.
                continue
            # Re-assigning an existing key does not resize the mapping, so
            # it is safe to do while iterating over its items.
            entry[name] = expression.do_eval(
                ex=raw_value,
                jobinput=job_order,
                requirements=self.requirements,
                outdir=runtimeContext.outdir,
                tmpdir=runtimeContext.tmpdir,
                resources={},
                context=None,
                timeout=runtimeContext.eval_timeout,
            )

for attr_key in ["hints", "requirements"]:
parent_entries = []
for hint_or_requirement in getattr(self, attr_key):
for key, value in hint_or_requirement.items():
hint_or_requirement[key] = expression.do_eval(
ex=value,
jobinput=job_order,
requirements=self.requirements,
outdir=runtimeContext.outdir,
tmpdir=runtimeContext.tmpdir,
resources={},
context=None,
timeout=runtimeContext.eval_timeout,
)
parent_entries.append(key)

for step in self.steps:
_fix_hints_and_requirements(getattr(step, attr_key), parent_entries)
_fix_hints_and_requirements(
getattr(step.embedded_tool, attr_key), parent_entries
)

builder = self._init_job(job_order, runtimeContext)

if runtimeContext.research_obj is not None:
Expand Down
58 changes: 58 additions & 0 deletions tests/test_reqs_hints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Test for Requirements and Hints in cwltool."""
import json
from io import StringIO

from cwltool.main import main
from .util import get_data


def test_workflow_reqs_are_evaluated_earlier_default_args() -> None:
    """Check that workflow requirements are evaluated before the steps run.

    The workflow is run without a job file, so its default input values
    apply. Steps such as Expression and Command Line Tools can then use
    the resources without re-evaluating the expressions, which matters
    when an expression dynamically decides how many threads/cpus to use.

    Issue: https://github.com/common-workflow-language/cwltool/issues/1330
    """
    captured = StringIO()
    workflow_path = get_data("tests/wf/1330.cwl")

    exit_code = main([workflow_path], stdout=captured)
    assert exit_code == 0

    result = json.loads(captured.getvalue())
    assert result["out"] == "2\n"


def test_workflow_reqs_are_evaluated_earlier_provided_inputs() -> None:
    """Check that workflow requirements are evaluated before the steps run.

    The workflow is run with a job file, so the inputs it provides are the
    ones the requirement expressions see. Steps such as Expression and
    Command Line Tools can then use the resources without re-evaluating
    the expressions, which matters when an expression dynamically decides
    how many threads/cpus to use.

    Issue: https://github.com/common-workflow-language/cwltool/issues/1330
    """
    captured = StringIO()
    workflow_path = get_data("tests/wf/1330.cwl")
    job_path = get_data("tests/wf/1330.json")

    exit_code = main([workflow_path, job_path], stdout=captured)
    assert exit_code == 0

    result = json.loads(captured.getvalue())
    assert result["out"] == "1\n"
39 changes: 39 additions & 0 deletions tests/wf/1330.cwl
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Original file: tests/eager-eval-reqs-hints/wf-reqs.cwl
# From: https://github.com/common-workflow-language/cwl-v1.2/pull/195
#
# Fixture for tests/test_reqs_hints.py (cwltool issue #1330): the step
# echoes runtime.cores, and the tests expect that value to equal the
# workflow input threads_max (2 by default, 1 with tests/wf/1330.json).
cwlVersion: v1.2
class: Workflow

# Workflow-level resource request; coresMax is an expression over the
# workflow's own inputs.
requirements:
ResourceRequirement:
coresMax: $(inputs.threads_max)

inputs:
threads_max:
type: int
# Value used when no job file is supplied.
default: 2

steps:
one:
# No workflow input is wired into this step, so the embedded tool has no
# threads_max input of its own.
in: []
run:
class: CommandLineTool
inputs:
other_input:
type: int
default: 8
baseCommand: echo
# Print the core count reported by the runtime object.
arguments: [ $(runtime.cores) ]
stdout: out.txt
outputs:
out:
type: string
outputBinding:
glob: out.txt
# Return the captured stdout text as the output value.
loadContents: true
outputEval: $(self[0].contents)
out: [out]

outputs:
out:
type: string
outputSource: one/out
3 changes: 3 additions & 0 deletions tests/wf/1330.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"threads_max": 1
}

0 comments on commit 710e414

Please sign in to comment.