Skip to content

Commit

Permalink
GH-43994: [C++][Parquet] Fix schema conversion from two-level encodin…
Browse files Browse the repository at this point in the history
…g nested list (#43995)

### Rationale for this change

The current C++ parquet implementation interprets following parquet schema as `array<struct<array:array<int>>>, which is wrong:
```
  optional group a (LIST) {
    repeated group array (LIST) {
      repeated int32 array;
    }
  }
```

### What changes are included in this PR?

According to the parquet spec, the above schema should be inferred as `array<array<int>>`.

### Are these changes tested?

Yes, a test case has been added to verify the fix.

### Are there any user-facing changes?

No.
* GitHub Issue: #43994

Authored-by: Gang Wu <[email protected]>
Signed-off-by: Gang Wu <[email protected]>
  • Loading branch information
wgtmac authored Nov 25, 2024
1 parent c21d85d commit a6fe595
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 38 deletions.
75 changes: 75 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4085,6 +4085,81 @@ TEST(TestArrowReaderAdHoc, OldDataPageV2) {
TryReadDataFile(path);
}

TEST(TestArrowReaderAdHoc, LegacyTwoLevelList) {
auto VerifyData = [](std::unique_ptr<ParquetFileReader> file_reader) {
// Expected Parquet schema of legacy two-level encoding
constexpr std::string_view kExpectedLegacyList =
"required group field_id=-1 a (List) {\n"
" repeated group field_id=-1 array (List) {\n"
" repeated int32 field_id=-1 array;\n"
" }\n"
"}\n";

// Expected Arrow schema and data
auto arrow_inner_list =
field("array", list(field("array", ::arrow::int32(), /*nullable=*/false)),
/*nullable=*/false);
auto arrow_outer_list = list(arrow_inner_list);
auto arrow_schema =
::arrow::schema({field("a", arrow_outer_list, /*nullable=*/false)});
auto expected_table = TableFromJSON(arrow_schema, {R"([[[[1,2],[3,4]]]])"});

// Verify Parquet schema
auto root_group = file_reader->metadata()->schema()->group_node();
ASSERT_EQ(1, root_group->field_count());
std::stringstream nodeStr;
PrintSchema(root_group->field(0).get(), nodeStr);
ASSERT_EQ(kExpectedLegacyList, nodeStr.str());

// Verify Arrow schema and data
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(
FileReader::Make(default_memory_pool(), std::move(file_reader), &reader));
std::shared_ptr<Table> table;
ASSERT_OK(reader->ReadTable(&table));
AssertTablesEqual(*expected_table, *table);
};

// Round-trip test for Parquet C++ reader and writer
{
// Create Parquet schema of legacy two-level encoding
auto inner_list = GroupNode::Make("array", Repetition::REPEATED,
{schema::Int32("array", Repetition::REPEATED)},
LogicalType::List());
auto outer_list =
GroupNode::Make("a", Repetition::REQUIRED, {inner_list}, LogicalType::List());
auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, {outer_list});

// Create a Parquet writer to write values of nested list
auto sink = CreateOutputStream();
auto file_writer =
ParquetFileWriter::Open(sink, std::dynamic_pointer_cast<GroupNode>(schema_node));
auto row_group_writer = file_writer->AppendRowGroup();
auto int_writer = dynamic_cast<Int32Writer*>(row_group_writer->NextColumn());
ASSERT_TRUE(int_writer != nullptr);

// Directly write a single row of nested list: [[1, 2],[3, 4]]
constexpr int64_t kNumValues = 4;
constexpr std::array<int16_t, kNumValues> kRepLevels = {0, 2, 1, 2};
constexpr std::array<int16_t, kNumValues> kDefLevels = {2, 2, 2, 2};
constexpr std::array<int32_t, kNumValues> kValues = {1, 2, 3, 4};
int_writer->WriteBatch(kNumValues, kDefLevels.data(), kRepLevels.data(),
kValues.data());
file_writer->Close();
ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());

// Read schema and verify it applies two-level encoding of list type
ASSERT_NO_FATAL_FAILURE(
VerifyData(ParquetFileReader::Open(std::make_shared<BufferReader>(buffer))));
}

// Interoperability test for Parquet file generated by parquet-java
{
auto path = std::string(test::get_data_dir()) + "/old_list_structure.parquet";
ASSERT_NO_FATAL_FAILURE(VerifyData(ParquetFileReader::OpenFile(path)));
}
}

class TestArrowReaderAdHocSparkAndHvr
: public ::testing::TestWithParam<
std::tuple<std::string, std::shared_ptr<DataType>>> {};
Expand Down
111 changes: 110 additions & 1 deletion cpp/src/parquet/arrow/arrow_schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <vector>

#include "gmock/gmock-matchers.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"

Expand Down Expand Up @@ -601,6 +602,58 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
arrow_fields.push_back(::arrow::field("name", arrow_list, false));
}

// Two-level encoding List<List<Integer>>:
// optional group my_list (LIST) {
// repeated group array (LIST) {
// repeated int32 array;
// }
// }
{
auto inner_array =
PrimitiveNode::Make("array", Repetition::REPEATED, ParquetType::INT32);
auto outer_array = GroupNode::Make("array", Repetition::REPEATED, {inner_array},
ConvertedType::LIST);
parquet_fields.push_back(GroupNode::Make("my_list", Repetition::OPTIONAL,
{outer_array}, ConvertedType::LIST));
auto arrow_inner_array = ::arrow::field("array", INT32, /*nullable=*/false);
auto arrow_outer_array =
::arrow::field("array", ::arrow::list(arrow_inner_array), /*nullable=*/false);
auto arrow_list = ::arrow::list(arrow_outer_array);
arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
}

// List<Map<String, String>> in three-level list encoding:
// optional group my_list (LIST) {
// repeated group list {
// required group element (MAP) {
// repeated group key_value {
// required binary key (STRING);
// optional binary value (STRING);
// }
// }
// }
// }
{
auto key = PrimitiveNode::Make("key", Repetition::REQUIRED, ParquetType::BYTE_ARRAY,
ConvertedType::UTF8);
auto value = PrimitiveNode::Make("value", Repetition::OPTIONAL,
ParquetType::BYTE_ARRAY, ConvertedType::UTF8);
auto key_value = GroupNode::Make("key_value", Repetition::REPEATED, {key, value});
auto element =
GroupNode::Make("element", Repetition::REQUIRED, {key_value}, ConvertedType::MAP);
auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, ConvertedType::LIST));

auto arrow_key = ::arrow::field("key", UTF8, /*nullable=*/false);
auto arrow_value = ::arrow::field("value", UTF8, /*nullable=*/true);
auto arrow_element = ::arrow::field(
"element", std::make_shared<::arrow::MapType>(arrow_key, arrow_value),
/*nullable=*/false);
auto arrow_list = ::arrow::list(arrow_element);
arrow_fields.push_back(::arrow::field("my_list", arrow_list, /*nullable=*/true));
}

auto arrow_schema = ::arrow::schema(arrow_fields);
ASSERT_OK(ConvertSchema(parquet_fields));

Expand Down Expand Up @@ -727,6 +780,60 @@ TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

TEST_F(TestConvertParquetSchema, IllegalParquetNestedSchema) {
// List<Map<String, String>> in two-level list encoding:
//
// optional group my_list (LIST) {
// repeated group array (MAP) {
// repeated group key_value {
// required binary key (STRING);
// optional binary value (STRING);
// }
// }
// }
{
auto key = PrimitiveNode::Make("key", Repetition::REQUIRED, ParquetType::BYTE_ARRAY,
ConvertedType::UTF8);
auto value = PrimitiveNode::Make("value", Repetition::OPTIONAL,
ParquetType::BYTE_ARRAY, ConvertedType::UTF8);
auto key_value = GroupNode::Make("key_value", Repetition::REPEATED, {key, value});
auto array =
GroupNode::Make("array", Repetition::REPEATED, {key_value}, ConvertedType::MAP);
std::vector<NodePtr> parquet_fields;
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {array}, ConvertedType::LIST));

EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid,
testing::HasSubstr("Group with one repeated child must be LIST-annotated."),
ConvertSchema(parquet_fields));
}

// List<List<String>>: outer list is two-level encoding, inner list is three-level
//
// optional group my_list (LIST) {
// repeated group array (LIST) {
// repeated group list {
// required binary element (STRING);
// }
// }
// }
{
auto element = PrimitiveNode::Make("element", Repetition::REQUIRED,
ParquetType::BYTE_ARRAY, ConvertedType::UTF8);
auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
auto array =
GroupNode::Make("array", Repetition::REPEATED, {list}, ConvertedType::LIST);
std::vector<NodePtr> parquet_fields;
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {array}, ConvertedType::LIST));

EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid, testing::HasSubstr("LIST-annotated groups must not be repeated."),
ConvertSchema(parquet_fields));
}
}

Status ArrowSchemaToParquetMetadata(std::shared_ptr<::arrow::Schema>& arrow_schema,
std::shared_ptr<KeyValueMetadata>& metadata) {
ARROW_ASSIGN_OR_RAISE(
Expand Down Expand Up @@ -1846,7 +1953,9 @@ TEST_F(TestLevels, ListErrors) {
{
::arrow::Status error = MaybeSetParquetSchema(GroupNode::Make(
"child_list", Repetition::REPEATED,
{PrimitiveNode::Make("bool", Repetition::REPEATED, ParquetType::BOOLEAN)},
{GroupNode::Make("list", Repetition::REPEATED,
{PrimitiveNode::Make("element", Repetition::REQUIRED,
ParquetType::BOOLEAN)})},
LogicalType::List()));
ASSERT_RAISES(Invalid, error);
std::string expected("LIST-annotated groups must not be repeated.");
Expand Down
103 changes: 67 additions & 36 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,11 +513,13 @@ Status PopulateLeaf(int column_index, const std::shared_ptr<Field>& field,
}

// Special case mentioned in the format spec:
// If the name is array or ends in _tuple, this should be a list of struct
// even for single child elements.
bool HasStructListName(const GroupNode& node) {
// If the name is array or uses the parent's name with `_tuple` appended,
// this should be:
// - a list of list or map type if the repeated group node is LIST- or MAP-annotated.
// - otherwise, a list of struct even for single child elements.
bool HasListElementName(const GroupNode& node, const GroupNode& parent) {
::std::string_view name{node.name()};
return name == "array" || EndsWith(name, "_tuple");
return name == "array" || name == (parent.name() + "_tuple");
}

Status GroupToStruct(const GroupNode& node, LevelInfo current_levels,
Expand Down Expand Up @@ -598,9 +600,9 @@ Status MapToSchemaField(const GroupNode& group, LevelInfo current_levels,
ctx->LinkParent(value_field, key_value_field);

// required/optional group name=whatever {
// repeated group name=key_values{
// repeated group name=key_values {
// required TYPE key;
// required/optional TYPE value;
// required/optional TYPE value;
// }
// }
//
Expand Down Expand Up @@ -634,6 +636,7 @@ Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels,
if (group.is_repeated()) {
return Status::Invalid("LIST-annotated groups must not be repeated.");
}

current_levels.Increment(group);

out->children.resize(group.field_count());
Expand All @@ -651,45 +654,73 @@ Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels,

int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated();
if (list_node.is_group()) {
// Resolve 3-level encoding
//
// required/optional group name=whatever {
// repeated group name=list {
// required/optional TYPE item;
// }
// }
//
// yields list<item: TYPE ?nullable> ?nullable
//
// We distinguish the special case that we have
//
// required/optional group name=whatever {
// repeated group name=array or $SOMETHING_tuple {
// required/optional TYPE item;
// }
// }
//
// In this latter case, the inner type of the list should be a struct
// rather than a primitive value
//
// yields list<item: struct<item: TYPE ?nullable> not null> ?nullable
const auto& list_group = static_cast<const GroupNode&>(list_node);
// Special case mentioned in the format spec:
// If the name is array or ends in _tuple, this should be a list of struct
// even for single child elements.
if (list_group.field_count() == 1 && !HasStructListName(list_group)) {
// List of primitive type
RETURN_NOT_OK(
NodeToSchemaField(*list_group.field(0), current_levels, ctx, out, child_field));
} else {
if (list_group.field_count() > 1) {
// The inner type of the list should be a struct when there are multiple fields
// in the repeated group
RETURN_NOT_OK(GroupToStruct(list_group, current_levels, ctx, out, child_field));
} else if (list_group.field_count() == 1) {
const auto& repeated_field = list_group.field(0);
if (repeated_field->is_repeated()) {
// Special case where the inner type might be a list with two-level encoding
// like below:
//
// required/optional group name=SOMETHING (LIST) {
// repeated group array (LIST) {
// repeated TYPE item;
// }
// }
//
// yields list<item: list<item: TYPE not null> not null> ?nullable
if (!list_group.logical_type()->is_list()) {
return Status::Invalid("Group with one repeated child must be LIST-annotated.");
}
// LIST-annotated group with three-level encoding cannot be repeated.
if (repeated_field->is_group() &&
!static_cast<const GroupNode&>(*repeated_field).field(0)->is_repeated()) {
return Status::Invalid("LIST-annotated groups must not be repeated.");
}
RETURN_NOT_OK(
NodeToSchemaField(*repeated_field, current_levels, ctx, out, child_field));
} else if (HasListElementName(list_group, group)) {
// We distinguish the special case that we have
//
// required/optional group name=SOMETHING {
// repeated group name=array or $SOMETHING_tuple {
// required/optional TYPE item;
// }
// }
//
// The inner type of the list should be a struct rather than a primitive value
//
// yields list<item: struct<item: TYPE ?nullable> not null> ?nullable
RETURN_NOT_OK(GroupToStruct(list_group, current_levels, ctx, out, child_field));
} else {
// Resolve 3-level encoding
//
// required/optional group name=whatever {
// repeated group name=list {
// required/optional TYPE item;
// }
// }
//
// yields list<item: TYPE ?nullable> ?nullable
RETURN_NOT_OK(
NodeToSchemaField(*repeated_field, current_levels, ctx, out, child_field));
}
} else {
return Status::Invalid("Group must have at least one child.");
}
} else {
// Two-level list encoding
//
// required/optional group LIST {
// repeated TYPE;
// }
//
// TYPE is a primitive type
//
// yields list<item: TYPE not null> ?nullable
const auto& primitive_node = static_cast<const PrimitiveNode&>(list_node);
int column_index = ctx->schema->GetColumnIndex(primitive_node);
ASSIGN_OR_RAISE(std::shared_ptr<ArrowType> type,
Expand Down

0 comments on commit a6fe595

Please sign in to comment.