
[SPARK-49829][SS] Revise the optimization on adding input to state store in stream-stream join (correctness fix) #48297

Open · wants to merge 5 commits into base: master

Conversation

@HeartSaVioR (Contributor) commented Sep 30, 2024

What changes were proposed in this pull request?

The PR proposes to revise the optimization on adding input to state store in stream-stream join.

Why are the changes needed?

Here is the logic of optimization before this PR:

```scala
val isLeftSemiWithMatch =
  joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
// Add to state store only if both removal predicates do not match,
// and the row is not matched for left side of left semi join.
val shouldAddToState =
  !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
  !isLeftSemiWithMatch
```

The optimization was added when multiple stateful operators weren't supported. The criterion "both removal predicates do not match" means the input is going to be evicted in this batch. Before Spark introduced multiple stateful operators, the watermark for late records and the watermark for eviction were the same, hence the input couldn't match this condition after late records were filtered out. (I'm not sure about the edge case this condition was dealing with.)

After multiple stateful operators, the watermark for late records and the watermark for eviction are no longer the same. (The watermark for late records is the watermark for eviction from the prior batch; consider that we advance the watermark "after processing all inputs", not before.) That said, an input can be determined as not late and still be evicted in the same batch. The above condition had to reflect this change, but that was missed, hence the correctness issues in the report.
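To make the timing concrete, here is a minimal Python sketch (illustrative only, not Spark code; `run_batch` and its arguments are made-up names) of how a row can pass the late-record filter yet still fall under the eviction watermark in the same batch:

```python
# Illustrative only: the late-record watermark is the eviction watermark of
# the prior batch, so a row can be "not late" and still be evicted now.
def run_batch(rows, wm_late, wm_evict):
    not_late = [t for t in rows if t >= wm_late]           # late-record filter
    evicted_now = [t for t in not_late if t < wm_evict]    # evicted in this batch
    kept = [t for t in not_late if t >= wm_evict]          # stays in state
    return not_late, evicted_now, kept

# Prior batch advanced the eviction watermark to 10; this batch advances it to 20.
not_late, evicted_now, kept = run_batch([5, 12, 25], wm_late=10, wm_evict=20)
# 5 is dropped as late; 12 is NOT late yet is evicted in this very batch; 25 is kept.
```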

There are two major issues with this omission:

  1. Missing to add the input to the state store on the left side prevents inputs on the right side from matching with "that" input. Even though the input is going to be evicted in this batch, there could still be inputs on the right side in this batch which can match with it.

  2. Missing to add the input to the state store prevents that input from producing an unmatched (null-outer) output, as we produce unmatched output during the eviction of state.
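The second issue can be sketched as a toy Python model (illustrative only, not Spark's implementation; `evict_and_emit_unmatched` is a hypothetical helper): null-outer rows are only emitted while evicting state, so a row that was never added to state can never produce one.

```python
# Illustrative only: unmatched (null-outer) rows are produced while evicting
# state, so a row never added to state silently loses its null-outer output.
def evict_and_emit_unmatched(state, wm_evict):
    """state: list of (event_time, matched) pairs for the outer side."""
    outputs, remaining = [], []
    for t, matched in state:
        if t < wm_evict:
            if not matched:
                outputs.append((t, None))   # null-outer row emitted on eviction
        else:
            remaining.append((t, matched))
    return outputs, remaining

outputs, remaining = evict_and_emit_unmatched([(12, False), (25, False)], wm_evict=20)
# (12, None) is emitted; had row 12 never been added to state, it would be lost.
```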

Does this PR introduce any user-facing change?

Yes, there are correctness issues in stream-stream join, especially when the output of a stateful operator is provided as input to a stream-stream join. The correctness issue is fixed by this PR.

How was this patch tested?

New UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

@dongjoon-hyun (Member) left a comment

Thank you, @HeartSaVioR .

Is this correctness issue introduced via SPARK-32862 at Apache Spark 3.1.0?

cc @xuanyuanking, @viirya too from #30076

Comment on lines -675 to -677
```scala
val shouldAddToState =
  !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
  !isLeftSemiWithMatch
```
Contributor:

I imagine that you considered constructing these predicates (e.g. stateKeyWatermarkPredicateFunc and stateValueWatermarkPredicateFunc) based on the watermark for late events, not the watermark for eviction. I spent some time working this out myself, and I know the subtle reason why this won't work, but I wanted to verify that you also considered this.

Contributor Author:

It's neither the watermark for late events nor the watermark for eviction. Stream-stream join has its own "state watermark", which becomes the "output watermark" for the stream-stream join. The predicate relies on the state watermark.

@neilramaswamy (Contributor) commented Oct 1, 2024

Right, but I had initially wondered why we can't construct the state watermark predicates using the watermark for late events.

For example, with the join predicate L > R + 10, we construct the state watermark to be L <= watermark_for_eviction(R) + 10, and I had initially thought that the fix for this correctness bug was L <= watermark_for_late_events(R) + 10.

Contributor:

But it's not. Here's my reasoning. Let the watermark for late events be WM_L, and let the watermark for eviction be WM_E. For a given side (WLOG, the left), the state watermark will be L <= WM_E + k, for some positive or negative quantity k.

Assume k is non-negative. In that case, WM_L <= WM_E <= WM_E + k. If the record is less than WM_L, it will be dropped. If it is at least WM_L and less than WM_E + k, then it will be evicted and then produce a null output. If it is greater than or equal to WM_E + k, then it might join in the future, so we need to keep it in state. The quantity WM_L + k, the state watermark using the late-events watermark, doesn't actually have any real meaning, so we don't need to use it here.

Things work out similarly if k is negative, but I'll exclude that for brevity.
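The three regions above can be sketched as a small Python classifier (illustrative only; the names are made up, not Spark identifiers):

```python
# Illustrative only: with a state watermark of the form L <= WM_E + k for
# non-negative k, a left-side event time falls into one of three regions.
def classify(t, wm_late, wm_evict, k):
    assert k >= 0 and wm_late <= wm_evict
    if t < wm_late:
        return "dropped"          # late record, filtered out
    elif t < wm_evict + k:
        return "evicted+null"     # evicted this batch; outer join emits a null row
    else:
        return "kept"             # might still join in a future batch

regions = [classify(t, wm_late=10, wm_evict=20, k=5) for t in (9, 15, 30)]
```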

Contributor Author:

Yes. Simply put, the rationale for splitting the watermark into one for late events and one for eviction applies to stream-stream join as well, except that we need to buffer events longer than the watermark depending on the join condition, hence the need for a state watermark (which is tied to eviction).

```scala
// and the join type is left semi.
// For other cases, the input should be added, including the case it's going to be evicted
// in this batch. It hasn't yet evaluated with inputs from right side "for this batch".
// Learn about how the each side figures out the matches from other side.
```
Contributor:

I don't understand what this line means. Can you clarify?

@HeartSaVioR (Contributor Author) commented Sep 30, 2024

We do not build a separate hashmap to match both sides; we rely completely on the state store. The way each side figures out matches is by looking into the state store of the other side. And there is a fixed sequence for doing this: left side first, then right side.

That said, when the operator seeks a match from the left side, the right side is yet to be handled, hence the left side can only see the right side's state from "prior batches". We take care of matches in the current batch while evaluating the "right side", assuming that we have put the left side's input into the state store. If we skip adding the input into the state store for the left side, we miss possible matching rows (a correctness issue).
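Here is a toy Python model of that sequence (illustrative only, not Spark's implementation; the state stores are plain lists and simple equality stands in for the join condition):

```python
# Illustrative only: each side probes the *other* side's state store, left
# side first, so the left pass sees only right-side rows from prior batches.
left_state, right_state = [3], []      # 3 arrived on the left in a prior batch
batch_left, batch_right = [8], [3, 8]  # this batch's inputs

matches = []
for l in batch_left:                   # pass 1: probe right state (prior batches only)
    matches += [(l, r) for r in right_state if l == r]
    left_state.append(l)               # stored even if it will be evicted this batch
for r in batch_right:                  # pass 2: probe left state, incl. this batch's rows
    matches += [(l, r) for l in left_state if l == r]
    right_state.append(r)
# 3 matches via prior-batch left state; 8 matches only because pass 1 stored it.
```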

Contributor Author:

If the classdoc covers this, I should just say "read through the classdoc". Do you get how it works from reading through the classdoc?

Contributor:

I understand the semantics, but I just didn't understand what your comment meant by "Learn about how...". Your latest commit addresses my concern.

Comment on lines +695 to +698
```scala
// if the input is producing "unmatched row" in this batch
(
  (joinType == RightOuter && !iteratorNotEmpty) ||
  (joinType == FullOuter && !iteratorNotEmpty)
```

Contributor:

In the case of a left anti join, isn't it also possible that we would evict this record (and !iteratorNotEmpty), so we need to add it to state to produce it?

Contributor Author:

We don't support left anti join in streaming. Supported: inner, left/right outer, full outer, left semi. Please check StreamingSymmetricHashJoinExec.

@neilramaswamy (Contributor):

@dongjoon-hyun, this issue has been around since the time that we added multiple stateful operators to Structured Streaming. The left-semi join logic that you linked previously is correct, and Jungtaek preserves that behavior in this PR.

@dongjoon-hyun (Member):

To @neilramaswamy, could you provide the affected version list? Actually, I'm tracking it to update the SPARK-49829 Affected Version field correctly. Currently it claims 3.5.0+ as the affected versions. To be clear, which Spark version (or JIRA ID) do you mean exactly?

> @dongjoon-hyun, this issue has been around since the time that we added multiple stateful operators to Structured Streaming. The Left-semi join logic that you linked previously is correct, and Jungtaek preserves that behavior in this PR.

@HeartSaVioR (Contributor Author):

https://issues.apache.org/jira/browse/SPARK-40925
It was 3.4.0. Sorry, I forgot that there was a long gap between fixing a bug to partially support multiple stateful operators and fixing the watermark mechanism to support stream-stream join followed by a stateful operator. This is about the former.

@dongjoon-hyun (Member):

Thank you, @HeartSaVioR .

@neilramaswamy (Contributor):

Fix makes sense, will thoroughly review tests shortly.

```scala
isNotEvictingInThisBatch ||
  // if the input is producing "unmatched row" in this batch
  (
    (joinType == RightOuter && !iteratorNotEmpty) ||
```

Member:

So compared to the original logic, this PR specifically adds the RightOuter and FullOuter join types for the empty iterator case in the shouldAddToState scenario. Is my understanding correct?

@HeartSaVioR (Contributor Author) commented Oct 1, 2024

No, the change is the following:

  1. For the left side, we store the new input row into the state store despite the fact that the row will be evicted in this batch (regardless of join type). This is required because the row is still yet to be checked for matches against new input rows on the right side "in this batch".

  2. For the right side, we don't strictly need to store every new input row into the state store, but we still need to store rows that will produce "unmatched" output (right/full outer), because we rely on state eviction to produce the unmatched output.
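A Python sketch of the revised rule as described in the two points above (a compressed restatement for illustration; the names are made up, not Spark's actual identifiers):

```python
# Illustrative only: a compressed restatement of the two points above.
def should_add_to_state(side, join_type, is_evicting_this_batch, found_match):
    if side == "left":
        # Always store new left rows, even ones evicted this batch, except a
        # left-semi row that has already found its match.
        return not (join_type == "left_semi" and found_match)
    # Right rows: no need to store a row being evicted this batch, unless it
    # must produce an unmatched (null-outer) output during that eviction.
    produces_unmatched = join_type in ("right_outer", "full_outer") and not found_match
    return (not is_evicting_this_batch) or produces_unmatched
```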

```
@@ -878,6 +878,60 @@ class MultiStatefulOperatorsSuite
  testOutputWatermarkInJoin(join3, input1, -40L * 1000 - 1)
}
```

```scala
// NOTE: This is the revise of the reproducer in SPARK-45637. CREDIT goes to @andrezjzera.
```
Member:

IIRC, if we want to add credit information, we can amend the PR commits to include a co-author. This way, we don’t need to add the credit information in the code comments.

Contributor Author:

Though the co-authorship approach is not a fine-grained one (e.g. the credit here is only for the test), I can do that instead if it's more comfortable for people. Also, I don't see us giving CREDIT this way elsewhere, so the preference is probably what you proposed.

```scala
// scalastyle:off line.size.limit
// DISCLAIM: This is a revision of below test, which was a part of report in the dev mailing
// list. CREDIT goes to @andrezjzera.
// https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86C3-L167C4
```
Member:

ditto
