-
Notifications
You must be signed in to change notification settings - Fork 55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dask Integration for Analysis Grand Challenge #712
Comments
to be tested on the AGC Dask example: https://github.com/root-project/analysis-grand-challenge |
I have done a proof-of-concept test on my local REANA deployment. Everything works well.
$ helm install dask dask/dask --version 2023.1.0
$ DASK_SCHEDULER_URI=$(kubectl get services dask-scheduler | awk 'NR==2 {print "tcp://" $3 ":" substr($5,0,4)}')
$ echo $DASK_SCHEDULER_URI
tcp://10.96.4.51:8786
import dask
import dask.array
import dask.distributed
import os
DASK_SCHEDULER_URI = os.getenv("DASK_SCHEDULER_URI", "tcp://127.0.0.1:8080")
client = dask.distributed.Client(DASK_SCHEDULER_URI)
x = dask.array.random.random((10000, 10000), chunks=(1000, 1000))
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
result = z.compute()
print(result)
inputs:
files:
- dask_demo.py
workflow:
type: serial
specification:
steps:
- name: mystep
environment: 'ghcr.io/dask/dask:2023.1.0'
commands:
- DASK_SCHEDULER_URI=tcp://10.96.4.51:8786 python dask_demo.py > dask_demo.res
outputs:
files:
- dask_demo.res
$ reana-client run -w dask
$ sleep 60
$ reana-client status -w dask
$ reana-client ls -w dask
$ reana-client download -w dask dask_demo.res
$ tail dask_demo.res
[0.99483301 0.99318256 1.0114246 ... 0.99163413 0.99748661 1.01648798] Note Beware of versioning. In the above example, we are using Dask version 2023.1.0 in both the cluster and the client. If a user happens to use say the latest Dask 2023.5.1 in their job image, then the above workflow would fail due to client vs cluster version inconsistencies: $ kubectl logs service/dask-scheduler | tail
File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 5213, in add_client
await self.handle_stream(comm=comm, extra={"client": client})
File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 904, in handle_stream
handler(**merge(extra, msg))
TypeError: update_graph() got an unexpected keyword argument 'graph_header'
2023-05-31 12:46:26,523 - distributed.scheduler - INFO - Receive client connection: Client-279c10b7-ffb1-11ed-800d-4285cf89ac3c
2023-05-31 12:46:26,523 - distributed.core - INFO - Starting established connection to tcp://10.244.0.43:60864
2023-05-31 12:46:26,624 - distributed.core - INFO - Connection to tcp://10.244.0.43:60864 has been closed. This means that:
|
The proof-of-concept has been done, the RFC with the desired features for the Dask integration is being prepared in #823, and we are starting the regular work on implementing the features. Hence I'm closing this proof-of-concept issue. |
Motivation
As we talked about in our Analysis Grand Challenge meeting today, we would like to enhance REANA to support Dask for distributed analysis on Kubernetes as a backend.
Modifications
This would involve allowing the user to specify to easily enable the usage of a Dask cluster in his workflow file e. g.:
In the background, this would create a Dask cluster from the user pod. Further, this would require additional helm values in the helm Chart to configure the Dask cluster backend in the first place e. g. Kubernetes Cluster.
Refer also to this CodiMD for more details on today's meeting.
The text was updated successfully, but these errors were encountered: