Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DA] Fix evaluation history UI #22859

Merged
merged 2 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ def fetch_asset_condition_evaluation_record_for_partition(
else None
)
return GrapheneAssetConditionEvaluation(
record.get_evaluation_with_run_ids(partitions_def).evaluation,
partitions_def,
partition_key,
record.get_evaluation_with_run_ids(partitions_def).evaluation, partition_key
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from dagster._core.definitions.declarative_automation.serialized_objects import (
AssetConditionEvaluation,
)
from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset
from dagster._core.definitions.partition import (
DefaultPartitionsSubset,
PartitionsDefinition,
PartitionsSubset,
)
from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset
from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord

Expand Down Expand Up @@ -95,11 +99,10 @@ class Meta:

def __init__(self, evaluation: AssetConditionEvaluation):
self._evaluation = evaluation
if evaluation.true_subset.bool_value:
if evaluation.true_subset.size > 0:
status = AssetConditionEvaluationStatus.TRUE
elif (
isinstance(evaluation.candidate_subset, AssetSubset)
and evaluation.candidate_subset.bool_value
elif isinstance(evaluation.candidate_subset, AssetSubset) and (
evaluation.candidate_subset.size > 0
):
status = AssetConditionEvaluationStatus.FALSE
else:
Expand Down Expand Up @@ -145,31 +148,38 @@ class GraphenePartitionedAssetConditionEvaluationNode(graphene.ObjectType):
class Meta:
name = "PartitionedAssetConditionEvaluationNode"

def __init__(
self,
evaluation: AssetConditionEvaluation,
partitions_def: Optional[PartitionsDefinition],
):
self._partitions_def = partitions_def
self._true_subset = evaluation.true_subset
def __init__(self, evaluation: AssetConditionEvaluation):
def _coerce_subset(maybe_subset):
if not isinstance(maybe_subset, AssetSubset):
return None
elif maybe_subset.is_partitioned:
return GrapheneAssetSubset(maybe_subset)

# TODO: Remove on redesign (FOU-242)
# We create a fake partitioned asset subset out of an unpartitioned asset subset in
# order to allow unpartitioned rows to not error when used in the partition-focused UI.
if maybe_subset.size == 0:
value = set()
else:
value = {"None"}

return GrapheneAssetSubset(
AssetSubset(asset_key=maybe_subset.asset_key, value=DefaultPartitionsSubset(value))
)

super().__init__(
uniqueId=evaluation.condition_snapshot.unique_id,
description=evaluation.condition_snapshot.description,
startTimestamp=evaluation.start_timestamp,
endTimestamp=evaluation.end_timestamp,
trueSubset=GrapheneAssetSubset(evaluation.true_subset),
candidateSubset=GrapheneAssetSubset(evaluation.candidate_subset)
if isinstance(evaluation.candidate_subset, AssetSubset)
else None,
trueSubset=_coerce_subset(evaluation.true_subset),
candidateSubset=_coerce_subset(evaluation.candidate_subset),
numTrue=evaluation.true_subset.size,
childUniqueIds=[
child.condition_snapshot.unique_id for child in evaluation.child_evaluations
],
)

def resolve_numTrue(self, graphene_info: ResolveInfo) -> int:
return self._true_subset.size


class GrapheneSpecificPartitionAssetConditionEvaluationNode(graphene.ObjectType):
uniqueId = graphene.NonNull(graphene.String)
Expand All @@ -187,7 +197,13 @@ def __init__(self, evaluation: AssetConditionEvaluation, partition_key: str):
self._evaluation = evaluation
self._partition_key = partition_key

if partition_key in evaluation.true_subset.subset_value:
if not evaluation.true_subset.is_partitioned:
# TODO: Remove on redesign (FOU-242)
# This code allows the page to not error when displaying a specific partition's results
# where a sub-condition is not partitioned. In these cases, we can treat the expression
# as SKIPPED
status = AssetConditionEvaluationStatus.SKIPPED
elif partition_key in evaluation.true_subset.subset_value:
status = AssetConditionEvaluationStatus.TRUE
elif (
not isinstance(evaluation.candidate_subset, AssetSubset)
Expand Down Expand Up @@ -239,36 +255,33 @@ class Meta:
name = "AssetConditionEvaluation"

def __init__(
self,
evaluation: AssetConditionEvaluation,
partitions_def: Optional[PartitionsDefinition],
partition_key: Optional[str] = None,
self, root_evaluation: AssetConditionEvaluation, partition_key: Optional[str] = None
):
# flatten the evaluation tree into a list of nodes
def _flatten(e: AssetConditionEvaluation) -> Sequence[AssetConditionEvaluation]:
return list(itertools.chain([e], *(_flatten(ce) for ce in e.child_evaluations)))

all_nodes = _flatten(evaluation)
all_evaluations = _flatten(root_evaluation)

if evaluation.true_subset.is_partitioned:
if root_evaluation.true_subset.is_partitioned:
if partition_key is None:
evaluationNodes = [
GraphenePartitionedAssetConditionEvaluationNode(evaluation, partitions_def)
for evaluation in all_nodes
GraphenePartitionedAssetConditionEvaluationNode(evaluation)
for evaluation in all_evaluations
]
else:
evaluationNodes = [
GrapheneSpecificPartitionAssetConditionEvaluationNode(evaluation, partition_key)
for evaluation in all_nodes
for evaluation in all_evaluations
]
else:
evaluationNodes = [
GrapheneUnpartitionedAssetConditionEvaluationNode(evaluation)
for evaluation in all_nodes
for evaluation in all_evaluations
]

super().__init__(
rootUniqueId=evaluation.condition_snapshot.unique_id,
rootUniqueId=root_evaluation.condition_snapshot.unique_id,
evaluationNodes=evaluationNodes,
)

Expand Down Expand Up @@ -306,9 +319,7 @@ def __init__(
numRequested=evaluation_with_run_ids.evaluation.true_subset.size,
startTimestamp=evaluation_with_run_ids.evaluation.start_timestamp,
endTimestamp=evaluation_with_run_ids.evaluation.end_timestamp,
evaluation=GrapheneAssetConditionEvaluation(
evaluation_with_run_ids.evaluation, partitions_def
),
evaluation=GrapheneAssetConditionEvaluation(evaluation_with_run_ids.evaluation),
)


Expand Down