feat: adding locus-breaker clumping method (#634)
* feat(schema): adding locus start/end position to the study locus schema
* feat(clumping): adding clumping logic + docs
* test(clumping): adding tests for locus breaker method
* test(clumping): adding semantic tests for locus breaker clumping
* fix(config): reverting gwas significance threshold
Showing 6 changed files with 309 additions and 1 deletion.
New file (+114 lines): locus-breaker clumping implementation.
"""Locus-breaker clumping method.""" | ||
|
||
from __future__ import annotations | ||
|
||
import sys | ||
from typing import TYPE_CHECKING | ||
|
||
import numpy as np | ||
import pyspark.sql.functions as f | ||
import pyspark.sql.types as t | ||
from pyspark.sql.window import Window | ||
|
||
from gentropy.common.spark_helpers import calculate_neglog_pvalue | ||
from gentropy.dataset.study_locus import StudyLocus | ||
|
||
if TYPE_CHECKING: | ||
|
||
from gentropy.dataset.summary_statistics import SummaryStatistics | ||
|
||
|
||
def locus_breaker( | ||
summary_statistics: SummaryStatistics, | ||
baseline_pvalue_cutoff: float, | ||
distance_cutoff: int, | ||
pvalue_cutoff: float, | ||
flankig_distance: int, | ||
) -> StudyLocus: | ||
"""Identify GWAS associated loci based on the provided p-value and distance cutoff. | ||
- The GWAS associated loci identified by this method have a varying width, and are separated by a distance greater than the provided distance cutoff. | ||
- The distance is only calculted between single point associations that reach the baseline p-value cutoff. | ||
- As the width of the selected genomic region dynamically depends on the loci, the resulting StudyLocus object will contain the locus start and end position. | ||
- To ensure completeness, the locus is extended by a flanking distance in both ends. | ||
Args: | ||
summary_statistics (SummaryStatistics): Input summary statistics dataset. | ||
baseline_pvalue_cutoff (float): baseline significance we consider for the locus. | ||
distance_cutoff (int): minimum distance that separates two loci. | ||
pvalue_cutoff (float): the minimum significance the locus should have. | ||
flankig_distance (int): the distance to extend the locus in both directions. | ||
Returns: | ||
StudyLocus: clumped study loci with locus start and end positions + lead variant from the locus. | ||
""" | ||
# Extract columns from the summary statistics: | ||
columns_sumstats_columns = summary_statistics.df.columns | ||
# Convert pvalue_cutoff to neglog scale: | ||
neglog_pv_cutoff = -np.log10(pvalue_cutoff) | ||
|
||
# First window to calculate the distance between consecutive positions: | ||
w1 = Window.partitionBy("studyId", "chromosome").orderBy("position") | ||
|
||
# Second window to calculate the locus start and end: | ||
w2 = ( | ||
Window.partitionBy("studyId", "chromosome", "locusStart") | ||
.orderBy("position") | ||
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) | ||
) | ||
|
||
# Third window to rank the variants within the locus based on neglog p-value to find top loci: | ||
w3 = Window.partitionBy("studyId", "chromosome", "locusStart", "locusEnd").orderBy( | ||
f.col("negLogPValue").desc() | ||
) | ||
|
||
return StudyLocus( | ||
_df=( | ||
# Applying the baseline p-value cutoff: | ||
summary_statistics.pvalue_filter(baseline_pvalue_cutoff) | ||
# Calculating the neglog p-value for easier sorting: | ||
.df.withColumn( | ||
"negLogPValue", | ||
calculate_neglog_pvalue( | ||
f.col("pValueMantissa"), f.col("pValueExponent") | ||
), | ||
) | ||
# Calculating the distance between consecutive positions, then identifying the locus start and end: | ||
.withColumn("next_position", f.lag(f.col("position")).over(w1)) | ||
.withColumn("distance", f.col("position") - f.col("next_position")) | ||
.withColumn( | ||
"locusStart", | ||
f.when( | ||
(f.col("distance") > distance_cutoff) | f.col("distance").isNull(), | ||
f.col("position"), | ||
), | ||
) | ||
.withColumn( | ||
"locusStart", | ||
f.when( | ||
f.last(f.col("locusStart") - flankig_distance, True).over( | ||
w1.rowsBetween(-sys.maxsize, 0) | ||
) | ||
> 0, | ||
f.last(f.col("locusStart") - flankig_distance, True).over( | ||
w1.rowsBetween(-sys.maxsize, 0) | ||
), | ||
).otherwise(f.lit(0)), | ||
) | ||
.withColumn( | ||
"locusEnd", f.max(f.col("position") + flankig_distance).over(w2) | ||
) | ||
.withColumn("rank", f.rank().over(w3)) | ||
.filter((f.col("rank") == 1) & (f.col("negLogPValue") > neglog_pv_cutoff)) | ||
.select( | ||
*columns_sumstats_columns, | ||
# To make sure that the type of locusStart and locusEnd follows schema of StudyLocus: | ||
f.col("locusStart").cast(t.IntegerType()).alias("locusStart"), | ||
f.col("locusEnd").cast(t.IntegerType()).alias("locusEnd"), | ||
StudyLocus.assign_study_locus_id( | ||
f.col("studyId"), f.col("variantId") | ||
).alias("studyLocusId"), | ||
) | ||
), | ||
_schema=StudyLocus.get_schema(), | ||
) |
New file (+132 lines): tests for locus-breaker clumping.
"""Test locus-breaker clumping.""" | ||
|
||
from __future__ import annotations | ||
|
||
from typing import TYPE_CHECKING | ||
|
||
import pytest | ||
from gentropy.dataset.study_locus import StudyLocus | ||
from gentropy.dataset.summary_statistics import SummaryStatistics | ||
from pyspark.sql import functions as f | ||
from pyspark.sql import types as t | ||
|
||
if TYPE_CHECKING: | ||
from pyspark.sql import SparkSession | ||
|
||
|
||
def test_locus_breaker_return_type( | ||
mock_summary_statistics: SummaryStatistics, | ||
) -> None: | ||
"""Test locus clumping.""" | ||
assert isinstance( | ||
mock_summary_statistics.locus_breaker_clumping(), | ||
StudyLocus, | ||
) | ||
|
||
|
||
class TestLocusBreakerClumping: | ||
"""Test locus breaker clumping.""" | ||
|
||
# Some constants for testing: | ||
distance_cutoff = 3 | ||
pvalue_baseline_cutoff = 0.05 | ||
pvalue_cutoff = 1e-3 | ||
flanking = 2 | ||
|
||
@pytest.fixture(scope="class") | ||
def mock_input( | ||
self: TestLocusBreakerClumping, | ||
spark: SparkSession, | ||
) -> SummaryStatistics: | ||
"""Prepare mock summary statistics for clumping.""" | ||
data = [ | ||
# Block 1: Study1, chromosome 1, expected boundaries: 0-5 | ||
("s1", "chr1", "v1", 1, -2), | ||
("s1", "chr1", "v1", 2, -4), | ||
("s1", "chr1", "top_loci", 3, -5), | ||
("s1", "chr1", "v1", 4, -1), # <- will be dropped: not reaching baseline | ||
# Block 2: Study1, chromosome 1, expected boundaries: 5-10 | ||
("s1", "chr1", "top_loci", 7, -4), | ||
("s1", "chr1", "v1", 8, -2), | ||
# Block 3: Study1, chromosome 2, expected boundaries: 6-12 | ||
("s1", "chr2", "v1", 8, -2), | ||
("s1", "chr2", "v1", 9, -3), | ||
("s1", "chr2", "top_loci", 10, -5), | ||
# Block 4: Study1, chromosome 2 | ||
("s1", "chr2", "v1", 14, -2), # <- will be dropped: low p-value | ||
# Block 5: Study2, chromosome 2, expected boundaries: 6-12 | ||
("s2", "chr2", "v1", 8, -2), | ||
("s2", "chr2", "v1", 9, -3), | ||
("s2", "chr2", "top_loci", 10, -6), | ||
# Block 6: Study2, chromosome 2, expected boundaries: 12-16 | ||
("s2", "chr2", "top_loci", 14, -5), | ||
] | ||
df = spark.createDataFrame( | ||
data, ["studyId", "chromosome", "variantId", "position", "pValueExponent"] | ||
).select( | ||
"studyId", | ||
"chromosome", | ||
"variantId", | ||
f.col("position").cast(t.IntegerType()), | ||
f.col("pValueExponent").cast(t.IntegerType()), | ||
f.lit(1.0).cast(t.FloatType()).alias("pValueMantissa"), | ||
f.lit(1.0).cast(t.DoubleType()).alias("beta"), | ||
) | ||
|
||
return SummaryStatistics(_df=df, _schema=SummaryStatistics.get_schema()) | ||
|
||
@pytest.fixture(scope="class") | ||
def clumped_data( | ||
self: TestLocusBreakerClumping, mock_input: SummaryStatistics | ||
) -> StudyLocus: | ||
"""Apply method and store for clumped data.""" | ||
return mock_input.locus_breaker_clumping( | ||
self.pvalue_baseline_cutoff, | ||
self.distance_cutoff, | ||
self.pvalue_cutoff, | ||
self.flanking, | ||
).persist() | ||
|
||
def test_return_type( | ||
self: TestLocusBreakerClumping, clumped_data: StudyLocus | ||
) -> None: | ||
"""Testing return type.""" | ||
assert isinstance( | ||
clumped_data, StudyLocus | ||
), f"Unexpected return type: {type(clumped_data)}" | ||
|
||
def test_number_of_loci( | ||
self: TestLocusBreakerClumping, clumped_data: StudyLocus | ||
) -> None: | ||
"""Testing return type.""" | ||
assert ( | ||
clumped_data.df.count() == 5 | ||
), f"Unexpected number of loci: {clumped_data.df.count()}" | ||
|
||
def test_top_loci(self: TestLocusBreakerClumping, clumped_data: StudyLocus) -> None: | ||
"""Testing selected top-loci.""" | ||
top_loci_variants = clumped_data.df.select("variantId").distinct().collect() | ||
|
||
assert ( | ||
len(top_loci_variants) == 1 | ||
), f"Unexpected number of top loci: {len(top_loci_variants)} ({top_loci_variants})" | ||
|
||
assert ( | ||
top_loci_variants[0]["variantId"] == "top_loci" | ||
), f"Unexpected top locus: {top_loci_variants[0]['variantId']}" | ||
|
||
def test_locus_boundaries( | ||
self: TestLocusBreakerClumping, clumped_data: StudyLocus | ||
) -> None: | ||
"""Testing locus boundaries.""" | ||
locus_start = [ | ||
row["locusStart"] for row in clumped_data.df.select("locusStart").collect() | ||
] | ||
|
||
locus_end = [ | ||
row["locusEnd"] for row in clumped_data.df.select("locusEnd").collect() | ||
] | ||
|
||
assert locus_start == [0, 5, 6, 6, 12], f"Unexpected locus start: {locus_start}" | ||
|
||
assert locus_end == [5, 10, 12, 12, 16], f"Unexpected locus end: {locus_end}" |
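As a sanity check, the plain-Python sketch shown after the implementation reproduces the boundaries asserted in test_locus_boundaries, using the class constants (distance cutoff 3, flanking 2); again, this is illustration only:

```python
# Position 4 on s1/chr1 is removed first by the 0.05 baseline filter (p = 0.1).
print(break_loci([1, 2, 3, 7, 8], 3, 2))  # s1/chr1 -> [(0, 5), (5, 10)]
print(break_loci([8, 9, 10, 14], 3, 2))   # s1/chr2 -> [(6, 12), (12, 16)]
print(break_loci([8, 9, 10, 14], 3, 2))   # s2/chr2 -> [(6, 12), (12, 16)]
# The s1/chr2 locus (12, 16) is then dropped because its best p-value (1e-2)
# misses the 1e-3 cutoff, leaving starts [0, 5, 6, 6, 12] and ends
# [5, 10, 12, 12, 16], as asserted above.
```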