
fix(schema): recursive validation of arbitrarily deep nested structure #790

Merged

merged 15 commits into dev on Sep 30, 2024

Conversation

DSuveges
Contributor

@DSuveges DSuveges commented Sep 25, 2024

✨ Context

So far the validation of datasets was not fully bulletproof: deeply nested (array of array) fields were not validated and automatically passed validation. This was an issue because the VEP parser had a bug yielding an inSilicoPredictors column with the schema array of array of struct instead of array of struct. The validation was passing, tests were passing, and the bug was discovered too late.

🛠 What does this PR implement

  • New, recursive schema comparison methods for arrays and structs.
  • Updated schema validation method in the Dataset class, relying on the new comparison methods.
  • New SchemaValidationError class that is raised upon failing validation.
  • The old schema flattening methods are kept, because they might have other uses besides schema validation.
  • Tests for the new schema comparison functions.
  • Temporarily skipping VEP parser tests.
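The core idea of the recursive comparison can be sketched without PySpark. The issue-category keys below mirror this PR's error output, but the type model (dicts for structs, tuples for arrays) and function names are a simplification for illustration, not the actual gentropy implementation:

```python
from collections import defaultdict

# Simplified type model (not the real PySpark one): a struct is a dict of
# field name -> type, an array is a ("array", element_type) tuple, and a
# leaf is a plain string such as "string" or "integer".

def type_kind(dtype):
    """Classify a type as "struct", "array", or its leaf name."""
    if isinstance(dtype, dict):
        return "struct"
    if isinstance(dtype, tuple) and dtype[0] == "array":
        return "array"
    return dtype

def compare_types(observed, expected, path, issues):
    """Recurse into one (observed, expected) type pair, recording issues."""
    observed_kind, expected_kind = type_kind(observed), type_kind(expected)
    if observed_kind != expected_kind:
        issues["columns_with_non_matching_type"].append(
            f'For column "{path}" found {observed_kind} instead of {expected_kind}'
        )
    elif observed_kind == "struct":
        compare_structs(observed, expected, f"{path}.", issues)
    elif observed_kind == "array":
        # Recurse into the element type, extending the path with "[]"
        compare_types(observed[1], expected[1], f"{path}[]", issues)

def compare_structs(observed, expected, prefix="", issues=None):
    """Recursively compare two struct schemas; return collected issues."""
    issues = issues if issues is not None else defaultdict(list)
    for name in observed.keys() - expected.keys():
        issues["unexpected_columns"].append(prefix + name)
    for name in expected.keys() - observed.keys():
        issues["missing_mandatory_columns"].append(prefix + name)
    for name in observed.keys() & expected.keys():
        compare_types(observed[name], expected[name], prefix + name, issues)
    return issues
```

With the inSilicoPredictors bug modelled this way (array of array of struct observed, array of struct expected), the recursion reports a columns_with_non_matching_type issue at the nested path instead of silently passing.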

@github-actions github-actions bot added the bug, size-M and Dataset labels Sep 25, 2024
@d0choa
Collaborator

d0choa commented Sep 25, 2024

🤯 WOW

@DSuveges DSuveges linked an issue Sep 25, 2024 that may be closed by this pull request
@DSuveges
Contributor Author

Currently there are two tests failing with SchemaValidationError:

=========================== short test summary info ============================
FAILED tests/gentropy/datasource/ensembl/test_vep_variants.py::TestVEPParser::test_extract_variant_index_from_vep - gentropy.common.schemas.SchemaValidationError: Schema validation failed for VariantIndex
Errors:
  columns_with_non_matching_type: For column "inSilicoPredictors[][]" found array instead of struct
FAILED tests/gentropy/datasource/ensembl/test_vep_variants.py::TestVEPParser::test_conversion - gentropy.common.schemas.SchemaValidationError: Schema validation failed for VariantIndex
Errors:
  columns_with_non_matching_type: For column "inSilicoPredictors[][]" found array instead of struct
===== 2 failed, 414 passed, 1 skipped, 1603 warnings in 208.82s (0:03:28) ======

Details on the error:

E           gentropy.common.schemas.SchemaValidationError: Schema validation failed for VariantIndex
E           Errors:
E             columns_with_non_matching_type: For column "inSilicoPredictors[][]" found array instead of struct
src/gentropy/dataset/dataset.py:152: SchemaValidationError

These failures are due to a bug in the VEP parser. For now, I'm skipping these tests.

@DSuveges DSuveges marked this pull request as ready for review September 25, 2024 15:52
@project-defiant
Contributor

@DSuveges If we transition to pyspark 3.5, this feature could be easily ported from https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.testing.assertSchemaEqual.html

@project-defiant
Contributor

project-defiant commented Sep 27, 2024

@DSuveges unfortunately there is another issue with the schema validation

import pyspark.sql.types as t
from pyspark.sql import SparkSession

from gentropy.common.schemas import compare_struct_schemas


def test_schema_is_correct(spark: SparkSession) -> None:
    """Test schema is correct."""

    # fmt: off
    schema = t.StructType(
        [
            t.StructField(
                "arr",
                t.ArrayType(
                    t.StructType(
                        [
                            t.StructField("a", t.StringType()),
                            t.StructField("b", t.IntegerType())
                        ]
                    )
                )
            ),
            t.StructField("id", t.IntegerType())
        ]
    )
    schema2 = t.StructType(
        [
            t.StructField(
                "arr",
                t.ArrayType(
                    t.StructType(
                        [
                            t.StructField("b", t.IntegerType()),
                            t.StructField("a", t.StringType())
                        ]
                    )
                )
            ),
            t.StructField("id", t.IntegerType())
        ]
    )
    df1 = spark.createDataFrame([([("a", 1,)], 1),], schema=schema)
    df2 = spark.createDataFrame([([(1,"a",)], 1),], schema=schema2)
    diff = compare_struct_schemas(
        observed_schema=df1.schema,
        expected_schema=df2.schema,
    )
    assert len(diff) != 0

The above test does not fail, even though the schemas are almost the same: only the nested struct fields are not ordered the same way. This is the issue the variant index schema is facing now as well.

vep_output_json_path = "gs://ot_orchestration/releases/26.09/variants/annotated_variants"
variant_index_path = "gs://ot_orchestration/releases/26.09/variant_index"
gnomad_variant_annotations_path = "gs://genetics_etl_python_playground/static_assets/gnomad_variants"
hash_threshold = 300

from gentropy.dataset.variant_index import VariantIndex
from gentropy.datasource.ensembl.vep_parser import VariantEffectPredictorParser

variant_index = VariantEffectPredictorParser.extract_variant_index_from_vep(
    session.spark, vep_output_json_path, hash_threshold
)


annotations = VariantIndex.from_parquet(
    session=session,
    path=gnomad_variant_annotations_path,
    recursiveFileLookup=True,
)
variant_index.df.printSchema()
annotations.df.printSchema()

results in

root
 |-- variantId: string (nullable = false)
 |-- chromosome: string (nullable = true)
 |-- position: integer (nullable = true)
 |-- referenceAllele: string (nullable = true)
 |-- alternateAllele: string (nullable = true)
 |-- inSilicoPredictors: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- method: string (nullable = true)
 |    |    |-- assessment: string (nullable = true)
 |    |    |-- score: float (nullable = true)
 |    |    |-- assessmentFlag: string (nullable = true)
 |    |    |-- targetId: string (nullable = true)
 |-- mostSevereConsequenceId: string (nullable = true)
 |-- hgvsId: string (nullable = true)
 |-- transcriptConsequences: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- variantFunctionalConsequenceIds: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- consequenceScore: float (nullable = true)
 |    |    |-- aminoAcidChange: string (nullable = true)
 |    |    |-- uniprotAccessions: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- isEnsemblCanonical: boolean (nullable = false)
 |    |    |-- codons: string (nullable = true)
 |    |    |-- distanceFromFootprint: long (nullable = true)
 |    |    |-- distanceFromTss: long (nullable = true)
 |    |    |-- appris: string (nullable = true)
 |    |    |-- maneSelect: string (nullable = true)
 |    |    |-- targetId: string (nullable = true)
 |    |    |-- impact: string (nullable = true)
 |    |    |-- lofteePrediction: string (nullable = true)
 |    |    |-- siftPrediction: float (nullable = true)
 |    |    |-- polyphenPrediction: float (nullable = true)
 |    |    |-- transcriptId: string (nullable = true)
 |    |    |-- transcriptIndex: integer (nullable = false)
 |-- rsIds: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- alleleFrequencies: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- populationName: string (nullable = true)
 |    |    |-- alleleFrequency: double (nullable = true)
 |-- dbXrefs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- source: string (nullable = true)

root
 |-- variantId: string (nullable = true)
 |-- chromosome: string (nullable = true)
 |-- position: integer (nullable = true)
 |-- referenceAllele: string (nullable = true)
 |-- alternateAllele: string (nullable = true)
 |-- inSilicoPredictors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- method: string (nullable = true)
 |    |    |-- assessment: string (nullable = true)
 |    |    |-- score: float (nullable = true)
 |    |    |-- assessmentFlag: string (nullable = true)
 |    |    |-- targetId: string (nullable = true)
 |-- mostSevereConsequenceId: string (nullable = true)
 |-- transcriptConsequences: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- variantFunctionalConsequenceIds: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- aminoAcidChange: string (nullable = true)
 |    |    |-- uniprotAccessions: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- isEnsemblCanonical: boolean (nullable = true)
 |    |    |-- codons: string (nullable = true)
 |    |    |-- distanceFromFootprint: long (nullable = true)
 |    |    |-- distanceFromTss: long (nullable = true)
 |    |    |-- appris: string (nullable = true)
 |    |    |-- maneSelect: string (nullable = true)
 |    |    |-- targetId: string (nullable = true)
 |    |    |-- impact: string (nullable = true)
 |    |    |-- lofteePrediction: string (nullable = true)
 |    |    |-- siftPrediction: float (nullable = true)
 |    |    |-- polyphenPrediction: float (nullable = true)
 |    |    |-- consequenceScore: float (nullable = true)
 |    |    |-- transcriptIndex: integer (nullable = true)
 |    |    |-- transcriptId: string (nullable = true)
 |-- rsIds: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- hgvsId: string (nullable = true)
 |-- alleleFrequencies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- populationName: string (nullable = true)
 |    |    |-- alleleFrequency: double (nullable = true)
 |-- dbXrefs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- source: string (nullable = true)

The transcriptConsequences.consequenceScore and hgvsId fields are misaligned between the two schemas. The VEP variant index was generated with the code that patched inSilicoPredictors.

Edit: looking back at the code and the schemas, there are even more mismatches, e.g. transcriptConsequences.transcriptIndex.

@DSuveges
Contributor Author

I most likely misunderstand something, but when you say

The above test does not fail, although the schema is almost the same, but the nested struct fields are not ordered correctly. This is the issue the variant index schema is facing now as well.

Of course it doesn't fail: you are testing for finding a difference, and the test indeed finds one (because the schemas are different), so it passes.

# What the comparison returns:
diff

contains:

defaultdict(list, {'unexpected_columns': ['arr']})

Which indicates there's a difference in the schema, so the assertion assert len(diff) != 0 evaluates to True.

@project-defiant
Contributor

project-defiant commented Sep 27, 2024

schema = t.StructType(
    [
        t.StructField(
            "arr",
            t.ArrayType(
                t.StructType(
                    [
                        t.StructField("a", t.StringType()),
                        t.StructField("b", t.IntegerType())
                    ]
                )
            )
        ),
        t.StructField("id", t.IntegerType())
    ]
)
schema2 = t.StructType(
    [
        t.StructField(
            "arr",
            t.ArrayType(
                t.StructType(
                    [
                        t.StructField("b", t.IntegerType()),
                        t.StructField("a", t.StringType())
                    ]
                )
            )
        ),
        t.StructField("id", t.IntegerType())
    ]
)
df1 = spark.createDataFrame([([("a", 1,)], 1),], schema=schema)
df2 = spark.createDataFrame([([(1, "a",)], 1),], schema=schema2)
diff = compare_struct_schemas(
    observed_schema=df1.schema,
    expected_schema=df2.schema,
)
You are right, there should be arr columns in both; I have fixed the comment.

@DSuveges
Contributor Author

As discussed offline:

  • The schema validation doesn't take the order of struct fields into account.
  • For most applications this doesn't matter; however, you can't merge two arrays of structs if the order of the fields doesn't match.
  • The proposed solution is to extend the safe_array_sort function with extra logic that sorts fields alphabetically.
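The normalisation proposed above can be sketched in a PySpark-free way. As before, the type model (dicts for structs, single-element lists for arrays) and the function name are illustrative assumptions, not the actual gentropy code:

```python
def sort_fields(dtype):
    """Return a copy of a type with struct fields in alphabetical order.

    Structs are modelled as dicts (field name -> type), arrays as
    single-element lists holding the element type, leaves as strings.
    """
    if isinstance(dtype, dict):
        # Struct: rebuild with field names sorted, recursing into each type
        return {name: sort_fields(dtype[name]) for name in sorted(dtype)}
    if isinstance(dtype, list):
        # Array: recurse into the element type
        return [sort_fields(dtype[0])]
    return dtype  # leaf type, e.g. "string"
```

After this pass, two arrays of structs whose fields were merely ordered differently end up with identical field order at every nesting level, which is what makes the subsequent union possible.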

@d0choa
Collaborator

d0choa commented Sep 27, 2024

Try to get out of this with the minimum number of scars 😅

@DSuveges
Contributor Author

@d0choa we can just drop the entire branch, bump to pyspark 3.5 and hope for the best.

@project-defiant
Contributor

I am almost done with the fix in safe array union.

@project-defiant
Contributor

@d0choa @DSuveges fix attempt in #793 resulted in no failure in the VariantIndexStep, see :)

Contributor

@ireneisdoomed ireneisdoomed left a comment


This is great! The schema validation has become much more accurate to the actual structure (no flattening). And at the same time the implementation is easier to understand. Great testing suite too!

"""This exception is raised when a schema validation fails."""

def __init__(
self: SchemaValidationError, message: str, errors: defaultdict[str, list[str]]
Contributor


Cool. So the defaultdict acts as a dictionary that stores errors found in the schemas, but whose keys are not predetermined.
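Exactly that pattern, in a tiny standalone illustration (the category keys below are examples taken from this PR's error output, not an exhaustive list):

```python
from collections import defaultdict

# Missing keys spring into existence as empty lists, so new error
# categories can be recorded without declaring them up front.
errors: defaultdict[str, list[str]] = defaultdict(list)
errors["missing_mandatory_columns"].append("variantId")
errors["columns_with_non_matching_type"].append(
    'For column "inSilicoPredictors[][]" found array instead of struct'
)
```

A non-empty defaultdict is also truthy, so a validation routine can simply check whether any errors accumulated before raising.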

self.message = message # Explicitly set the message attribute
self.errors = errors

def __str__(self: SchemaValidationError) -> str:
Contributor


This is the method printed when you raise the exception, is that right?

Contributor Author


Yes, exactly.

)

# If element type is a struct, resolve nesting:
elif observed_type == "struct":
Contributor


To enforce both types are the same

Suggested change
elif observed_type == "struct":
elif observed_type == "struct" and expected_type == "struct":

Contributor Author


I agree, explicit is better than implicit. (The equality of the two schemas was tested already.)

)

# If element type is an array, resolve nesting:
elif observed_type == "array":
Contributor


To enforce both types are the same

Suggested change
elif observed_type == "array":
elif observed_type == "array" and expected_type == "array":

# If element type is a struct, resolve nesting:
elif observed_type == "struct":
schema_issues = compare_struct_schemas(
observed_schema.elementType,
Contributor


My mypy is raising an issue here, because both observed_schema and expected_schema are technically of type DataType, whereas the parameters have to be StructType. Are you having issues as well? I think mypy would be able to resolve it if the conditional above was made using the elementType, not the name of the type, i.e. elif observed_schema.elementType == StructType(). The same applies below when you call compare_array_schemas.

{
f"{parent_field_name}{field.name}"
for field in observed_schema
if list(observed_schema).count(field) > 1
Contributor


Really nice



def flatten_schema(schema: t.StructType, prefix: str = "") -> list[Any]:
def flatten_schema(schema: StructType, prefix: str = "") -> list[Any]:
Contributor


Since we are now parsing each schema without flattening, I'd suggest removing this function

Contributor Author


I can drop it if you think this function would not be useful in other contexts.

@DSuveges DSuveges merged commit 88f62d4 into dev Sep 30, 2024
5 checks passed
@DSuveges DSuveges deleted the ds_3545-schema-validation-misses-nested-arrays branch September 30, 2024 11:31

Successfully merging this pull request may close these issues.

Schema validation misses nested arrays