Multi-node execution #51

Open
cpelley opened this issue Jul 16, 2024 · 1 comment

cpelley commented Jul 16, 2024

Options available

Manual running

module load scitools

# start scheduler
dask-scheduler --no-dashboard --protocol tcp &

# start workers
dask worker <scheduler-ip>:<port> --nworkers 4 --nthreads 1 &

# start client
python dask_computation.py --address <scheduler-ip>:<port>

Summary: This approach has the benefit of cleanly separating the creation of the scheduler, workers and client.
Such an approach is likely the only out-of-the-box approach that carries over to other schedulers (e.g. ray). The downside is that it does not enable dynamic scaling of the cluster.
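
A minimal sketch of what such a client script could look like (the real dask_computation.py is in the gist linked below; the --address handling and the toy workload here are assumptions):

# dask_computation.py -- illustrative sketch only
import argparse

from dask import delayed
from dask.distributed import Client

parser = argparse.ArgumentParser()
parser.add_argument("--address", help="scheduler address, e.g. tcp://<scheduler-ip>:<port>")
args = parser.parse_args()

client = Client(args.address)  # connect to the externally started scheduler
tasks = [delayed(pow)(i, 2) for i in range(8)]  # placeholder workload
print(client.gather(client.compute(tasks)))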

Using dask-mpi (mpirun/mpiexec)

Not yet tested.

pip install dask-mpi
mpiexec -n 4 python dask_computation.py --mpi

Summary: I didn't pursue this approach, since other schedulers (i.e. not dask) are unlikely to provide an equivalent mechanism.
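
Untested here, but the documented dask-mpi pattern would look roughly like the following (the submitted computation is just a placeholder); initialize() turns MPI rank 0 into the scheduler, rank 1 into the client code below, and the remaining ranks into workers:

from dask.distributed import Client
from dask_mpi import initialize

initialize()       # rank 0 -> scheduler, rank 1 -> client code below, remaining ranks -> workers
client = Client()  # connects to the scheduler started by initialize()

print(client.submit(sum, range(10)).result())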

Using dask-jobqueue

pip install dask-jobqueue

from dask_jobqueue import PBSCluster

total_memory = 60  # Total memory per worker job/node in GB
num_workers = 8  # Number of workers per worker job/node
num_nodes = 2
walltime = '00:10:00'  # Maximum runtime of the workers
# set cores and processes to be the same for single-threaded workers
cluster = PBSCluster(
    protocol="tcp",  # unencrypted
    cores=num_workers,  # Number of cores per worker job == processes by default
    processes=num_workers,  # Number of worker processes per worker job/node
    memory=f"{total_memory} GB",  # Total memory per worker job i.e. node
    walltime=walltime,  # Maximum runtime of the worker job
)
cluster.scale(jobs=num_nodes)  # Scale the number of worker jobs to the number of nodes
    # Parameters
    # ----------
    # n : int
    #    Target number of workers
    # jobs : int
    #    Target number of jobs
    # memory : str
    #    Target amount of memory
    # cores : int
    #    Target number of cores

or

cluster.scale(<number of workers>)

or

cluster.adapt(minimum=1, maximum=10)  # Automatically scale the number of workers based on the workload
    # Parameters
    # ----------
    # minimum : int
    #    Minimum number of workers to keep around for the cluster
    # maximum : int
    #    Maximum number of workers to keep around for the cluster
    # minimum_memory : str
    #    Minimum amount of memory for the cluster
    # maximum_memory : str
    #    Maximum amount of memory for the cluster
    # minimum_jobs : int
    #    Minimum number of jobs
    # maximum_jobs : int
    #    Maximum number of jobs

Summary: This approach provides the greatest flexibility, giving a dynamic cluster that can scale to requirements. However, it means the scheduler, workers and client are managed centrally, which is less than ideal for sharing a cluster amongst independent programs. Another downside is that other schedulers (e.g. ray) don't necessarily allow managing the cluster setup in this way.
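
For completeness, once the jobqueue cluster is up and scaled, a client attaches to it in the usual way (minimal sketch; the submitted computation is just a placeholder):

from dask.distributed import Client

client = Client(cluster)  # attach to the PBSCluster created above
print(client.submit(sum, range(10)).result())

client.close()
cluster.close()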

Testing these approaches with a script

https://gist.github.com/cpelley/ff537e9dd5fb97ee681fa7207575330b

cpelley self-assigned this Jul 16, 2024

cpelley commented Jul 30, 2024

Environment for testing dask/ray scheduling on multiple nodes.

mamba create -c conda-forge networkx improver ray dask -n multi_node_exe
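
As a minimal smoke test of that environment against an existing multi-node Ray cluster (assuming the nodes have already been joined with ray start), something along these lines should work:

import ray

ray.init(address="auto")  # attach to the Ray cluster already running on the nodes

@ray.remote
def square(x):
    return x * x

print(ray.get([square.remote(i) for i in range(8)]))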
