Experiments in running single-cell analyses efficiently at scale, using a combination of Zarr, anndata, Scanpy, and Apache Spark, and possibly other tools.
The work in this repository is exploratory and not suitable for production.
Provides AnnDataRdd, an AnnData implementation backed by a Spark RDD.
Some Scanpy functions implemented for AnnDataRdds.
Spark convenience functions for reading and writing Zarr files as RDDs of numpy arrays.
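To make the idea of a chunk-backed AnnData more concrete, here is a minimal sketch in plain Python (no Spark required) of splitting a cells-by-genes matrix into row chunks and applying an element-wise function such as log1p to each chunk independently. The names `row_chunks` and `log1p_chunk` are hypothetical and are not part of this repository's API; the only assumption is that row-wise chunks map naturally onto RDD partitions.

```python
import math


def row_chunks(n_rows, chunk_rows):
    """Yield (start, end) row ranges that cover n_rows in blocks of chunk_rows."""
    if chunk_rows <= 0:
        raise ValueError("chunk_rows must be positive")
    for start in range(0, n_rows, chunk_rows):
        yield start, min(start + chunk_rows, n_rows)


def log1p_chunk(chunk):
    """Apply log1p element-wise to one chunk (a list of rows)."""
    return [[math.log1p(value) for value in row] for row in chunk]


# A tiny 3 x 2 matrix standing in for a cells-by-genes count matrix.
matrix = [[0.0, 1.0], [2.0, 3.0], [4.0, 5.0]]

# Split into row chunks; in Spark each chunk would be one RDD element.
chunks = [matrix[start:end] for start, end in row_chunks(len(matrix), 2)]

# Map the function over the chunks and stitch the rows back together.
result = [row for chunk in map(log1p_chunk, chunks) for row in chunk]
```

Because log1p is element-wise, each chunk can be processed without looking at any other chunk, which is what makes this kind of operation a good fit for a distributed map.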
A command-line interface for converting between various HDF5 formats (10X's, Loom, AnnData) and Zarr equivalents:
# Download a 10X HDF5 file locally
wget -O files/ica_cord_blood_h5.h5 https://storage.googleapis.com/ll-sc-data/hca/immune-cell-census/ica_cord_blood_h5.h5
# Convert to .h5ad
python cli.py files/ica_cord_blood_h5.h5 files/ica_cord_blood.h5ad
# Convert to .zarr
python cli.py files/ica_cord_blood.h5ad files/ica_cord_blood.h5ad.zarr
.zarr outputs can also be written directly to gs:// and s3:// URLs.
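As an illustration of the path conventions used in the commands above, the following sketch classifies a path by storage location (local, gs://, or s3://) and by file suffix. It is not how cli.py is implemented; `describe_path`, the suffix-to-format mapping, and the bucket name `my-bucket` are assumptions made for this example.

```python
import posixpath
from urllib.parse import urlparse

# Assumed mapping from file suffix to format, based on the examples above.
KNOWN_SUFFIXES = {
    ".h5": "10x-hdf5",
    ".h5ad": "anndata",
    ".loom": "loom",
    ".zarr": "zarr",
}


def describe_path(path):
    """Return (storage, format) for a local path or a gs:// / s3:// URL."""
    scheme = urlparse(path).scheme
    storage = scheme if scheme in ("gs", "s3") else "local"
    # Only the final suffix matters, so "x.h5ad.zarr" is treated as Zarr.
    suffix = posixpath.splitext(path.rstrip("/"))[1]
    return storage, KNOWN_SUFFIXES.get(suffix, "unknown")


local_input = describe_path("files/ica_cord_blood_h5.h5")
remote_output = describe_path("gs://my-bucket/ica_cord_blood.h5ad.zarr")
```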
Create and activate a Python 3 virtualenv, and install the requirements:
python3 -m venv venv # python 3 is required!
. venv/bin/activate
pip install -r requirements.txt
Install and configure Spark 2.3.1:
wget http://www-us.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
tar xf spark-2.3.1-bin-hadoop2.7.tgz
export SPARK_HOME=spark-2.3.1-bin-hadoop2.7
Run Tests:
pytest
Or run only particular tests, e.g.:
pytest -k test_log1p
socket.gaierror: [Errno 8] nodename nor servname provided, or not known
You likely need to add a mapping for 127.0.0.1 to your /etc/hosts (note the -a flag, which appends to the file rather than overwriting it):
echo '127.0.0.1 localhost' | sudo tee -a /etc/hosts > /dev/null
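To check whether name resolution is the problem before editing /etc/hosts, a small diagnostic like the one below may help. It is a sketch using only the standard library; `hostname_resolves` is a hypothetical helper, and it assumes that failing to resolve the machine's own hostname is what triggers the error.

```python
import socket


def hostname_resolves(name=None):
    """Return True if `name` (default: this machine's hostname) resolves."""
    name = name or socket.gethostname()
    try:
        socket.getaddrinfo(name, None)
    except socket.gaierror:
        # Same exception class as in the error message above.
        return False
    return True


print("localhost resolves:", hostname_resolves("localhost"))
print("own hostname resolves:", hostname_resolves())
```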
…
File "/Users/ryan/c/hdf5-experiments/test/lib/python3.6/site-packages/pyspark/java_gateway.py", line 93, in launch_gateway
raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number
If you see the error above, try setting SPARK_LOCAL_IP before running the tests:
export SPARK_LOCAL_IP=127.0.0.1
You may need to additionally set the PYSPARK_PYTHON environment variable in your test configuration (to the venv/bin/python binary from your virtualenv above), otherwise workers will use a different/incompatible Python.
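If you prefer to set these variables from Python rather than in the shell or IDE, something along the following lines could work. This is a sketch, not part of the repository: `configure_spark_env` is a hypothetical helper, and it assumes the interpreter running the tests is the one from the virtualenv.

```python
import os
import sys


def configure_spark_env(environ=None):
    """Fill in Spark-related variables without overriding existing values."""
    if environ is None:
        environ = os.environ
    # Bind Spark to the loopback address for local runs.
    environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")
    # Make workers use the same interpreter as the driver.
    environ.setdefault("PYSPARK_PYTHON", sys.executable)
    return environ


# Demonstrated on a plain dict so the real environment is left untouched.
example_env = configure_spark_env({"SPARK_LOCAL_IP": "10.0.0.5"})
```

The variables need to be set before the SparkContext is created, for example by calling `configure_spark_env()` at the top of a test setup module.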
We use Black to enforce Python style. If you edit any Python source, you can format it with
black *.py
The CI will fail any patch that is not correctly formatted.
All of the demos use Google Cloud, so you'll need an account there to run them.
- Start a five-node Dataproc cluster with the following. You'll need to change the environment variables to ones appropriate to your account. Notice the initialization actions that install the required Python packages on the cluster nodes.
export DATAPROC_CLUSTER_NAME=ll-cluster-$USER
export PROJECT=hca-scale
export ZONE=us-east1-d
export NUM_WORKERS=5
gcloud dataproc --region us-east1 \
  clusters create $DATAPROC_CLUSTER_NAME \
  --zone $ZONE \
  --master-machine-type n1-standard-4 \
  --master-boot-disk-size 500 \
  --num-workers $NUM_WORKERS \
  --worker-machine-type n1-standard-8 \
  --worker-boot-disk-size 500 \
  --image-version 1.2 \
  --scopes 'https://www.googleapis.com/auth/cloud-platform' \
  --project $PROJECT \
  --metadata 'CONDA_PACKAGES=numpy,PIP_PACKAGES=gcsfs scanpy zarr git+https://github.com/tomwhite/anndata@zarr' \
  --initialization-actions gs://dataproc-initialization-actions/conda/bootstrap-conda.sh,gs://dataproc-initialization-actions/conda/install-conda-env.sh
- Run a simple Spark job. You will need to edit the output path in scratch/cluster-log1p-anndata-gcs.py to a GCS bucket that you have write permissions for before running this.
gcloud dataproc jobs submit pyspark scratch/cluster-log1p-anndata-gcs.py \
  --cluster=$DATAPROC_CLUSTER_NAME --region us-east1 --project $PROJECT \
  --py-files=anndata_spark.py,scanpy_spark.py,zarr_spark.py
- Run a Scanpy recipe. Again, you'll need to change the output path.
gcloud dataproc jobs submit pyspark scratch/cluster-recipe-anndata-gcs.py \
  --cluster=$DATAPROC_CLUSTER_NAME --region us-east1 --project $PROJECT \
  --py-files=anndata_spark.py,scanpy_spark.py,zarr_spark.py
- Delete the cluster when you've finished.
gcloud dataproc --region us-east1 clusters delete --quiet $DATAPROC_CLUSTER_NAME
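The two job submissions above differ only in the script path, so if you submit jobs repeatedly it may be convenient to build the command in Python. The sketch below only assembles the argument list using the flags shown above; `dataproc_submit_command` is a hypothetical helper and `ll-cluster-demo` is a placeholder cluster name.

```python
import shlex

# Modules shipped alongside each job, as in the commands above.
DEFAULT_PY_FILES = ("anndata_spark.py", "scanpy_spark.py", "zarr_spark.py")


def dataproc_submit_command(script, cluster, project, region="us-east1",
                            py_files=DEFAULT_PY_FILES):
    """Return the gcloud argument list for submitting a PySpark job."""
    return [
        "gcloud", "dataproc", "jobs", "submit", "pyspark", script,
        f"--cluster={cluster}",
        "--region", region,
        "--project", project,
        f"--py-files={','.join(py_files)}",
    ]


cmd = dataproc_submit_command(
    "scratch/cluster-log1p-anndata-gcs.py", "ll-cluster-demo", "hca-scale"
)
# Print a shell-safe version of the command for inspection.
print(" ".join(shlex.quote(arg) for arg in cmd))
```

To actually submit the job, the list can be passed to `subprocess.run(cmd, check=True)` on a machine where gcloud is installed and authenticated.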
- Start a Dataproc cluster as before. The cluster takes longer to start since it is installing Jupyter in addition to application packages.
export DATAPROC_CLUSTER_NAME=ll-cluster-$USER
export PROJECT=hca-scale
export ZONE=us-east1-d
gcloud dataproc --region us-east1 \
  clusters create $DATAPROC_CLUSTER_NAME \
  --zone $ZONE \
  --master-machine-type n1-standard-1 \
  --master-boot-disk-size 500 \
  --num-workers 5 \
  --worker-machine-type n1-standard-8 \
  --worker-boot-disk-size 500 \
  --image-version 1.2 \
  --scopes 'https://www.googleapis.com/auth/cloud-platform' \
  --project $PROJECT \
  --initialization-actions gs://ll-dataproc-initialization-actions/scanpy-dataproc-initialization-action.sh
- Open a browser to the Jupyter web interface. This script opens a tunnel to the ports on the cluster.
scripts/launch-jupyter-interface.sh
- Upload and run a Jupyter notebook. From the Jupyter main page, upload scratch/cluster-recipe-anndata-gcs.ipynb and run it. You can open another browser tab to track the job progress at http://${DATAPROC_CLUSTER_NAME}-m:8088.
- Delete the cluster when you've finished.
gcloud dataproc --region us-east1 clusters delete --quiet $DATAPROC_CLUSTER_NAME
These instructions are based on the Dask and Kubernetes documentation.
- Start a five-node Kubernetes cluster with the following. You'll need to change the environment variables to ones appropriate to your account.
export K8S_CLUSTER_NAME=ll-k8s-cluster-$USER
export DASK_CLUSTER_NAME=ll-dask-cluster-$USER
export PROJECT=hca-scale
export ZONE=us-east1-b
export EMAIL=$USER@lasersonlab.org
gcloud container clusters create $K8S_CLUSTER_NAME \
  --num-nodes=5 \
  --machine-type=n1-standard-8 \
  --zone $ZONE \
  --scopes 'https://www.googleapis.com/auth/cloud-platform' \
  --project $PROJECT
- Give your account super-user permissions.
kubectl create clusterrolebinding cluster-admin-binding --clusterrole=cluster-admin --user=$EMAIL
- Install Helm so we can use the Dask chart to install Dask.
kubectl --namespace kube-system create sa tiller
kubectl create clusterrolebinding tiller --clusterrole cluster-admin --serviceaccount=kube-system:tiller
helm init --service-account tiller
helm repo update
- Create a vanilla Dask cluster.
helm install --name $DASK_CLUSTER_NAME stable/dask
- Upgrade the cluster to use our configuration (extra Python packages).
helm upgrade $DASK_CLUSTER_NAME stable/dask -f scripts/dask-config.yaml
- Wait until the cluster has started by checking its status with the following commands:
kubectl get pods
kubectl get services
- When the services are running, open the Jupyter web page using the external IP reported by kubectl get services (port 80). The password is dask. You can open another tab to monitor the Dask job (look for the service whose name ends with scheduler).
- Upload and run a Jupyter notebook. From the Jupyter main page, upload scratch/cluster-dask-recipe-anndata-gcs.ipynb and run it.
- Delete the Dask cluster when you've finished.
helm delete $DASK_CLUSTER_NAME --purge
- Delete the Kubernetes cluster.
gcloud container clusters delete $K8S_CLUSTER_NAME --zone $ZONE --quiet