Skip to content

Commit

Permalink
[FLINK-33817][protobuf] Allow ReadDefaultValues = False for non primi…
Browse files Browse the repository at this point in the history
…tive types on Proto3

Close #24035
  • Loading branch information
sharath1709 authored and libenchao committed Feb 26, 2024
1 parent faacf7e commit bf75870
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 33 deletions.
7 changes: 5 additions & 2 deletions docs/content/docs/connectors/table/formats/protobuf.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,12 @@ Format Options
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
This option only works if the generated class's version is proto2. If this value is set to true, the format will read empty values as the default values defined in the proto file.
If this value is set to true, the format will read empty values as the default values defined in the proto file.
If the value is set to false, the format will generate null values if the data element does not exist in the binary protobuf message.
If the proto syntax is proto3, this value will forcibly be set to true, because proto3's standard is to use default values.
If proto syntax is proto3, users need to set this to true when using protobuf versions lower than 3.15 as older versions do not support
checking for field presence which can cause runtime compilation issues. Additionally, primitive types will be set to default values
instead of null as field presence cannot be checked for them. Please be aware that setting this to true will cause the deserialization
performance to be much slower depending on schema complexity and message size.
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
public class PbFormatContext {
private final PbFormatConfig pbFormatConfig;
private final List<String> splitMethodStack = new ArrayList<>();
private final boolean readDefaultValuesForPrimitiveTypes;

public PbFormatContext(PbFormatConfig pbFormatConfig) {
public PbFormatContext(
PbFormatConfig pbFormatConfig, boolean readDefaultValuesForPrimitiveTypes) {
this.pbFormatConfig = pbFormatConfig;
this.readDefaultValuesForPrimitiveTypes = readDefaultValuesForPrimitiveTypes;
}

private String createSplitMethod(
Expand Down Expand Up @@ -73,4 +76,8 @@ public List<String> getSplitMethodStack() {
public PbFormatConfig getPbFormatConfig() {
return pbFormatConfig;
}

/**
 * Returns the {@code readDefaultValuesForPrimitiveTypes} flag that was supplied to the
 * constructor of this context.
 *
 * @return the stored flag; presumably {@code true} means simple/primitive fields are read as
 *     protobuf default values instead of null (TODO confirm against the codegen callers)
 */
public boolean getReadDefaultValuesForPrimitiveTypes() {
return readDefaultValuesForPrimitiveTypes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public class PbFormatOptions {
.defaultValue(false)
.withDescription(
"Optional flag to read as default values instead of null when some field does not exist in deserialization; default to false."
+ "If proto syntax is proto3, this value will be set true forcibly because proto3's standard is to use default values.");
+ "If proto syntax is proto3, users need to set this to true when using protobuf versions lower than 3.15 as older versions "
+ "do not support checking for field presence which can cause runtime compilation issues. Additionally, primtive types "
+ "will be set to default values instead of null as field presence cannot be checked for them. Please be aware that setting this"
+ " to true will cause the deserialization performance to be much slower depending on schema complexity and message size");
public static final ConfigOption<String> WRITE_NULL_STRING_LITERAL =
ConfigOptions.key("write-null-string-literal")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,15 @@ public String codegen(String resultVar, String pbObjectCode, int indent)
PbCodegenDeserializer codegen =
PbCodegenDeserializeFactory.getPbCodegenDes(elementFd, subType, formatContext);
splitAppender.appendLine("Object " + flinkRowEleVar + " = null");
if (!formatContext.getPbFormatConfig().isReadDefaultValues()) {
// only works in syntax=proto2 and readDefaultValues=false
// readDefaultValues must be true in pb3 mode
boolean readDefaultValues = formatContext.getPbFormatConfig().isReadDefaultValues();
if (PbFormatUtils.isSimpleType(subType)) {
readDefaultValues = formatContext.getReadDefaultValuesForPrimitiveTypes();
}

if (!readDefaultValues) {
// works for both syntax=proto2/proto3 and readDefaultValues=false for non-primitive
// types
// readDefaultValues must be true in pb3 mode for primitive types
String isMessageElementNonEmptyCode =
isMessageElementNonEmptyCode(
pbMessageVar,
Expand All @@ -91,7 +97,7 @@ public String codegen(String resultVar, String pbObjectCode, int indent)
codegen.codegen(
flinkRowEleVar, pbGetMessageElementCode, splitAppender.currentIndent());
splitAppender.appendSegment(code);
if (!formatContext.getPbFormatConfig().isReadDefaultValues()) {
if (!readDefaultValues) {
splitAppender.end("}");
}
splitAppender.appendLine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,15 @@ public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
true,
Thread.currentThread().getContextClassLoader());
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
boolean readDefaultValuesForPrimitiveTypes = formatConfig.isReadDefaultValues();
if (descriptor.getFile().getSyntax() == Syntax.PROTO3) {
// pb3 always read default values
formatConfig =
new PbFormatConfig(
formatConfig.getMessageClassName(),
formatConfig.isIgnoreParseErrors(),
true,
formatConfig.getWriteNullStringLiterals());
// pb3 always read default values for primitive types
readDefaultValuesForPrimitiveTypes = true;
}
PbCodegenAppender codegenAppender = new PbCodegenAppender();
PbFormatContext pbFormatContext = new PbFormatContext(formatConfig);
String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
PbFormatContext pbFormatContext =
new PbFormatContext(formatConfig, readDefaultValuesForPrimitiveTypes);
String uuid = UUID.randomUUID().toString().replace("-", "");
String generatedClassName = "GeneratedProtoToRow_" + uuid;
String generatedPackageName = ProtoToRowConverter.class.getPackage().getName();
codegenAppender.appendLine("package " + generatedPackageName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public RowToProtoConverter(RowType rowType, PbFormatConfig formatConfig)
try {
Descriptors.Descriptor descriptor =
PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
PbFormatContext formatContext = new PbFormatContext(formatConfig);
PbFormatContext formatContext = new PbFormatContext(formatConfig, false);

PbCodegenAppender codegenAppender = new PbCodegenAppender(0);
String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
String uuid = UUID.randomUUID().toString().replace("-", "");
String generatedClassName = "GeneratedRowToProto_" + uuid;
String generatedPackageName = RowToProtoConverter.class.getPackage().getName();
codegenAppender.appendLine("package " + generatedPackageName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
* Test conversion of proto3 data to flink internal data. Default values after conversion are tested
Expand Down Expand Up @@ -92,34 +93,26 @@ public void testReadDefaultValues() throws Exception {
Pb3Test pb3Test = Pb3Test.newBuilder().build();
RowData row = ProtobufTestHelper.pbBytesToRow(Pb3Test.class, pb3Test.toByteArray());

// primitive types should have default values
assertFalse(row.isNullAt(0));
assertFalse(row.isNullAt(1));
assertFalse(row.isNullAt(2));
assertFalse(row.isNullAt(3));
assertFalse(row.isNullAt(4));
assertFalse(row.isNullAt(5));
assertFalse(row.isNullAt(6));
assertFalse(row.isNullAt(7));
assertFalse(row.isNullAt(8));
assertFalse(row.isNullAt(9));
assertFalse(row.isNullAt(10));

assertEquals(0, row.getInt(0));
assertEquals(0L, row.getLong(1));
assertEquals("", row.getString(2).toString());
assertEquals(Float.valueOf(0.0f), Float.valueOf(row.getFloat(3)));
assertEquals(Double.valueOf(0.0d), Double.valueOf(row.getDouble(4)));
assertEquals("UNIVERSAL", row.getString(5).toString());

RowData rowData = row.getRow(6, 2);
assertEquals(0, rowData.getInt(0));
assertEquals(0L, rowData.getLong(1));

assertEquals(0, row.getArray(7).size());

assertEquals(0, row.getBinary(8).length);

assertEquals(0, row.getMap(9).size());
assertEquals(0, row.getMap(10).size());
// non-primitive types should be null
assertTrue(row.isNullAt(6));
assertTrue(row.isNullAt(7));
assertTrue(row.isNullAt(9));
assertTrue(row.isNullAt(10));
}
}

0 comments on commit bf75870

Please sign in to comment.