diff --git a/src/airflow/dags/common_airflow.py b/src/airflow/dags/common_airflow.py index 9d4f18503..f3ec89bb2 100644 --- a/src/airflow/dags/common_airflow.py +++ b/src/airflow/dags/common_airflow.py @@ -127,9 +127,9 @@ def create_cluster( # Create a disk config section if it does not exist. cluster_config[worker_section].setdefault("disk_config", {}) # Specify the number of local SSDs. - cluster_config[worker_section]["disk_config"][ - "num_local_ssds" - ] = num_local_ssds + cluster_config[worker_section]["disk_config"]["num_local_ssds"] = ( + num_local_ssds + ) # Return the cluster creation operator. return DataprocCreateClusterOperator( diff --git a/src/airflow/dags/dag_preprocess.py b/src/airflow/dags/gnomad_preprocess.py similarity index 94% rename from src/airflow/dags/dag_preprocess.py rename to src/airflow/dags/gnomad_preprocess.py index 4439914c5..22e7fa056 100644 --- a/src/airflow/dags/dag_preprocess.py +++ b/src/airflow/dags/gnomad_preprocess.py @@ -1,4 +1,5 @@ """Airflow DAG for the Preprocess part of the pipeline.""" + from __future__ import annotations from pathlib import Path @@ -6,7 +7,7 @@ import common_airflow as common from airflow.models.dag import DAG -CLUSTER_NAME = "otg-preprocess" +CLUSTER_NAME = "gnomad-preprocess" ALL_STEPS = [ "ot_ld_index", diff --git a/src/gentropy/assets/schemas/variant_annotation.json b/src/gentropy/assets/schemas/variant_annotation.json index 826eabaf9..ab8767389 100644 --- a/src/gentropy/assets/schemas/variant_annotation.json +++ b/src/gentropy/assets/schemas/variant_annotation.json @@ -19,12 +19,6 @@ "nullable": false, "metadata": {} }, - { - "name": "gnomadVariantId", - "type": "string", - "nullable": false, - "metadata": {} - }, { "name": "referenceAllele", "type": "string", diff --git a/src/gentropy/common/utils.py b/src/gentropy/common/utils.py index 4cdda9ad2..3ec361a50 100644 --- a/src/gentropy/common/utils.py +++ b/src/gentropy/common/utils.py @@ -1,4 +1,5 @@ """Common functions in the Genetics datasets.""" + from __future__ import annotations import sys @@ -208,41 +209,6 @@ def parse_pvalue(pv: Column) -> list[Column]: ] -def convert_gnomad_position_to_ensembl( - position: Column, reference: Column, alternate: Column -) -> Column: - """Convert GnomAD variant position to Ensembl variant position. - - For indels (the reference or alternate allele is longer than 1), then adding 1 to the position, for SNPs, - the position is unchanged. More info about the problem: https://www.biostars.org/p/84686/ - - Args: - position (Column): Position of the variant in GnomAD's coordinates system. - reference (Column): The reference allele in GnomAD's coordinates system. - alternate (Column): The alternate allele in GnomAD's coordinates system. - - Returns: - Column: The position of the variant in the Ensembl genome. - - Examples: - >>> d = [(1, "A", "C"), (2, "AA", "C"), (3, "A", "AA")] - >>> df = spark.createDataFrame(d).toDF("position", "reference", "alternate") - >>> df.withColumn("new_position", convert_gnomad_position_to_ensembl(f.col("position"), f.col("reference"), f.col("alternate"))).show() - +--------+---------+---------+------------+ - |position|reference|alternate|new_position| - +--------+---------+---------+------------+ - | 1| A| C| 1| - | 2| AA| C| 3| - | 3| A| AA| 4| - +--------+---------+---------+------------+ - - - """ - return f.when( - (f.length(reference) > 1) | (f.length(alternate) > 1), position + 1 - ).otherwise(position) - - def _liftover_loci( variant_index: Table, chain_path: str, dest_reference_genome: str ) -> Table: diff --git a/src/gentropy/datasource/gnomad/ld.py b/src/gentropy/datasource/gnomad/ld.py index 02cb15c7f..471b87cae 100644 --- a/src/gentropy/datasource/gnomad/ld.py +++ b/src/gentropy/datasource/gnomad/ld.py @@ -1,4 +1,5 @@ """Step to import filtered version of a LD matrix (block matrix).""" + from __future__ import annotations import sys @@ -13,7 +14,7 @@ from pyspark.sql import Window from gentropy.common.spark_helpers import get_top_ranked_in_window, get_value_from_row -from gentropy.common.utils import _liftover_loci, convert_gnomad_position_to_ensembl +from gentropy.common.utils import _liftover_loci from gentropy.dataset.ld_index import LDIndex if TYPE_CHECKING: @@ -177,24 +178,15 @@ def _process_variant_indices( ld_index_38.to_spark() # Filter out variants where the liftover failed .filter(f.col("`locus_GRCh38.position`").isNotNull()) - .withColumn( - "chromosome", f.regexp_replace("`locus_GRCh38.contig`", "chr", "") - ) - .withColumn( - "position", - convert_gnomad_position_to_ensembl( - f.col("`locus_GRCh38.position`"), - f.col("`alleles`").getItem(0), - f.col("`alleles`").getItem(1), - ), - ) .select( - "chromosome", - "position", + f.regexp_replace("`locus_GRCh38.contig`", "chr", "").alias( + "chromosome" + ), + f.col("`locus_GRCh38.position`").alias("position"), f.concat_ws( "_", - f.col("chromosome"), - f.col("position"), + f.regexp_replace("`locus_GRCh38.contig`", "chr", ""), + f.col("`locus_GRCh38.position`"), f.col("`alleles`").getItem(0), f.col("`alleles`").getItem(1), ).alias("variantId"), diff --git a/src/gentropy/datasource/gnomad/variants.py b/src/gentropy/datasource/gnomad/variants.py index 58f5f8093..b06b4ba6c 100644 --- a/src/gentropy/datasource/gnomad/variants.py +++ b/src/gentropy/datasource/gnomad/variants.py @@ -1,4 +1,5 @@ """Import gnomAD variants dataset.""" + from __future__ import annotations from dataclasses import dataclass, field @@ -9,7 +10,7 @@ from gentropy.dataset.variant_annotation import VariantAnnotation if TYPE_CHECKING: - from hail.expr.expressions import Int32Expression, StringExpression + pass @dataclass @@ -39,29 +40,6 @@ class GnomADVariants: ] ) - @staticmethod - def _convert_gnomad_position_to_ensembl_hail( - position: Int32Expression, - reference: StringExpression, - alternate: StringExpression, - ) -> Int32Expression: - """Convert GnomAD variant position to Ensembl variant position in hail table. - - For indels (the reference or alternate allele is longer than 1), then adding 1 to the position, for SNPs, the position is unchanged. - More info about the problem: https://www.biostars.org/p/84686/ - - Args: - position (Int32Expression): Position of the variant in the GnomAD genome. - reference (StringExpression): The reference allele. - alternate (StringExpression): The alternate allele - - Returns: - Int32Expression: The position of the variant according to Ensembl genome. - """ - return hl.if_else( - (reference.length() > 1) | (alternate.length() > 1), position + 1, position - ) - def as_variant_annotation(self: GnomADVariants) -> VariantAnnotation: """Generate variant annotation dataset from gnomAD. @@ -93,7 +71,7 @@ def as_variant_annotation(self: GnomADVariants) -> VariantAnnotation: return VariantAnnotation( _df=( ht.select( - gnomadVariantId=hl.str("-").join( + variantId=hl.str("_").join( [ ht.locus.contig.replace("chr", ""), hl.str(ht.locus.position), @@ -102,21 +80,7 @@ def as_variant_annotation(self: GnomADVariants) -> VariantAnnotation: ] ), chromosome=ht.locus.contig.replace("chr", ""), - position=GnomADVariants._convert_gnomad_position_to_ensembl_hail( - ht.locus.position, ht.alleles[0], ht.alleles[1] - ), - variantId=hl.str("_").join( - [ - ht.locus.contig.replace("chr", ""), - hl.str( - GnomADVariants._convert_gnomad_position_to_ensembl_hail( - ht.locus.position, ht.alleles[0], ht.alleles[1] - ) - ), - ht.alleles[0], - ht.alleles[1], - ] - ), + position=ht.locus.position, chromosomeB37=ht.locus_GRCh37.contig.replace("chr", ""), positionB37=ht.locus_GRCh37.position, referenceAllele=ht.alleles[0], diff --git a/src/gentropy/datasource/gwas_catalog/associations.py b/src/gentropy/datasource/gwas_catalog/associations.py index 5ff499f2c..658facea7 100644 --- a/src/gentropy/datasource/gwas_catalog/associations.py +++ b/src/gentropy/datasource/gwas_catalog/associations.py @@ -1,4 +1,5 @@ """Study Locus for GWAS Catalog data source.""" + from __future__ import annotations import importlib.resources as pkg_resources @@ -31,6 +32,40 @@ class GWASCatalogCuratedAssociationsParser: """GWAS Catalog curated associations parser.""" + @staticmethod + def convert_gnomad_position_to_ensembl( + position: Column, reference: Column, alternate: Column + ) -> Column: + """Convert GnomAD variant position to Ensembl variant position. + + For indels (the reference or alternate allele is longer than 1), then adding 1 to the position, for SNPs, + the position is unchanged. More info about the problem: https://www.biostars.org/p/84686/ + + Args: + position (Column): Position of the variant in GnomAD's coordinates system. + reference (Column): The reference allele in GnomAD's coordinates system. + alternate (Column): The alternate allele in GnomAD's coordinates system. + + Returns: + Column: The position of the variant in the Ensembl genome. + + Examples: + >>> d = [(1, "A", "C"), (2, "AA", "C"), (3, "A", "AA")] + >>> df = spark.createDataFrame(d).toDF("position", "reference", "alternate") + >>> df.withColumn("new_position", GWASCatalogCuratedAssociationsParser.convert_gnomad_position_to_ensembl(f.col("position"), f.col("reference"), f.col("alternate"))).show() + +--------+---------+---------+------------+ + |position|reference|alternate|new_position| + +--------+---------+---------+------------+ + | 1| A| C| 1| + | 2| AA| C| 3| + | 3| A| AA| 4| + +--------+---------+---------+------------+ + + """ + return f.when( + (f.length(reference) > 1) | (f.length(alternate) > 1), position + 1 + ).otherwise(position) + @staticmethod def _parse_pvalue(pvalue: Column) -> tuple[Column, Column]: """Parse p-value column. @@ -178,7 +213,8 @@ def _map_to_variant_annotation_variants( gwas_associations_subset = gwas_associations.select( "studyLocusId", f.col("CHR_ID").alias("chromosome"), - f.col("CHR_POS").cast(IntegerType()).alias("position"), + # The positions from GWAS Catalog are from ensembl that causes discrepancy for indels: + f.col("CHR_POS").cast(IntegerType()).alias("ensemblPosition"), # List of all SNPs associated with the variant GWASCatalogCuratedAssociationsParser._collect_rsids( f.split(f.col("SNPS"), "; ").getItem(0), @@ -194,6 +230,11 @@ def _map_to_variant_annotation_variants( va_subset = variant_annotation.df.select( "variantId", "chromosome", + # Calculate the position in Ensembl coordinates for indels: + GWASCatalogCuratedAssociationsParser.convert_gnomad_position_to_ensembl( + f.col("position"), f.col("referenceAllele"), f.col("alternateAllele") + ).alias("ensemblPosition"), + # Keeping GnomAD position: "position", f.col("rsIds").alias("rsIdsGnomad"), "referenceAllele", @@ -202,9 +243,11 @@ def _map_to_variant_annotation_variants( variant_annotation.max_maf().alias("maxMaf"), ).join( f.broadcast( - gwas_associations_subset.select("chromosome", "position").distinct() + gwas_associations_subset.select( + "chromosome", "ensemblPosition" + ).distinct() ), - on=["chromosome", "position"], + on=["chromosome", "ensemblPosition"], how="inner", ) @@ -213,7 +256,7 @@ def _map_to_variant_annotation_variants( filtered_associations = ( gwas_associations_subset.join( f.broadcast(va_subset), - on=["chromosome", "position"], + on=["chromosome", "ensemblPosition"], how="left", ) .withColumn(