Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a slurm workflow manager #789

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add initial support for setting status from workflow_manager
linsword13 committed Dec 18, 2024
commit c6c8c9698e087f84badd66b9537636de80fff5b8
16 changes: 13 additions & 3 deletions lib/ramble/ramble/application.py
Original file line number Diff line number Diff line change
@@ -1794,10 +1794,20 @@ def format_context(context_match, context_format):
success = True
success = success and criteria_list.passed()

if success:
self.set_status(status=experiment_status.SUCCESS)
if self.workflow_manager is not None:
wm_status = self.workflow_manager.get_status(workspace)
if wm_status == experiment_status.COMPLETE:
if success:
self.set_status(status=experiment_status.SUCCESS)
else:
self.set_status(status=experiment_status.FAILED)
else:
self.set_status(status=wm_status)
else:
self.set_status(status=experiment_status.FAILED)
if success:
self.set_status(status=experiment_status.SUCCESS)
else:
self.set_status(status=experiment_status.FAILED)

self._init_result()

6 changes: 2 additions & 4 deletions lib/ramble/ramble/util/command_runner.py
Original file line number Diff line number Diff line change
@@ -20,16 +20,14 @@ class CommandRunner:
Can be inherited to construct custom command runners.
"""

def __init__(
self, name=None, command=None, shell="bash", dry_run=False, path=None, required=True
):
def __init__(self, name=None, command=None, shell="bash", dry_run=False, path=None):
"""
Ensure required command is found in the path
"""
self.name = name
self.dry_run = dry_run
self.shell = shell
required = required and not self.dry_run
required = not self.dry_run
try:
if path is None:
self.command = which(command, required=required)
2 changes: 2 additions & 0 deletions lib/ramble/ramble/wmkit.py
Original file line number Diff line number Diff line change
@@ -19,3 +19,5 @@
CommandRunner,
RunnerError,
)

from ramble.util.logger import logger
4 changes: 4 additions & 0 deletions lib/ramble/ramble/workflow_manager.py
Original file line number Diff line number Diff line change
@@ -44,6 +44,10 @@ def set_application(self, app_inst):
"""Set a reference to the associated app_inst"""
self.app_inst = app_inst

def get_status(self, workspace):
"""Return status of a given job"""
return None

def copy(self):
"""Deep copy a workflow manager instance"""
new_copy = type(self)(self._file_path)
Original file line number Diff line number Diff line change
@@ -6,9 +6,13 @@
# option. This file may not be copied, modified, or distributed
# except according to those terms.

import os

from ramble.wmkit import *
from ramble.expander import ExpanderError
from ramble.application import experiment_status

from spack.util.executable import Executable

_ensure_job_id_snippet = r"""
job_id=$(< {experiment_run_dir}/.slurm_job)
@@ -123,20 +127,46 @@ def _slurm_execute_script(self):

return content


class SlurmRunner(CommandRunner):
def get_status(self, workspace):
expander = self.app_inst.expander
run_dir = expander.expand_var_name("experiment_run_dir")
job_id_file = os.path.join(run_dir, ".slurm_job")
status = experiment_status.UNKNOWN
if not os.path.isfile(job_id_file):
logger.warn("job_id file is missing")
return status
self.runner.set_dry_run(workspace.dry_run)
self.runner.set_run_dir(run_dir)
wm_status = self.runner.get_status()
if wm_status is not None and hasattr(experiment_status, wm_status):
status = getattr(experiment_status, wm_status)
return status


class SlurmRunner:
"""Runner for executing slurm commands"""

def __init__(self, dry_run=False):
super().__init__(
name="slurm", command="slurm", dry_run=dry_run, required=False
)
self.dry_run = dry_run
self.run_dir = None

def set_dry_run(self, dry_run=False):
"""
Set the dry_run state of this runner
"""
self.dry_run = dry_run

def set_run_dir(self, run_dir):
"""
Set the experiment_run_dir of this runner
"""
self.run_dir = run_dir

def generate_query_command(self, job_id):
return rf"""
status=$(squeue -h -o "%t" -j {job_id})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how picky I am about this, but I would almost prefer the status map be in python rather than in bash.

And maybe some of the logic mapped into the base class too.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. I moved the get_status call inside the runner to be in python instead of relying on this bash script, so now ramble workspace analyze would rely on python. Do you prefer even the --executor "{batch_query}" to be handled in python instead of the bash script right now?

if [ -z "$status" ]; then
status=$(sacct -j {job_id} -o state -X -n)
status=$(sacct -j {job_id} -o state -X -n | xargs)
fi
if [ ! -z "$status" ]; then
# Define a mapping between sacct/squeue status to ramble counterpart
@@ -150,11 +180,18 @@ def generate_query_command(self, job_id):
status=${{status_map["$status"]}}
fi
fi
echo job {job_id} has status: $status
echo "job {job_id} has status: $status"
"""

def generate_cancel_command(self, job_id):
return f"scancel {job_id}"

def generate_hostfile_command(self):
return "scontrol show hostnames"

def get_status(self):
if self.dry_run or self.run_dir is None:
return None
query_cmd = Executable(os.path.join(self.run_dir, "query_job"))
out = query_cmd(output=str)
return out.split(":")[-1].strip()