Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a slurm workflow manager #789

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions etc/ramble/defaults/base_workflow_manager_repos.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# -------------------------------------------------------------------------
# This is the default ramble repository configuration. It includes the
# builtin ramble base workflow manager repository.
#
# Users can override these settings by editing the following files.
#
# Per-ramble-instance settings (overrides defaults):
# $RAMBLE_ROOT/etc/ramble/base_workflow_manager_repos.yaml
#
# Per-user settings (overrides default and site settings):
# ~/.ramble/base_workflow_manager_repos.yaml
# -------------------------------------------------------------------------
base_workflow_manager_repos:
- $ramble/var/ramble/repos/builtin
14 changes: 14 additions & 0 deletions etc/ramble/defaults/workflow_manager_repos.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# -------------------------------------------------------------------------
# This is the default ramble repository configuration. It includes the
# builtin ramble application repository.
#
# Users can override these settings by editing the following files.
#
# Per-ramble-instance settings (overrides defaults):
# $RAMBLE_ROOT/etc/ramble/repos.yaml
#
# Per-user settings (overrides default and site settings):
# ~/.ramble/repos.yaml
# -------------------------------------------------------------------------
workflow_manager_repos:
- $ramble/var/ramble/repos/builtin
86 changes: 81 additions & 5 deletions lib/ramble/ramble/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,14 @@
from enum import Enum

experiment_status = Enum(
"experiment_status", ["UNKNOWN", "SETUP", "RUNNING", "COMPLETE", "SUCCESS", "FAILED"]
"experiment_status",
["UNKNOWN", "SETUP", "RUNNING", "COMPLETE", "SUCCESS", "FAILED", "CANCELLED"],
)

_NULL_CONTEXT = "null"

_DEFAULT_CONTENT_PERM = stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH


def _get_context_display_name(context):
return (
Expand Down Expand Up @@ -179,6 +182,8 @@ def __init__(self, file_path):
self.license_path = ""
self.license_file = ""

self.workflow_manager = None

ramble.util.directives.define_directive_methods(self)

def experiment_lock(self):
Expand Down Expand Up @@ -234,7 +239,10 @@ def set_variants(self, variants):
experiment.
"""
self.variants = variants.copy()
self._set_package_manager()
self._set_workflow_manager()

def _set_package_manager(self):
if namespace.package_manager in self.variants:
pkgman_name = self.expander.expand_var(
self.variants[namespace.package_manager], typed=True
Expand Down Expand Up @@ -275,6 +283,24 @@ def set_variants(self, variants):
}
)

def _set_workflow_manager(self):
if namespace.workflow_manager in self.variants:
workflow_name = self.expander.expand_var(
self.variants[namespace.workflow_manager], typed=True
)

if workflow_name is not None:
try:
wfman_type = ramble.repository.ObjectTypes.workflow_managers
self.workflow_manager = ramble.repository.get(workflow_name, wfman_type).copy()
self.workflow_manager.set_application(self)
except ramble.repository.UnknownObjectError:
logger.die(
f"{wfman_type} is not a valid workflow manager. "
"Valid workflow managers can be listed via:\n"
"\tramble list --type workflow_managers"
)

def build_phase_order(self):
if self._pipeline_graphs is not None:
return
Expand Down Expand Up @@ -477,6 +503,10 @@ def build_used_variables(self, workspace):
for var in self.package_manager.package_manager_variables.values():
self.variables[var.name] = var.default

if self.workflow_manager is not None:
for var in self.workflow_manager.wm_vars.values():
self.variables[var.name] = var.default

##########################################
# Expand used variables to track all usage
##########################################
Expand Down Expand Up @@ -959,6 +989,9 @@ def _set_default_experiment_variables(self):
for mod_inst in self._modifier_instances:
var_sets.append(mod_inst.mode_variables())

if self.workflow_manager is not None:
var_sets.append(self.workflow_manager.wm_vars)

for var_set in var_sets:
for var, val in var_set.items():
if var not in self.variables.keys():
Expand Down Expand Up @@ -1145,6 +1178,14 @@ def _derive_variables_for_template_path(self, workspace):
)
self.variables[template_name] = expand_path

def _derive_variables_for_workflow_manager(self):
if self.workflow_manager is None:
return
# Define variables from `render_content` directives
for template_name in self.workflow_manager.render_configs.keys():
path = os.path.join(self.expander.expand_var("{experiment_run_dir}"), template_name)
self.variables[template_name] = path

def _validate_experiment(self):
"""Perform validation of an experiment before performing actions with it

Expand Down Expand Up @@ -1173,6 +1214,7 @@ def add_expand_vars(self, workspace):
self._set_input_path()

self._derive_variables_for_template_path(workspace)
self._derive_variables_for_workflow_manager()
self._vars_are_expanded = True

def _inputs_and_fetchers(self, workload=None):
Expand Down Expand Up @@ -1376,7 +1418,31 @@ def _make_experiments(self, workspace, app_inst=None):
f.write(
self.expander.expand_var(template_conf["contents"], extra_vars=exec_vars)
)
os.chmod(expand_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH)
os.chmod(expand_path, _DEFAULT_CONTENT_PERM)

wm = self.workflow_manager
if wm is not None:
for name, config in wm.render_configs.items():
path = os.path.join(experiment_run_dir, name)
logger.msg(
f"Writing content {name} defined by workflow manager "
f"{wm.name} to {path}"
)
if config.get("content_func") is not None:
if config.get("content_tpl") is not None:
logger.warn(
f"`render_content` directive {name} defined "
f"by {wm.name} has both `content_func` and "
"`content_tpl` specified. The latter is ignored"
)
func = getattr(wm, config.get("content_func"))
content = func()
else:
content = config["content_tpl"]
with open(path, "w+") as f:
f.write(self.expander.expand_var(content, extra_vars=exec_vars))
f.write("\n")
os.chmod(path, config.get("content_perm", _DEFAULT_CONTENT_PERM))

experiment_script = workspace.experiments_script
experiment_script.write(self.expander.expand_var("{batch_submit}\n"))
Expand Down Expand Up @@ -1729,10 +1795,20 @@ def format_context(context_match, context_format):
success = True
success = success and criteria_list.passed()

if success:
self.set_status(status=experiment_status.SUCCESS)
if self.workflow_manager is not None:
wm_status = self.workflow_manager.get_status(workspace)
if wm_status == experiment_status.COMPLETE:
if success:
self.set_status(status=experiment_status.SUCCESS)
else:
self.set_status(status=experiment_status.FAILED)
else:
self.set_status(status=wm_status)
else:
self.set_status(status=experiment_status.FAILED)
if success:
self.set_status(status=experiment_status.SUCCESS)
else:
self.set_status(status=experiment_status.FAILED)

self._init_result()

Expand Down
3 changes: 3 additions & 0 deletions lib/ramble/ramble/cmd/common/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
"package_manager_requirements": None,
# Package manager specific:
"package_manager_variables": None,
# Workflow manager specific:
"workflow_manager_variables": "wm_vars",
"render_configs": None,
}


Expand Down
7 changes: 7 additions & 0 deletions lib/ramble/ramble/cmd/style.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ def is_object(f):
"F403": [r"^from ramble.pkgmankit import \*$"],
**common_object_exemptions,
},
# exemptions applied only to workflow_manager.py files.
r"workflow_manager.py$": {
# Allow 'from ramble.modkit import *' in workflow_managers,
# but no other wildcards
"F403": [r"^from ramble.wmkit import \*$"],
**common_object_exemptions,
},
# exemptions applied to all files.
r".py$": {
"E501": [
Expand Down
5 changes: 5 additions & 0 deletions lib/ramble/ramble/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import sys
from contextlib import contextmanager

import ramble.schema.workflow_manager_repos
import ruamel.yaml as yaml
from ruamel.yaml.error import MarkedYAMLError

Expand Down Expand Up @@ -69,9 +70,11 @@
import ramble.schema.repos
import ramble.schema.modifier_repos
import ramble.schema.package_manager_repos
import ramble.schema.workflow_manager_repos
import ramble.schema.base_application_repos
import ramble.schema.base_modifier_repos
import ramble.schema.base_package_manager_repos
import ramble.schema.base_workflow_manager_repos

from ramble.error import RambleError
from ramble.util.logger import logger
Expand Down Expand Up @@ -99,9 +102,11 @@
"repos": ramble.schema.repos.schema,
"modifier_repos": ramble.schema.modifier_repos.schema,
"package_manager_repos": ramble.schema.package_manager_repos.schema,
"workflow_manager_repos": ramble.schema.workflow_manager_repos.schema,
"base_application_repos": ramble.schema.base_application_repos.schema,
"base_modifier_repos": ramble.schema.base_modifier_repos.schema,
"base_package_manager_repos": ramble.schema.base_package_manager_repos.schema,
"base_workflow_manager_repos": ramble.schema.base_workflow_manager_repos.schema,
}

# Same as above, but including keys for workspaces
Expand Down
2 changes: 1 addition & 1 deletion lib/ramble/ramble/language/language_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#: them
reserved_names = []

namespaces = ["ramble.app", "ramble.mod", "ramble.pkg_man", "ramble.package_manager"]
namespaces = ["ramble.app", "ramble.mod", "ramble.pkg_man", "ramble.package_manager", "ramble.wm"]


class DirectiveMeta(type):
Expand Down
83 changes: 83 additions & 0 deletions lib/ramble/ramble/language/workflow_manager_language.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright 2022-2024 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.

from typing import Optional

import ramble.language.language_helpers
import ramble.language.language_base
import ramble.language.shared_language
from ramble.language.language_base import DirectiveError


class WorkflowManagerMeta(ramble.language.shared_language.SharedMeta):
_directive_names = set()
_directives_to_be_executed = []


workflow_manager_directive = WorkflowManagerMeta.directive


@workflow_manager_directive("render_configs")
def render_content(
name: str, content_tpl: str = None, content_func: str = None, content_perm=None
):
"""Define generating rendered content based on the `content_tpl`.

Args:
name (str): Name of the rendering entry. Also used as the filename under
experiment run directory.
content_tpl (str): A template for generating the content.
content_func (str): A name referencing a method defined on the workflow manager
class. The method is called to generate a dynamic template for expansion.
content_perm: The chmod mask for the rendered file.
"""

def _define_render_content(wm):
if content_func is not None:
render_config = {"content_func": content_func}
elif content_tpl is not None:
render_config = {"content_tpl": content_tpl}
else:
raise DirectiveError(
f"`content_tpl` needs to be either a string or a function, "
f"instead {content_tpl} is given"
)
if content_perm is not None:
render_config["content_perm"] = content_perm
wm.render_configs[name] = render_config

return _define_render_content


@workflow_manager_directive("wm_vars")
def workflow_manager_variable(
name: str,
default,
description: str,
values: Optional[list] = None,
):
"""Define a variable for this wm

Args:
name: Name of variable
default: Default value if the variable is not defined
description: Description of the variable
values: Optional list of suggested values for this variable
"""

def _define_wm_variable(wm):
import ramble.workload

wm.wm_vars[name] = ramble.workload.WorkloadVariable(
name,
default=default,
description=description,
values=values,
)

return _define_wm_variable
2 changes: 2 additions & 0 deletions lib/ramble/ramble/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,6 @@ class namespace:
# For variants
package_manager = "package_manager"

workflow_manager = "workflow_manager"

metadata = "metadata"
Loading
Loading