Merge branch 'main' into transfer_matrix
arahlin committed Oct 4, 2023
2 parents 0d3a9d3 + 089adfa commit ff79b69
Showing 2 changed files with 54 additions and 129 deletions.
168 changes: 52 additions & 116 deletions xfaster/batch_tools.py
@@ -21,23 +21,14 @@ def get_job_logfile():
logfile : str
Path to log file.
"""
if os.getenv("PBS_O_WORKDIR"):
if os.getenv("PBS_ENVIRONMENT") != "PBS_INTERACTIVE":
workdir = os.getenv("PBS_O_WORKDIR")
jobname = os.getenv("PBS_JOBNAME")
jobid = os.getenv("PBS_JOBID").split(".", 1)[0]
logfile = os.path.join(workdir, "{}.u{}".format(jobname, jobid))
else:
logfile = None
elif os.getenv("SLURM_SUBMIT_DIR"):
if os.getenv("SLURM_SUBMIT_DIR"):
workdir = os.getenv("SLURM_SUBMIT_DIR")
jobname = os.getenv("SLURM_JOB_NAME")
jobid = os.getenv("SLURM_JOB_ID").split(".", 1)[0]
if jobname == "bash":
logfile = None
else:
logfile = os.path.join(workdir, "{}-{}.log".format(jobname, jobid))
-        # TODO generate different logs for multiple processes in same job?
else:
logfile = None
return logfile
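
For orientation, a minimal sketch of the SLURM-only lookup that remains above; the environment values and paths here are hypothetical:

>>> import os
>>> from xfaster.batch_tools import get_job_logfile
>>> os.environ["SLURM_SUBMIT_DIR"] = "/scratch/runs"
>>> os.environ["SLURM_JOB_NAME"] = "xfaster_xfer"
>>> os.environ["SLURM_JOB_ID"] = "123456"
>>> get_job_logfile()
'/scratch/runs/xfaster_xfer-123456.log'

In an interactive shell, where SLURM_JOB_NAME is "bash", the function returns None instead.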
@@ -89,26 +80,26 @@ def batch_sub(
workdir=None,
batch_args=[],
omp_threads=None,
-    mpi_procs=None,
-    mpi_args="",
env_script=None,
env=None,
-    nice=0,
+    nice=None,
echo=True,
delete=True,
submit=True,
scheduler="slurm",
debug=False,
exclude=None,
verbose=False,
+    srun=None,
+    srun_args="",
):
"""
-    Create and submit a SLURM or PBS job.
+    Create and submit a SLURM job.
Arguments
---------
cmd : string or list of strings
-        A command sequence to run via SLURM or PBS.
+        A command sequence to run via SLURM.
        The command will be inserted into a batch submission script
with all of the options specified in the remaining arguments.
name : string, optional
@@ -118,12 +109,11 @@ def batch_sub(
        Or pass a string (e.g. '4gb') to use directly.
nodes : int or string, optional
Number of nodes to use in job
-        If a string, will be passed as-is to PBS -l node= resource
If using SLURM and a string, will overwrite node_list if None
node_list : string or list of strings
List of nodes that can be used for job. SLURM-only.
ppn : int, optional
-        Numper of processes per node
+        Number of processes per node
cput : string or float or datetime.timedelta, optional
Amount of CPU time requested.
String values should be in the format HH:MM:SS, e.g. '10:00:00'.
@@ -133,9 +123,9 @@ def batch_sub(
String values should be in the format HH:MM:SS, e.g. '10:00:00'.
Numerical values are interpreted as a number of hours.
output : string, optional
-        PBS standard output filename.
+        Scheduler standard output filename.
error : string, optional
-        PBS error output filename.
+        Scheduler error output filename.
queue : string, optional
The name of the queue to which to submit jobs
dep_afterok : string or list of strings
@@ -146,26 +136,17 @@ def batch_sub(
This is where the output and error files will be created
by default. Default: current directory.
batch_args : string or list of strings, optional
-        Any additional arguments to pass to slurm/pbs.
+        Any additional arguments to pass to slurm.
omp_threads : int, optional
Number of OpenMP threads to use per process
-    mpi_procs : int
-        Number of MPI processes to use.
-        ``mpirun`` calls will be added to all lines of cmd as needed.
-        If cmd contains ``mpirun`` or ``mpiexec``, this does nothing.
-    mpi_args : string
-        Additional command line arguments for inserted ``mpirun`` commands.
-        If cmd contains ``mpirun`` or ``mpiexec``, this does nothing.
env_script : string, optional
Path to script to source during job script preamble
        For loading modules, setting environment variables, etc.
env : dict, optional
Dictionary of environment variables to set in job script
nice : int, optional
-        Adjust scheduling priority (SLURM only). Range from -5000 (highest
-        priority) to 5000 (lowest priority).
-        Note: actual submitted --nice value is 5000 higher, since negative
-        values require special privilege.
+        Adjust scheduling priority (SLURM only).
+        Unprivileged users can only increase niceness (lower job priority).
echo : bool, optional
Whether to use bash "set -x" in job script to echo commands to stdout.
delete : bool, optional
@@ -174,13 +155,19 @@ def batch_sub(
        If True (default), submit the job script once created. If False, the
        delete default is overridden to keep the script.
scheduler : string, optional
-        Which scheduler system to write a script for. One of "pbs" or "slurm"
+        Which scheduler system to write a script for. Only "slurm" is currently
+        supported.
debug : bool, optional
If True, print the contents of the job script to stdout for debugging.
exclude : string or list of strings
List of nodes that will be excluded for job. SLURM-only.
verbose : bool, optional
Print the working directory, and the job ID if submitted successfully.
+    srun : bool, optional
+        Invoke jobs with srun. Defaults to True when using SLURM, set to False
+        to disable. Use `srun_args` to pass extra arguments to this command.
+    srun_args : string, optional
+        If `srun=True`, additional arguments to pass to the `srun` command.
Returns
-------
@@ -203,8 +190,6 @@ def batch_sub(
if mem is not None and not isinstance(mem, str):
if mem < 0:
mem = None
elif scheduler == "pbs":
mem = "{:d}mb".format(int(np.ceil(mem * 1024.0)))
elif scheduler == "slurm":
mem = "{:d}".format(int(np.ceil(mem * 1024.0)))
if isinstance(dep_afterok, str):
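
As a worked example of the conversion above: a numeric mem=4 (GB) is rendered as the string "4096", which SLURM reads as megabytes, while any negative value disables the memory request entirely.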
@@ -220,32 +205,13 @@ def batch_sub(
if scheduler == "slurm" and node_list is None:
node_list = nodes
nodes = 1
+    if srun is None:
+        srun = scheduler == "slurm"

job_script = ["#!/usr/bin/env bash"]

-    # TODO can maybe replace manual option with some automatic detection
-    if scheduler == "pbs":
-        # create PBS header
-        if name:
-            job_script += ["#PBS -N {:s}".format(name)]
-        if mem:
-            job_script += ["#PBS -l mem={:s}".format(mem)]
-        if nodes and ppn:
-            job_script += ["#PBS -l nodes={}:ppn={}".format(nodes, ppn)]
-        if cput:
-            job_script += ["#PBS -l cput={:s}".format(format_time(cput))]
-        if wallt:
-            job_script += ["#PBS -l walltime={:s}".format(format_time(wallt))]
-        if output:
-            job_script += ["#PBS -o {:s}".format(output)]
-        if error:
-            job_script += ["#PBS -e {:s}".format(error)]
-        if queue:
-            job_script += ["#PBS -q {:s}".format(queue)]
-        if dep_afterok:
-            job_script += ["#PBS -W depend=afterok:{}".format(":".join(dep_afterok))]

-    elif scheduler == "slurm":
+    if scheduler == "slurm":
# create slurm header
if name:
job_script += ["#SBATCH --job-name={:s}".format(name)]
@@ -276,7 +242,6 @@ def batch_sub(
if wallt:
job_script += ["#SBATCH --time={:s}".format(format_time(wallt))]
if nice is not None:
-            nice += 5000
job_script += ["#SBATCH --nice={}".format(nice)]
if output:
job_script += ["#SBATCH --output={:s}".format(output)]
@@ -299,20 +264,27 @@ def batch_sub(
if env:
for k, v in env.items():
job_script += ["export {}={}".format(k, v)]
if scheduler == "pbs":
job_script += ["cd $PBS_O_WORKDIR"]
elif scheduler == "slurm":
if scheduler == "slurm":
job_script += ["cd $SLURM_SUBMIT_DIR"]
if omp_threads:
job_script += ["export OMP_NUM_THREADS={}".format(omp_threads)]
+        if srun:
+            job_script += ["export SRUN_CPUS_PER_TASK={}".format(omp_threads)]

# finally, add the command string to script
-    if mpi_procs is not None:
-        if "mpirun" not in cmd and "mpiexec" not in cmd:
-            mpi = "mpiexec -n {:d} {:s} ".format(mpi_procs, mpi_args)
-            cmd = "\n".join(
-                [(mpi + line) if line != "wait" else line for line in cmd.split("\n")]
-            )
+    cmd_prefix = None
+    if srun:
+        if scheduler != "slurm":
+            raise ValueError('srun=True requires scheduler="slurm"')
+        if not cmd.startswith("srun"):
+            cmd_prefix = "srun {:s} ".format(srun_args)
+    if cmd_prefix is not None:
+        cmd = "\n".join(
+            [
+                (cmd_prefix + line) if line != "wait" else line
+                for line in cmd.split("\n")
+            ]
+        )
job_script += [cmd]
job_script = "\n".join(job_script)

@@ -334,9 +306,7 @@ def batch_sub(

# create and submit script
prefix = "{}_".format(name if name else "job")
if scheduler == "pbs":
suffix = ".qsub"
else:
if scheduler == "slurm":
suffix = ".slurm"
with tempfile.NamedTemporaryFile(
prefix=prefix, suffix=suffix, mode="w", dir=workdir, delete=delete
@@ -345,12 +315,7 @@ def batch_sub(
f.flush()
os.chmod(f.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP)
if submit:
if scheduler == "pbs":
ret = sp.check_output(["qsub"] + batch_args + [f.name]).decode("UTF-8")
jobid = ret.split("\n")[0] # parse jobid
if not re.match("[0-9]+\.[\w]+", jobid):
raise RuntimeError("qsub error:\n{}".format(ret))
elif scheduler == "slurm":
if scheduler == "slurm":
ret = sp.check_output(["sbatch"] + batch_args + [f.name]).decode(
"UTF-8"
)
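
A hedged usage sketch of batch_sub as documented above; the command, resource values, and paths are hypothetical, and submit=False keeps the generated script for inspection instead of running sbatch:

>>> from xfaster.batch_tools import batch_sub
>>> job = batch_sub(
...     "python run_xfaster.py --chunk 0",
...     name="xf_chunk0",
...     mem=4,
...     wallt=2,
...     omp_threads=4,
...     workdir="/scratch/logs",
...     scheduler="slurm",
...     submit=False,
... )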
@@ -382,7 +347,7 @@ def batch_sub(

def batch_group(cmds, group_by=1, serial=False, *args, **kwargs):
"""
-    Create and submit SLURM or PBS job scripts for a group of similar commands. The
+    Create and submit SLURM job scripts for a group of similar commands. The
commands can be grouped together into larger single jobs that run them in
parallel on multiple processors on a node.
@@ -403,7 +368,7 @@ def batch_group(cmds, group_by=1, serial=False, *args, **kwargs):
        E.g. on scinet, use ``group_by=8`` to efficiently use whole nodes.
serial : bool, optional
Set to ``True`` to run cmds sequentially, rather than starting them all
-        in parallel. This will also work with MPI/OpenMP parallel jobs.
+        in parallel. This will also work with OpenMP parallel jobs.
args, kwargs : arb
Additional arguments passed to batch_sub
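
A hedged sketch of the grouping behavior (commands and resources hypothetical): sixteen similar commands packed eight to a job yield two submitted jobs, with keyword arguments forwarded to batch_sub:

>>> from xfaster.batch_tools import batch_group
>>> cmds = ["python run_xfaster.py --chunk {}".format(i) for i in range(16)]
>>> jobs = batch_group(cmds, group_by=8, mem=2, wallt=4, omp_threads=1,
...                    workdir="/scratch/logs", submit=False)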
@@ -452,10 +417,8 @@ def __init__(
Standardized way to add job submission arguments to a script.
Keyword arguments are used to fix values for parameters not needed by a
-    script. The corresponding command line arguments will not be added. For
-    example, if not using MPI, pass ``mpi_procs=None`` and then there will
-    be no ``--mpi-procs`` argument on the command line. See the
-    ``.opt_list`` attribute for a complete list.
+    script. The corresponding command line arguments will not be added. See
+    the ``.opt_list`` attribute for a complete list.
Arguments
---------
@@ -476,7 +439,7 @@ def __init__(
Example
-------
>>> jp = sa.batch.JobArgumentParser("some_serial_job", mem=4, time=1.5,
-            mpi_procs=None, omp_threads=1, workdir="/path/to/logs")
+            omp_threads=1, workdir="/path/to/logs")
>>> AP = argparse.ArgumentParser(description="Do some job")
>>> AP.add_argument(...) # other non-job arguments
>>> jp.add_arguments(AP)
@@ -519,7 +482,7 @@ def _opt_list(self):
dict(
type=int,
default=None,
help="Processes per node. Default based on group, omp_threads, and mpi_procs",
help="Processes per node. Default based on group and omp_threads",
),
),
(
@@ -531,14 +494,6 @@ def _opt_list(self):
help="Nodes to exclude from jobs",
),
),
-            (
-                "pbs",
-                dict(
-                    action="store_true",
-                    default=False,
-                    help="Create PBS (rather than SLURM) job scripts",
-                ),
-            ),
(
"use_cput",
dict(
@@ -559,8 +514,8 @@ def _opt_list(self):
"nice",
dict(
type=int,
-                    default=0,
-                    help="Priority from -5000 (hi) to 5000 (lo). SLURM only",
+                    default=None,
+                    help="Adjust job priority. SLURM only",
),
),
(
@@ -584,12 +539,6 @@ def _opt_list(self):
type=int, default=1, help="Number of OpenMP threads per process"
),
),
-            (
-                "mpi_procs",
-                dict(
-                    type=int, default=1, help="Number of MPI processes (per node)"
-                ),
-            ),
(
"group",
dict(
@@ -603,15 +552,7 @@ def _opt_list(self):
dict(
action="store_true",
default=False,
help="Run grouped commands serially. Works for MPI",
),
),
(
"procs_scale",
dict(
action="store_true",
default=False,
help="Scale time and memory by number of processes",
help="Run grouped commands serially.",
),
),
]
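
Assuming the underscore-to-dash argparse convention implied by the removed --mpi-procs flag in the docstring above, the surviving options surface on a script's command line roughly as:

python some_script.py --queue normal --omp-threads 4 --group 8 --nice 100

with the --pbs, --mpi-procs, and --procs-scale flags now gone.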
@@ -698,8 +639,7 @@ def set_job_opts(self, args, load_defaults=True, **kwargs):
if args["ppn"] is None:
args["ppn"] = args["group"]

scale = 1.0 if not args["procs_scale"] else float(args["mpi_procs"])
mem_scale = scale * (args["group"] if not args["serial"] else 1.0)
mem_scale = args["group"] if not args["serial"] else 1.0
mem = self.mem * mem_scale if self.mem is not None else None

self.job_opts = dict(
@@ -710,7 +650,6 @@ def set_job_opts(self, args, load_defaults=True, **kwargs):
ppn=args["ppn"],
queue=args["queue"],
omp_threads=args["omp_threads"],
mpi_procs=args["mpi_procs"],
env_script=args["env_script"],
nice=args["nice"],
delete=args["test"],
@@ -719,12 +658,9 @@ def set_job_opts(self, args, load_defaults=True, **kwargs):
group_by=args["group"],
serial=args["serial"],
)
if args["pbs"]:
self.job_opts["scheduler"] = "pbs"
else:
self.job_opts["scheduler"] = "slurm"
self.job_opts["scheduler"] = "slurm"

-        time = self.time / args["cpu_speed"] / scale
+        time = self.time / args["cpu_speed"]
if time < 0.25:
time = 0.25
if args["use_cput"]: