Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36788] Fix and cover global committer #25685

Merged
merged 3 commits into from
Nov 29, 2024

Conversation

AHeise
Copy link
Contributor

@AHeise AHeise commented Nov 25, 2024

What is the purpose of the change

With the removal of Sink V1, we missed crucial coverage of the global committer coming from the StandardSinkTopologies. That caused a previous PR #25456 to contain undetected bugs.

This PR readds the coverage and fixes the uncovered bugs.

Brief change log

  • Move GlobalCommitter to flink-runtime
  • Add the CommitterInitContext to the GlobalCommitter creation
  • Adds coverage and fixes the bug

Verifying this change

This change added tests and can be verified as follows:

SinkV2TransformationTranslatorITCase

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

StandardSinkTopologies didn't expose yet the newly added ctx, such that the global committer couldn't access the metric groups
@flinkbot
Copy link
Collaborator

flinkbot commented Nov 25, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@fapaul fapaul left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall left some minor questions and I didn't fully understand how the first two commits relate to the third fix commit.

@@ -90,7 +92,18 @@ private Collection<Integer> translateInternal(
transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME);
transformation.setParallelism(1);
transformation.setMaxParallelism(1);
return Collections.emptyList();
copySafely(transformation::setName, globalCommitterTransform::getName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which scenarios does the globalCommitterTransform have it's own attributes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We set the attributes while expanding the post commit topology. We expand this transform almost at the end of the sink expansion. So it's fair to say that we have it in all cases where we set the properties during post commit expansion. It's not set if no uid or customUidHashes are user-supplied. I think name should always be set but I still use the copySafely method because you never know how things will evolve.

@AHeise
Copy link
Contributor Author

AHeise commented Nov 27, 2024

Looks good overall left some minor questions and I didn't fully understand how the first two commits relate to the third fix commit.

They are only semi-related. Basically while adding the global committer to TestSinkV2, I noticed that we lack the ctx to do so. I could pull them out into a separate PR but I also feel like these changes are minor enough to piggy-back (it's less than 50 LOC on the first two commits).

Copy link
Contributor

@fapaul fapaul left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:shipit:

So far, the global committer didn't properly get all properties set during expansion. In some cases, it didn't get expanded at all.

This commit also inline SinkTransformationTranslatorITCaseBase.java after the respective V1 implementation was removed. V2 now has the same coverage as V1 used to have.
@AHeise AHeise force-pushed the FLINK-36788-cover-global-committer branch from bbf92af to 960cda3 Compare November 28, 2024 10:51
@AHeise
Copy link
Contributor Author

AHeise commented Nov 29, 2024

@flinkbot run azure

@AHeise AHeise merged commit d7bfa77 into apache:master Nov 29, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants