-
Notifications
You must be signed in to change notification settings - Fork 972
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Kafka sink DLQ for kafka to BQ template, kafka to gcs template #1570
base: main
Are you sure you want to change the base?
Conversation
v2/common/pom.xml
Outdated
<dependency> | ||
<groupId>org.apache.kafka</groupId> | ||
<artifactId>kafka-clients</artifactId> | ||
<version>3.7.0</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
todo: Check the latest version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove these file changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done now.
a5eff0c
to
035e6ef
Compare
cc: @johnjcasey |
...ka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java
Show resolved
Hide resolved
...-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/AvroDynamicTransform.java
Outdated
Show resolved
Hide resolved
...ka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java
Outdated
Show resolved
Hide resolved
v2/common/pom.xml
Outdated
</dependency> | ||
<dependency> | ||
<groupId>org.apache.kafka</groupId> | ||
<artifactId>kafka-clients</artifactId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
v2/common module shouldn't have a dependency on Kafka, it's supposed to provide only generic classes/utils. Kafka-related code should be placed under v2/kafka-common, which already has Kafka client deps.
v2/common/src/main/java/com/google/cloud/teleport/v2/dlq/DlqUtils.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/dlq/DeadLetterQueueOptions.java
Outdated
Show resolved
Hide resolved
/** | ||
* {@link DeadLetterQueueOptions} is used for any Dead letter queue sinks for the failed records. | ||
*/ | ||
public interface DeadLetterQueueOptions extends PipelineOptions { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this Options class is expected to have only Kafka DLQ options, I'd call it KafkaDeadLetterQueueOptions
.
v2/common/src/main/java/com/google/cloud/teleport/v2/dlq/KafkaDeadLetterQueue.java
Outdated
Show resolved
Hide resolved
@@ -58,6 +70,9 @@ public abstract class AvroWriteTransform | |||
PCollection<KafkaRecord<byte[], byte[]>>, WriteFilesResult<AvroDestination>> { | |||
private static final String subject = "UNUSED"; | |||
static final int DEFAULT_CACHE_CAPACITY = 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be made private
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
private String schemaRegistryURL = null; | ||
private String schemaPath = null; | ||
private boolean useMock; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like all 3 can be made final
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Getting error when changing this to final
v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/transforms/AvroWriteTransform.java
Show resolved
Hide resolved
v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/transforms/AvroWriteTransform.java
Show resolved
Hide resolved
@@ -62,7 +68,8 @@ public POutput expand(PCollection<KafkaRecord<byte[], byte[]>> kafkaRecord) { | |||
.setSchemaRegistryURL(options().getSchemaRegistryURL()) | |||
.setSchemaPath(options().getSchemaPath()) | |||
.setWindowDuration(options().getWindowDuration()) | |||
.build()); | |||
.build() | |||
.withBadRecordErrorHandlers(errorHandlers())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling withX()
after build()
seems like a strange pattern to me. Should setErrorHandlers()
be part of the builder?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have only changed this(having setErrorHandlers) for the classes that have AutoValue annotations.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1570 +/- ##
============================================
- Coverage 43.54% 40.86% -2.68%
- Complexity 654 2829 +2175
============================================
Files 294 745 +451
Lines 15542 43127 +27585
Branches 1548 4595 +3047
============================================
+ Hits 6768 17626 +10858
- Misses 8255 24002 +15747
- Partials 519 1499 +980
|
...common/src/main/java/com/google/cloud/teleport/v2/kafka/dlq/KafkaDeadLetterQueueOptions.java
Outdated
Show resolved
Hide resolved
This reverts commit 3d82b67.
No description provided.