Skip to content

Commit

Permalink
refactor: moving all variant coordinates to GnomAD (#566)
Browse files Browse the repository at this point in the history
* refactor: removing gnomAD variant id. GonomadID is the variant id

* refactor: converting ld index coordinates to gnomad

* refactor: movig GWAS Catalog to gnomad coordinates

* test: fixing doctest

* refactor: touching DAG for preparing GnomAD dataset

* fix: adding updated lockfile

* fix: replace hyphens with underscores in variant identifiers

* fix: removing commented line

---------

Co-authored-by: David Ochoa <[email protected]>
  • Loading branch information
DSuveges and d0choa committed May 15, 2024
1 parent c066327 commit bb9f9c6
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 105 deletions.
6 changes: 3 additions & 3 deletions src/airflow/dags/common_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ def create_cluster(
# Create a disk config section if it does not exist.
cluster_config[worker_section].setdefault("disk_config", {})
# Specify the number of local SSDs.
cluster_config[worker_section]["disk_config"][
"num_local_ssds"
] = num_local_ssds
cluster_config[worker_section]["disk_config"]["num_local_ssds"] = (
num_local_ssds
)

# Return the cluster creation operator.
return DataprocCreateClusterOperator(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""Airflow DAG for the Preprocess part of the pipeline."""

from __future__ import annotations

from pathlib import Path

import common_airflow as common
from airflow.models.dag import DAG

CLUSTER_NAME = "otg-preprocess"
CLUSTER_NAME = "gnomad-preprocess"

ALL_STEPS = [
"ot_ld_index",
Expand Down
6 changes: 0 additions & 6 deletions src/gentropy/assets/schemas/variant_annotation.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@
"nullable": false,
"metadata": {}
},
{
"name": "gnomadVariantId",
"type": "string",
"nullable": false,
"metadata": {}
},
{
"name": "referenceAllele",
"type": "string",
Expand Down
36 changes: 1 addition & 35 deletions src/gentropy/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Common functions in the Genetics datasets."""

from __future__ import annotations

import sys
Expand Down Expand Up @@ -208,41 +209,6 @@ def parse_pvalue(pv: Column) -> list[Column]:
]


def convert_gnomad_position_to_ensembl(
position: Column, reference: Column, alternate: Column
) -> Column:
"""Convert GnomAD variant position to Ensembl variant position.
For indels (the reference or alternate allele is longer than 1), then adding 1 to the position, for SNPs,
the position is unchanged. More info about the problem: https://www.biostars.org/p/84686/
Args:
position (Column): Position of the variant in GnomAD's coordinates system.
reference (Column): The reference allele in GnomAD's coordinates system.
alternate (Column): The alternate allele in GnomAD's coordinates system.
Returns:
Column: The position of the variant in the Ensembl genome.
Examples:
>>> d = [(1, "A", "C"), (2, "AA", "C"), (3, "A", "AA")]
>>> df = spark.createDataFrame(d).toDF("position", "reference", "alternate")
>>> df.withColumn("new_position", convert_gnomad_position_to_ensembl(f.col("position"), f.col("reference"), f.col("alternate"))).show()
+--------+---------+---------+------------+
|position|reference|alternate|new_position|
+--------+---------+---------+------------+
| 1| A| C| 1|
| 2| AA| C| 3|
| 3| A| AA| 4|
+--------+---------+---------+------------+
<BLANKLINE>
"""
return f.when(
(f.length(reference) > 1) | (f.length(alternate) > 1), position + 1
).otherwise(position)


def _liftover_loci(
variant_index: Table, chain_path: str, dest_reference_genome: str
) -> Table:
Expand Down
24 changes: 8 additions & 16 deletions src/gentropy/datasource/gnomad/ld.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Step to import filtered version of a LD matrix (block matrix)."""

from __future__ import annotations

import sys
Expand All @@ -13,7 +14,7 @@
from pyspark.sql import Window

from gentropy.common.spark_helpers import get_top_ranked_in_window, get_value_from_row
from gentropy.common.utils import _liftover_loci, convert_gnomad_position_to_ensembl
from gentropy.common.utils import _liftover_loci
from gentropy.dataset.ld_index import LDIndex

if TYPE_CHECKING:
Expand Down Expand Up @@ -177,24 +178,15 @@ def _process_variant_indices(
ld_index_38.to_spark()
# Filter out variants where the liftover failed
.filter(f.col("`locus_GRCh38.position`").isNotNull())
.withColumn(
"chromosome", f.regexp_replace("`locus_GRCh38.contig`", "chr", "")
)
.withColumn(
"position",
convert_gnomad_position_to_ensembl(
f.col("`locus_GRCh38.position`"),
f.col("`alleles`").getItem(0),
f.col("`alleles`").getItem(1),
),
)
.select(
"chromosome",
"position",
f.regexp_replace("`locus_GRCh38.contig`", "chr", "").alias(
"chromosome"
),
f.col("`locus_GRCh38.position`").alias("position"),
f.concat_ws(
"_",
f.col("chromosome"),
f.col("position"),
f.regexp_replace("`locus_GRCh38.contig`", "chr", ""),
f.col("`locus_GRCh38.position`"),
f.col("`alleles`").getItem(0),
f.col("`alleles`").getItem(1),
).alias("variantId"),
Expand Down
44 changes: 4 additions & 40 deletions src/gentropy/datasource/gnomad/variants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Import gnomAD variants dataset."""

from __future__ import annotations

from dataclasses import dataclass, field
Expand All @@ -9,7 +10,7 @@
from gentropy.dataset.variant_annotation import VariantAnnotation

if TYPE_CHECKING:
from hail.expr.expressions import Int32Expression, StringExpression
pass


@dataclass
Expand Down Expand Up @@ -39,29 +40,6 @@ class GnomADVariants:
]
)

@staticmethod
def _convert_gnomad_position_to_ensembl_hail(
position: Int32Expression,
reference: StringExpression,
alternate: StringExpression,
) -> Int32Expression:
"""Convert GnomAD variant position to Ensembl variant position in hail table.
For indels (the reference or alternate allele is longer than 1), then adding 1 to the position, for SNPs, the position is unchanged.
More info about the problem: https://www.biostars.org/p/84686/
Args:
position (Int32Expression): Position of the variant in the GnomAD genome.
reference (StringExpression): The reference allele.
alternate (StringExpression): The alternate allele
Returns:
Int32Expression: The position of the variant according to Ensembl genome.
"""
return hl.if_else(
(reference.length() > 1) | (alternate.length() > 1), position + 1, position
)

def as_variant_annotation(self: GnomADVariants) -> VariantAnnotation:
"""Generate variant annotation dataset from gnomAD.
Expand Down Expand Up @@ -93,7 +71,7 @@ def as_variant_annotation(self: GnomADVariants) -> VariantAnnotation:
return VariantAnnotation(
_df=(
ht.select(
gnomadVariantId=hl.str("-").join(
variantId=hl.str("_").join(
[
ht.locus.contig.replace("chr", ""),
hl.str(ht.locus.position),
Expand All @@ -102,21 +80,7 @@ def as_variant_annotation(self: GnomADVariants) -> VariantAnnotation:
]
),
chromosome=ht.locus.contig.replace("chr", ""),
position=GnomADVariants._convert_gnomad_position_to_ensembl_hail(
ht.locus.position, ht.alleles[0], ht.alleles[1]
),
variantId=hl.str("_").join(
[
ht.locus.contig.replace("chr", ""),
hl.str(
GnomADVariants._convert_gnomad_position_to_ensembl_hail(
ht.locus.position, ht.alleles[0], ht.alleles[1]
)
),
ht.alleles[0],
ht.alleles[1],
]
),
position=ht.locus.position,
chromosomeB37=ht.locus_GRCh37.contig.replace("chr", ""),
positionB37=ht.locus_GRCh37.position,
referenceAllele=ht.alleles[0],
Expand Down
51 changes: 47 additions & 4 deletions src/gentropy/datasource/gwas_catalog/associations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Study Locus for GWAS Catalog data source."""

from __future__ import annotations

import importlib.resources as pkg_resources
Expand Down Expand Up @@ -31,6 +32,40 @@
class GWASCatalogCuratedAssociationsParser:
"""GWAS Catalog curated associations parser."""

@staticmethod
def convert_gnomad_position_to_ensembl(
position: Column, reference: Column, alternate: Column
) -> Column:
"""Convert GnomAD variant position to Ensembl variant position.
For indels (the reference or alternate allele is longer than 1), then adding 1 to the position, for SNPs,
the position is unchanged. More info about the problem: https://www.biostars.org/p/84686/
Args:
position (Column): Position of the variant in GnomAD's coordinates system.
reference (Column): The reference allele in GnomAD's coordinates system.
alternate (Column): The alternate allele in GnomAD's coordinates system.
Returns:
Column: The position of the variant in the Ensembl genome.
Examples:
>>> d = [(1, "A", "C"), (2, "AA", "C"), (3, "A", "AA")]
>>> df = spark.createDataFrame(d).toDF("position", "reference", "alternate")
>>> df.withColumn("new_position", GWASCatalogCuratedAssociationsParser.convert_gnomad_position_to_ensembl(f.col("position"), f.col("reference"), f.col("alternate"))).show()
+--------+---------+---------+------------+
|position|reference|alternate|new_position|
+--------+---------+---------+------------+
| 1| A| C| 1|
| 2| AA| C| 3|
| 3| A| AA| 4|
+--------+---------+---------+------------+
<BLANKLINE>
"""
return f.when(
(f.length(reference) > 1) | (f.length(alternate) > 1), position + 1
).otherwise(position)

@staticmethod
def _parse_pvalue(pvalue: Column) -> tuple[Column, Column]:
"""Parse p-value column.
Expand Down Expand Up @@ -178,7 +213,8 @@ def _map_to_variant_annotation_variants(
gwas_associations_subset = gwas_associations.select(
"studyLocusId",
f.col("CHR_ID").alias("chromosome"),
f.col("CHR_POS").cast(IntegerType()).alias("position"),
# The positions from GWAS Catalog are from ensembl that causes discrepancy for indels:
f.col("CHR_POS").cast(IntegerType()).alias("ensemblPosition"),
# List of all SNPs associated with the variant
GWASCatalogCuratedAssociationsParser._collect_rsids(
f.split(f.col("SNPS"), "; ").getItem(0),
Expand All @@ -194,6 +230,11 @@ def _map_to_variant_annotation_variants(
va_subset = variant_annotation.df.select(
"variantId",
"chromosome",
# Calculate the position in Ensembl coordinates for indels:
GWASCatalogCuratedAssociationsParser.convert_gnomad_position_to_ensembl(
f.col("position"), f.col("referenceAllele"), f.col("alternateAllele")
).alias("ensemblPosition"),
# Keeping GnomAD position:
"position",
f.col("rsIds").alias("rsIdsGnomad"),
"referenceAllele",
Expand All @@ -202,9 +243,11 @@ def _map_to_variant_annotation_variants(
variant_annotation.max_maf().alias("maxMaf"),
).join(
f.broadcast(
gwas_associations_subset.select("chromosome", "position").distinct()
gwas_associations_subset.select(
"chromosome", "ensemblPosition"
).distinct()
),
on=["chromosome", "position"],
on=["chromosome", "ensemblPosition"],
how="inner",
)

Expand All @@ -213,7 +256,7 @@ def _map_to_variant_annotation_variants(
filtered_associations = (
gwas_associations_subset.join(
f.broadcast(va_subset),
on=["chromosome", "position"],
on=["chromosome", "ensemblPosition"],
how="left",
)
.withColumn(
Expand Down

0 comments on commit bb9f9c6

Please sign in to comment.