[FLINK-36379] Improve (Global)Committer with UC disabled [1.20] #25660
base: release-1.20
Use more of the assertj native patterns to compare results.
cfcf7a9
to
0b326b7
Global committers used to trail a full checkpoint behind the committer. That means that data appeared only after >2*checkpoint interval in the sinks that use it (e.g. delta).

However, committables in the global committers are already part of the first checkpoint and are idempotent: on recovery, they are resent from the committer to the global committer. Thus, the global committer can actually be seen as stateless and doesn't need to conduct its own 2PC protocol.

This commit lets the global committer collect all committables on input (as before) but immediately tries to commit once it has received all of them (deducible from CommitterSummary - which was always the original intent of that message). Thus, in most cases, GlobalCommitter now ignores notifyCheckpointCompleted, as the state of the checkpoint can be inferred from the committables received from upstream.

There are special cases where a global committer is directly chained to a writer. In this case, the global committer does need to conduct a 2PC protocol in place of the committer. To differentiate these cases, the global committer now has its own transformation.

(cherry picked from commit 67be29a)
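To illustrate the "commit as soon as everything has arrived" idea from the commit message above, here is a minimal sketch. It is not Flink's GlobalCommitterOperator: the class EagerGlobalCommitterSketch, its methods, and the use of plain strings as committables are all hypothetical, and it assumes a single upstream subtask whose summary message carries the expected number of committables per checkpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch only; names and structure are assumptions, not Flink code. */
class EagerGlobalCommitterSketch {
    /** Buffer for one checkpoint; the expected count is unknown until the summary arrives. */
    private static final class Pending {
        int expected = -1;
        final List<String> committables = new ArrayList<>();
    }

    private final Map<Long, Pending> pendingByCheckpoint = new HashMap<>();
    /** Stands in for the external system that receives the global commit. */
    final List<String> committed = new ArrayList<>();

    /** Called when the summary for a checkpoint arrives (assumed: one upstream subtask). */
    void onSummary(long checkpointId, int numberOfCommittables) {
        pendingByCheckpoint.computeIfAbsent(checkpointId, id -> new Pending()).expected =
                numberOfCommittables;
        commitIfComplete(checkpointId);
    }

    /** Called for every committable; collects it and commits once the checkpoint is complete. */
    void onCommittable(long checkpointId, String committable) {
        pendingByCheckpoint
                .computeIfAbsent(checkpointId, id -> new Pending())
                .committables
                .add(committable);
        commitIfComplete(checkpointId);
    }

    /** Commits without waiting for a checkpoint-completed notification. */
    private void commitIfComplete(long checkpointId) {
        Pending pending = pendingByCheckpoint.get(checkpointId);
        if (pending.expected >= 0 && pending.committables.size() == pending.expected) {
            committed.addAll(pending.committables);
            pendingByCheckpoint.remove(checkpointId);
        }
    }
}
```

In this sketch no state has to survive a failure: after recovery the upstream committer would resend the summary and committables, and the commit itself is assumed to be idempotent.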
Without UCs, a committer doesn't need to do anything on #processInput except collecting. It emits only on notifyCheckpointCompleted (or endInput for batch).

We can also harden some contracts:
* NotifyCheckpointCompleted can assert that all committables are received.
* Emit committables downstream only if all committables are finished.

(cherry picked from commit 7f40ab9)
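The two hardened contracts can be sketched roughly as follows. This is a simplified illustration, not the actual CommitterOperator: CollectingCommitterSketch, its method signatures, and the Predicate standing in for a commit attempt are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch only; not the Flink CommitterOperator. */
class CollectingCommitterSketch {
    private int expected = 0; // set from the summary message
    private final List<String> uncommitted = new ArrayList<>();
    private final List<String> finished = new ArrayList<>();
    /** Stands in for the downstream output. */
    final List<String> emitted = new ArrayList<>();

    void processSummary(int numberOfCommittables) {
        expected = numberOfCommittables;
    }

    /** Without unaligned checkpoints, input processing only collects. */
    void processInput(String committable) {
        uncommitted.add(committable);
    }

    /** tryCommit is a hypothetical commit attempt returning true on success. */
    void notifyCheckpointCompleted(Predicate<String> tryCommit) {
        // Contract 1: all committables of the checkpoint must have been received.
        if (uncommitted.size() + finished.size() != expected) {
            throw new IllegalStateException("Not all committables received");
        }
        for (Iterator<String> it = uncommitted.iterator(); it.hasNext(); ) {
            String committable = it.next();
            if (tryCommit.test(committable)) {
                finished.add(committable);
                it.remove();
            }
        }
        // Contract 2: emit downstream only once every committable is finished.
        if (uncommitted.isEmpty()) {
            emitted.addAll(finished);
            finished.clear();
            expected = 0;
        }
    }
}
```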
0b326b7
to
49d6a5a
Reviewed by Chi on 21/11/24. Looks in hand, test failures currently
9652997
to
5b2ce39
}

private static <T> void copySafely(Consumer<T> consumer, Supplier<T> provider) {
    T value = provider.get();
I am wondering
- are name, uid or uidHash nullable? The uid seems like a mandatory non nullable identifier.
- should we error if any of these values are null?
- or is there a valid case for having any of these values as null, if so should we not copy over the null?
Yes, all three are nullable, and null is the default value. Some of the setters throw an error on null, hence we can't copy the null. So this is the safest way to "copy" the value.
Any assertions on the presence of the values are also a bit more difficult, as they are set at different points during the transformation, and the GlobalCommitterTransformation is usually added as part of the sink transformation, which is then recursively executed. It's a bit convoluted and only necessary because the transformation is the only place for us to figure out if we are running in streaming or batch mode...
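For readers following along, the null-skipping copy described in this reply could look like the sketch below. Only the first two lines of copySafely appear in the diff; the null check is inferred from the explanation, and the Node class with its throwing setter is a hypothetical stand-in for a transformation.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch; Node is a stand-in, not a Flink Transformation. */
class CopySafelySketch {
    static final class Node {
        private String uid; // null by default

        String getUid() {
            return uid;
        }

        void setUid(String uid) {
            // Mirrors the described behavior: some setters reject null.
            if (uid == null) {
                throw new NullPointerException("uid must not be null");
            }
            this.uid = uid;
        }
    }

    /** Copies the value only if it is present, so throwing setters are never fed null. */
    static <T> void copySafely(Consumer<T> consumer, Supplier<T> provider) {
        T value = provider.get();
        if (value != null) { // assumed continuation of the snippet in the diff
            consumer.accept(value);
        }
    }

    public static void main(String[] args) {
        Node source = new Node();
        Node target = new Node();
        copySafely(target::setUid, source::getUid); // no-op: source uid is still null
        source.setUid("example-uid");
        copySafely(target::setUid, source::getUid); // now the uid is copied
        System.out.println(target.getUid());
    }
}
```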
    return translateInternal(transformation, false);
}

private Collection<Integer> translateInternal(
should we add junits for this method - or do you think the higher level tests already cover us?
There are a couple of low-level ITs (Sink(V1/V2)TransformationTranslatorITCase) that assert like unit tests (are name/uid/uidHash correctly copied) and plenty of high-level ITs (testing batch/checkpointing settings). So I don't see an immediate need to augment them with JUnit tests. Coverage is very high.
@@ -181,7 +181,7 @@ private void commitAndEmitCheckpoints() throws IOException, InterruptedException
private void commitAndEmit(CheckpointCommittableManager<CommT> committableManager)
        throws IOException, InterruptedException {
    Collection<CommittableWithLineage<CommT>> committed = committableManager.commit(committer);
NIT: as we are no longer using committed in the if, we could move committableManager.commit(committer) inlined to line 189 and remove the committed variable
commit is unfortunately not side-effect free; we would never proceed with your proposal. But maybe it should be side-effect free.
However, I'd like to pick that idea up on master and not the backport (this PR).
/**
 * Commits all due committables if all respective committables of the specific subtask and
 * checkpoint have been received.
 *
 * @param committer used to commit to the external system
 * @return successfully committed committables with meta information
 * @throws IOException
Why are these removed? We still throw these exceptions.
This gives us Spotless warnings. The general consensus is to either document when exceptions are thrown or not document them at all. Since checked exceptions are part of the signature, the javadoc still shows them as being thrown. So leaving the tags incomplete is just redundant.
Note that for public API, we aim to document all checked exceptions.
@@ -160,13 +169,37 @@ public Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committ
     return committed;
 }

-Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean onlyIfFullyReceived) {
+Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean assertFull) {
could the junits cover all the permutations of this stream - so we know there are no side effects?
That's a good idea. However, the next (and last) PR touches the code again and removes the flag. It's only here temporarily to keep the PRs smaller. The next PR removes async retries (which should never have been done), and that simplifies the invariants significantly.
if (transformation instanceof OneInputTransformation) {
    StreamOperatorFactory<?> operatorFactory =
            ((OneInputTransformation<?, ?>) transformation).getOperatorFactory();
    if (operatorFactory instanceof CommitterOperatorFactory) {
I think a comment to describe why we return true here would be useful.
private <R> void repeatUntilConverged(Supplier<R> producer) {
    R lastResult = producer.get();
    R nextResult;
    while (!lastResult.equals(nextResult = producer.get())) {
can we get nulls here?
No, the callsite always supplies a List. In Flink, by convention, everything is NotNull unless annotated differently.
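As a rough illustration of the loop under discussion, here is a self-contained sketch. The loop body is not visible in the diff, so the assignment inside it is an assumption, and the queue-draining producer is a made-up example rather than the call site in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sketch; the loop body is assumed, not taken from the PR. */
class RepeatUntilConvergedSketch {
    /** Calls the producer until two consecutive results are equal. */
    static <R> void repeatUntilConverged(Supplier<R> producer) {
        R lastResult = producer.get();
        R nextResult;
        // Throws NullPointerException if the first result is null,
        // which is why the review asks about nulls.
        while (!lastResult.equals(nextResult = producer.get())) {
            lastResult = nextResult;
        }
    }

    public static void main(String[] args) {
        List<Integer> pending = new ArrayList<>(List.of(1, 2, 3));
        // Each call processes one element and returns a fresh snapshot of what remains.
        // Returning the same mutable list instance would compare equal to itself
        // immediately, so the snapshot copy matters here.
        Supplier<List<Integer>> step = () -> {
            if (!pending.isEmpty()) {
                pending.remove(0);
            }
            return new ArrayList<>(pending);
        };
        repeatUntilConverged(step);
        System.out.println(pending.isEmpty());
    }
}
```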
Backport of #25456 with some adjustments in GlobalCommitterOperator (main doesn't have GlobalCommitter code anymore).