feat: vep annotation (dockerised + google batch + airflow) #608

Merged 24 commits from dsdo_airflow_batch_vep into dev on Jul 4, 2024.

Commits (24)
59c8b51
feat: custom dockerfile to run ensembl vep
d0choa May 15, 2024
9463a8b
ci: automate vep image build and artifact registry
d0choa May 15, 2024
0ca9677
chore: update airflow google operators (not required)
d0choa May 15, 2024
e7a87a3
feat: working version of google batch airflow vep job
d0choa May 15, 2024
0cf7d7f
Merge branch 'dev' into dsdo_airflow_batch_vep
d0choa May 15, 2024
b900073
feat: working version of google batch airflow vep job
d0choa May 15, 2024
d471bf1
chore: rename jobs
d0choa May 15, 2024
f542c30
feat(VEP): adding CADD plugin
DSuveges May 15, 2024
7c20728
feat: local loftee file
d0choa May 16, 2024
55f04d9
feat: local loftee file
d0choa May 16, 2024
8d91299
feat: working with input bucket full of input files
d0choa May 16, 2024
e83be23
feat: prevent writing html
d0choa May 17, 2024
31345b9
Merge branch 'dev' into dsdo_airflow_batch_vep
d0choa May 17, 2024
7d5983a
fix: minor adjustments to retry strategy
d0choa Jun 17, 2024
89893b0
chore: update with dev
DSuveges Jun 25, 2024
4e4b195
feat(airflow): separating mounting points for input/output and cache
DSuveges Jun 25, 2024
813cba8
fix: typo in airflow dag
DSuveges Jun 25, 2024
1facf9d
Merge branch 'dev' into dsdo_airflow_batch_vep
DSuveges Jun 25, 2024
aa454c9
fix: pre-commit pain
DSuveges Jun 25, 2024
43b613c
Merge branch 'dsdo_airflow_batch_vep' of https://github.com/opentarge…
DSuveges Jun 25, 2024
ecd7a63
chore: rename airflow dag file
DSuveges Jun 25, 2024
97121cc
Merge branch 'dev' into dsdo_airflow_batch_vep
project-defiant Jun 26, 2024
d494066
Merge branch 'dev' into dsdo_airflow_batch_vep
DSuveges Jun 30, 2024
b50f9b5
Merge branch 'dev' into dsdo_airflow_batch_vep
DSuveges Jul 4, 2024
12 changes: 9 additions & 3 deletions .github/workflows/artifact.yml
@@ -8,7 +8,7 @@ env:
PROJECT_ID: open-targets-genetics-dev
REGION: europe-west1
GAR_LOCATION: europe-west1-docker.pkg.dev/open-targets-genetics-dev
IMAGE_NAME: gentropy-app
REPOSITORY: gentropy-app

jobs:
build-push-artifact:
@@ -33,7 +33,13 @@ jobs:
gcloud auth configure-docker ${{ env.REGION }}-docker.pkg.dev --quiet

- name: Build image
run: docker build . --tag "${{ env.GAR_LOCATION }}/${{ env.IMAGE_NAME }}/gentropy:${{ github.ref_name }}"
run: docker build . --tag "${{ env.GAR_LOCATION }}/${{ env.REPOSITORY }}/gentropy:${{ github.ref_name }}"

- name: Push image
run: docker push "${{ env.GAR_LOCATION }}/${{ env.IMAGE_NAME }}/gentropy:${{ github.ref_name }}"
run: docker push "${{ env.GAR_LOCATION }}/${{ env.REPOSITORY }}/gentropy:${{ github.ref_name }}"

- name: Build VEP image
run: docker build src/vep --tag "${{ env.GAR_LOCATION }}/${{ env.REPOSITORY }}/custom_ensembl_vep:${{ github.ref_name }}"

- name: Push VEP image
run: docker push "${{ env.GAR_LOCATION }}/${{ env.REPOSITORY }}/custom_ensembl_vep:${{ github.ref_name }}"
6 changes: 5 additions & 1 deletion Makefile
@@ -44,7 +44,11 @@ create-dev-cluster: build ## Spin up a simple dataproc cluster with all dependencies
--master-machine-type n1-standard-16 \
--initialization-actions=gs://genetics_etl_python_playground/initialisation/${VERSION_NO}/install_dependencies_on_cluster.sh \
--metadata="PACKAGE=gs://genetics_etl_python_playground/initialisation/${VERSION_NO}/gentropy-${VERSION_NO}-py3-none-any.whl,CONFIGTAR=gs://genetics_etl_python_playground/initialisation/${VERSION_NO}/config.tar.gz" \
--single-node \
--num-workers 4 \
[Contributor comment] I wouldn't set these specs by default; I suggest reverting the changes.
--primary-worker-type n1-standard-8 \
--worker-machine-type n1-standard-4 \
--worker-boot-disk-size 500 \
--autoscaling-policy="projects/${PROJECT_ID}/regions/${REGION}/autoscalingPolicies/eqtl-preprocess" \
--optional-components=JUPYTER \
--enable-component-gateway

284 changes: 284 additions & 0 deletions src/airflow/dags/vep_batch.py
@@ -0,0 +1,284 @@
"""Airflow DAG for the harmonisation part of the pipeline."""

from __future__ import annotations

import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, List
[Contributor comment] python 3.10 supports list as type hints.
Suggested change:
- from typing import Any, List
+ from typing import Any

import common_airflow as common
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_batch import (
CloudBatchSubmitJobOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from google.cloud import batch_v1

PROJECT_ID = "open-targets-genetics-dev"
REGION = "europe-west1"

# Required parameters:
VEP_DOCKER_IMAGE = "europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/custom_ensembl_vep:dev"

VCF_INPUT_BUCKET = "gs://genetics_etl_python_playground/vep/test_vep_input"
VEP_OUTPUT_BUCKET = "gs://genetics_etl_python_playground/vep/test_vep_output"
VEP_CACHE_BUCKET = "gs://genetics_etl_python_playground/vep/cache"

# Internal parameters for the docker image:
MOUNT_DIR = "/mnt/disks/share"

# Configuration for the machine types:
MACHINES = {
"VEPMACHINE": {
"machine_type": "e2-standard-4",
"cpu_milli": 2000,
"memory_mib": 2000,
"boot_disk_mib": 10000,
},
}
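
# A note on sizing (an inference, not stated in the PR): an e2-standard-4 VM has
# 4 vCPUs and 16 GiB of memory, so with cpu_milli=2000 and memory_mib=2000 per
# task, Google Batch can pack two VEP tasks onto each instance.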


@dataclass
class PathManager:
"""It is quite complicated to keep track of all the input/output buckets, the corresponding mounting points prefixes etc..."""

VCF_INPUT_BUCKET: str
VEP_OUTPUT_BUCKET: str
VEP_CACHE_BUCKET: str
MOUNT_DIR_ROOT: str

# Derived parameters to find the list of files to process:
input_path: str | None = None
input_bucket: str | None = None

# Derived parameters to initialise the docker image:
path_dictionary: dict[str, dict[str, str]] | None = None

# Derived parameters to point to the right mouting points:
cache_dir: str | None = None
input_dir: str | None = None
output_dir: str | None = None

def __post_init__(self: PathManager) -> None:
"""Build paths based on the input parameters."""
self.path_dictionary = {
"input": {
"remote_path": self.VCF_INPUT_BUCKET.replace("gs://", ""),
"mount_point": f"{self.MOUNT_DIR_ROOT}/input",
},
"output": {
"remote_path": self.VEP_OUTPUT_BUCKET.replace("gs://", ""),
"mount_point": f"{self.MOUNT_DIR_ROOT}/output",
},
"cache": {
"remote_path": self.VEP_CACHE_BUCKET.replace("gs://", ""),
"mount_point": f"{self.MOUNT_DIR_ROOT}/cache",
},
}
# Parameters for fetching files:
self.input_path = self.VCF_INPUT_BUCKET.replace("gs://", "") + "/"
self.input_bucket = self.VCF_INPUT_BUCKET.split("/")[2]

# Parameters for VEP:
self.cache_dir = f"{self.MOUNT_DIR_ROOT}/cache"
self.input_dir = f"{self.MOUNT_DIR_ROOT}/input"
self.output_dir = f"{self.MOUNT_DIR_ROOT}/output"

def get_mount_config(self) -> list[dict[str, str]]:
"""Return the mount configuration.

Returns:
list[dict[str, str]]: The mount configuration.
"""
assert self.path_dictionary is not None, "Path dictionary not initialized."
return list(self.path_dictionary.values())
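
For orientation, a minimal sketch of the values PathManager derives from the constants at the top of this DAG (traced by hand from __post_init__):

pm = PathManager(VCF_INPUT_BUCKET, VEP_OUTPUT_BUCKET, VEP_CACHE_BUCKET, MOUNT_DIR)
pm.input_bucket           # "genetics_etl_python_playground"
pm.input_path             # "genetics_etl_python_playground/vep/test_vep_input/"
pm.cache_dir              # "/mnt/disks/share/cache"
pm.get_mount_config()[0]  # {"remote_path": "genetics_etl_python_playground/vep/test_vep_input",
                          #  "mount_point": "/mnt/disks/share/input"}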


def create_container_runnable(image: str, commands: List[str]) -> batch_v1.Runnable:
    """Create a container runnable for a Batch job.

    Args:
        image (str): The Docker image to use.
        commands (List[str]): The commands to run in the container.

    Returns:
        batch_v1.Runnable: The container runnable.
    """
    runnable = batch_v1.Runnable()
    runnable.container = batch_v1.Runnable.Container()
    runnable.container.image_uri = image
    runnable.container.entrypoint = "/bin/sh"
    runnable.container.commands = commands
    return runnable

[Contributor comment] This function is useful inside common_airflow.py. I've used it in my branch to submit Batch jobs. I suggest moving it.

[Contributor comment] I've refactored this function to be able to pass params to the container (to run the gentropy step with the HYDRA_FULL_ERROR=1 env variable). I suggest the following refactoring:

def create_container_runnable(
    image: str, commands: list[str], **kwargs: Any
) -> batch_v1.Runnable:
    """Create a container runnable for a Batch job with additional optional parameters.

    Args:
        image (str): The Docker image to use.
        commands (list[str]): The commands to run in the container.
        kwargs (Any): Additional optional parameters to set on the container.

    Returns:
        batch_v1.Runnable: The container runnable.
    """
    container = batch_v1.Runnable.Container(
        image_uri=image, entrypoint="/bin/sh", commands=commands, **kwargs
    )
    return batch_v1.Runnable(container=container)


def create_task_spec(image: str, commands: List[str]) -> batch_v1.TaskSpec:
    """Create a task for a Batch job.

    Args:
        image (str): The Docker image to use.
        commands (List[str]): The commands to run in the container.

    Returns:
        batch_v1.TaskSpec: The task specification.
    """
    task = batch_v1.TaskSpec()
    task.runnables = [
        create_container_runnable(image, commands)
        # msg_runnable()
    ]
    return task

[Contributor comment] This function is useful inside common_airflow.py. I've used it in my branch to submit Batch jobs. I suggest moving it.

[Contributor comment] Suggested change:
- def create_task_spec(image: str, commands: List[str]) -> batch_v1.TaskSpec:
+ def create_task_spec(image: str, commands: list[str]) -> batch_v1.TaskSpec:

[Contributor comment] Suggested change:
- commands (List[str]): The commands to run in the container.
+ commands (list[str]): The commands to run in the container.


def set_up_mouting_points(
    mounting_points: list[dict[str, str]],
) -> list[batch_v1.Volume]:
    """Set up the mounting points for the container.

    Args:
        mounting_points (list[dict[str, str]]): The mounting points.

    Returns:
        list[batch_v1.Volume]: The volumes.
    """
    volumes = []
    for mount in mounting_points:
        gcs_bucket = batch_v1.GCS()
        gcs_bucket.remote_path = mount["remote_path"]
        gcs_volume = batch_v1.Volume()
        gcs_volume.gcs = gcs_bucket
        gcs_volume.mount_path = mount["mount_point"]
        volumes.append(gcs_volume)
    return volumes

[Contributor comment] This function is useful inside common_airflow.py. I've used it in my branch to submit Batch jobs. I suggest moving it.

[Contributor comment] I don't think I've understood the necessity of mounting the volumes in the case of VEP. Could you briefly explain?

[Contributor comment] VEP is a command-line application, which cannot read or write files from a Google Cloud location. So the folders containing cache, input and output files need to be mounted.
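
To make the reply above concrete, here is a minimal sketch of the mapping a single volume establishes (bucket and file names reuse values from this DAG):

cache_volume = set_up_mouting_points(
    [
        {
            "remote_path": "genetics_etl_python_playground/vep/cache",
            "mount_point": "/mnt/disks/share/cache",
        }
    ]
)[0]
# Inside the container, VEP reads e.g.
#   /mnt/disks/share/cache/AlphaMissense_hg38.tsv.gz
# which Batch maps to
#   gs://genetics_etl_python_playground/vep/cache/AlphaMissense_hg38.tsv.gz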



def create_batch_job(
    task: batch_v1.TaskSpec,
    machine: str,
    task_env: list[batch_v1.Environment],
    mounting_points: list[dict[str, str]],
) -> batch_v1.Job:
    """Create a Google Batch job.

    Args:
        task (batch_v1.TaskSpec): The task specification.
        machine (str): The machine type to use.
        task_env (list[batch_v1.Environment]): The environment variables for the task.
        mounting_points (list[dict[str, str]]): List of mounting points.

    Returns:
        batch_v1.Job: The Batch job.
    """
    resources = batch_v1.ComputeResource()
    resources.cpu_milli = MACHINES[machine]["cpu_milli"]
    resources.memory_mib = MACHINES[machine]["memory_mib"]
    resources.boot_disk_mib = MACHINES[machine]["boot_disk_mib"]
    task.compute_resource = resources

    task.max_retry_count = 3
    task.max_run_duration = "43200s"

    # The mounting points are set up and assigned to the task:
    task.volumes = set_up_mouting_points(mounting_points)

    group = batch_v1.TaskGroup()
    group.task_spec = task
    group.task_environments = task_env

    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = MACHINES[machine]["machine_type"]
    policy.provisioning_model = "SPOT"

    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.instances = [instances]

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    return job

[Contributor comment] This function is useful inside common_airflow.py. I've used it in my branch to submit Batch jobs. I suggest moving it.


@task(task_id="vep_annotation")
def vep_annotation(pm: PathManager, **kwargs: Any) -> None:
"""Submit a Batch job to download cache for VEP.

Args:
pm (PathManager): The path manager with all the required path related information.
**kwargs (Any): Keyword arguments.
"""
# Get the filenames to process:
ti = kwargs["ti"]
filenames = [
os.path.basename(os.path.splitext(path)[0])
for path in ti.xcom_pull(task_ids="get_vep_todo_list", key="return_value")
]
# Stop process if no files was found:
assert filenames, "No files found to process."

# Based on the filenames, build the environment variables for the batch job:
task_env = [
batch_v1.Environment(
variables={
"INPUT_FILE": filename + ".tsv",
"OUTPUT_FILE": filename + ".json",
}
)
for filename in filenames
]
# Build the command to run in the container:
command = [
"-c",
rf"vep --cache --offline --format vcf --force_overwrite \
--no_stats \
--dir_cache {pm.cache_dir} \
--input_file {pm.input_dir}/$INPUT_FILE \
--output_file {pm.output_dir}/$OUTPUT_FILE --json \
--dir_plugins {pm.cache_dir}/VEP_plugins \
--sift b \
--polyphen b \
--uniprot \
--check_existing \
--exclude_null_alleles \
--canonical \
--plugin LoF,loftee_path:{pm.cache_dir}/VEP_plugins,gerp_bigwig:{pm.cache_dir}/gerp_conservation_scores.homo_sapiens.GRCh38.bw,human_ancestor_fa:{pm.cache_dir}/human_ancestor.fa.gz,conservation_file:/opt/vep/loftee.sql \
--plugin AlphaMissense,file={pm.cache_dir}/AlphaMissense_hg38.tsv.gz,transcript_match=1 \
--plugin CADD,snv={pm.cache_dir}/CADD_GRCh38_whole_genome_SNVs.tsv.gz",
]
task = create_task_spec(VEP_DOCKER_IMAGE, command)
batch_task = CloudBatchSubmitJobOperator(
task_id="vep_batch_job",
project_id=PROJECT_ID,
region=REGION,
job_name=f"vep-job-{time.strftime('%Y%m%d-%H%M%S')}",
job=create_batch_job(task, "VEPMACHINE", task_env, pm.get_mount_config()),
deferrable=False,
)
batch_task.execute(context=kwargs)
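
One detail worth making explicit: Batch runs the task spec once per entry in task_environments, substituting each entry's variables into the container command, which is what parallelises the per-file VEP runs. A minimal sketch with two hypothetical input files:

task_env = [
    batch_v1.Environment(
        variables={"INPUT_FILE": "chunk_1.tsv", "OUTPUT_FILE": "chunk_1.json"}
    ),
    batch_v1.Environment(
        variables={"INPUT_FILE": "chunk_2.tsv", "OUTPUT_FILE": "chunk_2.json"}
    ),
]
# With group.task_environments = task_env, Batch schedules two tasks, each
# running the same vep command with its own $INPUT_FILE / $OUTPUT_FILE.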


with DAG(
    dag_id=Path(__file__).stem,
    description="Open Targets Genetics — Ensembl VEP",
    default_args=common.shared_dag_args,
    **common.shared_dag_kwargs,
):
    # Initialise parameter manager:
    pm = PathManager(VCF_INPUT_BUCKET, VEP_OUTPUT_BUCKET, VEP_CACHE_BUCKET, MOUNT_DIR)

    # Get a list of files to process from the input bucket:
    get_vep_todo_list = GCSListObjectsOperator(
        task_id="get_vep_todo_list",
        bucket=pm.input_bucket,
        prefix=pm.input_path,
        match_glob="**tsv",
    )

    get_vep_todo_list >> vep_annotation(pm)
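
A quick way to confirm the new DAG file parses before deployment (a sketch, assuming a local Airflow install with the providers pinned in requirements.txt):

from airflow.models import DagBag

dag_bag = DagBag(dag_folder="src/airflow/dags", include_examples=False)
assert "vep_batch" in dag_bag.dags, dag_bag.import_errors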
2 changes: 1 addition & 1 deletion src/airflow/requirements.txt
@@ -1,3 +1,3 @@
apache-airflow-providers-google==10.10.1
apache-airflow-providers-google==10.17.0
apache-airflow-providers-apache-beam==5.6.1
psycopg2-binary==2.9.9
31 changes: 31 additions & 0 deletions src/vep/Dockerfile
@@ -0,0 +1,31 @@
FROM ensemblorg/ensembl-vep:release_111.0

USER root

RUN apt-get update && apt-get -y install \
    wget \
    libncurses5-dev \
    libncursesw5-dev \
    libbz2-dev \
    liblzma-dev \
    sqlite3 \
    libsqlite3-dev \
    cpanminus \
    git \
    && rm -rf /var/lib/apt/lists/*

RUN cpanm DBD::SQLite

RUN wget --progress=dot:giga https://github.com/samtools/samtools/releases/download/1.7/samtools-1.7.tar.bz2 && \
    tar xjvf samtools-1.7.tar.bz2 && \
    cd samtools-1.7 && \
    make && \
    make install

RUN wget --progress=dot:giga https://personal.broadinstitute.org/konradk/loftee_data/GRCh38/loftee.sql.gz --directory-prefix=/opt/vep/ && \
    gunzip /opt/vep/loftee.sql.gz

# Make sure the mounting points exist:
RUN mkdir -p /mnt/disks/share/cache && \
    mkdir -p /mnt/disks/share/input && \
    mkdir -p /mnt/disks/share/output