Skip to content

Commit

Permalink
Add initial support for setting status from workflow_manager
Browse files Browse the repository at this point in the history
  • Loading branch information
linsword13 committed Dec 12, 2024
1 parent dc518c6 commit 493648f
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 13 deletions.
16 changes: 13 additions & 3 deletions lib/ramble/ramble/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -1794,10 +1794,20 @@ def format_context(context_match, context_format):
success = True
success = success and criteria_list.passed()

if success:
self.set_status(status=experiment_status.SUCCESS)
if self.workflow_manager is not None:
wm_status = self.workflow_manager.get_status(workspace)
if wm_status == experiment_status.COMPLETE:
if success:
self.set_status(status=experiment_status.SUCCESS)
else:
self.set_status(status=experiment_status.FAILED)
else:
self.set_status(status=wm_status)
else:
self.set_status(status=experiment_status.FAILED)
if success:
self.set_status(status=experiment_status.SUCCESS)
else:
self.set_status(status=experiment_status.FAILED)

self._init_result()

Expand Down
6 changes: 2 additions & 4 deletions lib/ramble/ramble/util/command_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@ class CommandRunner:
Can be inherited to construct custom command runners.
"""

def __init__(
self, name=None, command=None, shell="bash", dry_run=False, path=None, required=True
):
def __init__(self, name=None, command=None, shell="bash", dry_run=False, path=None):
"""
Ensure required command is found in the path
"""
self.name = name
self.dry_run = dry_run
self.shell = shell
required = required and not self.dry_run
required = not self.dry_run
try:
if path is None:
self.command = which(command, required=required)
Expand Down
2 changes: 2 additions & 0 deletions lib/ramble/ramble/wmkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@
CommandRunner,
RunnerError,
)

from ramble.util.logger import logger
4 changes: 4 additions & 0 deletions lib/ramble/ramble/workflow_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ def set_application(self, app_inst):
"""Set a reference to the associated app_inst"""
self.app_inst = app_inst

def get_status(self, workspace):
"""Return status of a given job"""
return None

def copy(self):
"""Deep copy a workflow manager instance"""
new_copy = type(self)(self._file_path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
# option. This file may not be copied, modified, or distributed
# except according to those terms.

import os

from ramble.wmkit import *
from ramble.expander import ExpanderError
from ramble.application import experiment_status

from spack.util.executable import Executable

_ensure_job_id_snippet = r"""
job_id=$(< {experiment_run_dir}/.slurm_job)
Expand Down Expand Up @@ -123,20 +127,46 @@ def _slurm_execute_script(self):

return content


class SlurmRunner(CommandRunner):
def get_status(self, workspace):
expander = self.app_inst.expander
run_dir = expander.expand_var_name("experiment_run_dir")
job_id_file = os.path.join(run_dir, ".slurm_job")
status = experiment_status.UNKNOWN
if not os.path.isfile(job_id_file):
logger.warn("job_id file is missing")
return status
self.runner.set_dry_run(workspace.dry_run)
self.runner.set_run_dir(run_dir)
wm_status = self.runner.get_status()
if wm_status is not None and hasattr(experiment_status, wm_status):
status = getattr(experiment_status, wm_status)
return status


class SlurmRunner:
"""Runner for executing slurm commands"""

def __init__(self, dry_run=False):
super().__init__(
name="slurm", command="slurm", dry_run=dry_run, required=False
)
self.dry_run = dry_run
self.run_dir = None

def set_dry_run(self, dry_run=False):
"""
Set the dry_run state of this runner
"""
self.dry_run = dry_run

def set_run_dir(self, run_dir):
"""
Set the experiment_run_dir of this runner
"""
self.run_dir = run_dir

def generate_query_command(self, job_id):
return rf"""
status=$(squeue -h -o "%t" -j {job_id})
if [ -z "$status" ]; then
status=$(sacct -j {job_id} -o state -X -n)
status=$(sacct -j {job_id} -o state -X -n | xargs)
fi
if [ ! -z "$status" ]; then
# Define a mapping between sacct/squeue status to ramble counterpart
Expand All @@ -158,3 +188,10 @@ def generate_cancel_command(self, job_id):

def generate_hostfile_command(self):
return "scontrol show hostnames"

def get_status(self):
if self.dry_run or self.run_dir is None:
return None
query_cmd = Executable(os.path.join(self.run_dir, "query_job"))
out = query_cmd(output=str)
return out.split(":")[-1].strip()

0 comments on commit 493648f

Please sign in to comment.