Skip to content

Commit

Permalink
[FLINK-33191][Connector/Kafka] Make the `testKafkaValueDeserializatio…
Browse files Browse the repository at this point in the history
…nSchemaWrapper` test use `flink-shaded-jackson` since it tests `flink-shaded-jackson` ObjectNodes.

Co-authored-by: zentol <[email protected]>

This closes #57.
  • Loading branch information
MartijnVisser authored and tzulitai committed Oct 11, 2023
1 parent 3be64d7 commit 58e5003
Showing 1 changed file with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,24 @@ public void testKafkaDeserializationSchemaWrapper() throws Exception {
@Test
public void testKafkaValueDeserializationSchemaWrapper() throws Exception {
final ConsumerRecord<byte[], byte[]> consumerRecord = getConsumerRecord();
KafkaRecordDeserializationSchema<ObjectNode> schema =
KafkaRecordDeserializationSchema.valueOnly(
new JsonDeserializationSchema<>(ObjectNode.class));
KafkaRecordDeserializationSchema<
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node
.ObjectNode>
schema =
KafkaRecordDeserializationSchema.valueOnly(
new JsonDeserializationSchema<>(
org.apache.flink.shaded.jackson2.com.fasterxml.jackson
.databind.node.ObjectNode.class));
schema.open(new DummyInitializationContext());
SimpleCollector<ObjectNode> collector = new SimpleCollector<>();
SimpleCollector<
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node
.ObjectNode>
collector = new SimpleCollector<>();
schema.deserialize(consumerRecord, collector);

assertThat(collector.list).hasSize(1);
ObjectNode deserializedValue = collector.list.get(0);

org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
deserializedValue = collector.list.get(0);
assertThat(deserializedValue.get("word").asText()).isEqualTo("world");
assertThat(deserializedValue.get("key")).isNull();
assertThat(deserializedValue.get("metadata")).isNull();
Expand Down

0 comments on commit 58e5003

Please sign in to comment.