From a69e58048f87ef5c8083a9cf4803687243ac2252 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Tue, 31 Mar 2026 20:07:26 +0800 Subject: [PATCH 1/3] [fix][cp] Read unannotated array Summary: When the element type is scalar type, a check in `convertType` requires the requested type must not be array. However, an unannotated array in Parquet is a repeated field that is not explicitly marked as a LIST logical type. To enable reading of unannotated arrays, this PR verifies the compatibility of their element types. Follow-up for https://github.com/facebookincubator/velox/pull/13620. Corresponding PR: https://github.com/facebookincubator/velox/pull/13864 --- .../parquet/reader/ParquetColumnReader.cpp | 5 +- bolt/dwio/parquet/reader/ParquetReader.cpp | 39 +++++++-- .../examples/nested_array_struct.parquet | Bin 0 -> 775 bytes .../examples/proto_repeated_string.parquet | Bin 0 -> 411 bytes .../tests/reader/ParquetTableScanTest.cpp | 82 ++++++++++++++++-- 5 files changed, 114 insertions(+), 12 deletions(-) create mode 100644 bolt/dwio/parquet/tests/examples/nested_array_struct.parquet create mode 100644 bolt/dwio/parquet/tests/examples/proto_repeated_string.parquet diff --git a/bolt/dwio/parquet/reader/ParquetColumnReader.cpp b/bolt/dwio/parquet/reader/ParquetColumnReader.cpp index af1d57809..07df86616 100644 --- a/bolt/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/bolt/dwio/parquet/reader/ParquetColumnReader.cpp @@ -152,9 +152,12 @@ std::unique_ptr ParquetColumnReader::build( return std::make_unique( requestedType, fileType, params, scanSpec); - case TypeKind::ARRAY: + case TypeKind::ARRAY: { + BOLT_CHECK( + requestedType->type()->isArray(), "Requested type must be array"); return std::make_unique( columnReaderOptions, requestedType, fileType, params, scanSpec, pool); + } case TypeKind::MAP: return std::make_unique( diff --git a/bolt/dwio/parquet/reader/ParquetReader.cpp b/bolt/dwio/parquet/reader/ParquetReader.cpp index 369b9b250..f047eafc2 100644 --- a/bolt/dwio/parquet/reader/ParquetReader.cpp +++ b/bolt/dwio/parquet/reader/ParquetReader.cpp @@ -452,11 +452,38 @@ std::shared_ptr ReaderBase::getParquetColumnInfo( } TypePtr childRequestedType = nullptr; - if (requestedType && requestedType->isRow()) { - auto fileTypeIdx = - requestedType->asRow().getChildIdxIfExists(childName); - if (fileTypeIdx.has_value()) { - childRequestedType = requestedType->asRow().childAt(*fileTypeIdx); + bool followChild = true; + + { + RowTypePtr requestedRowType = nullptr; + if (requestedType) { + if (requestedType->isRow()) { + requestedRowType = + std::dynamic_pointer_cast(requestedType); + } else if ( + requestedType->isArray() && isRepeated && + requestedType->asArray().elementType()->isRow()) { + // Handle the case of unannotated array of structs (repeated group + // without LIST annotation). + requestedRowType = std::dynamic_pointer_cast( + requestedType->asArray().elementType()); + } + } + + if (requestedRowType) { + if (options_.isUseColumnNamesForColumnMapping()) { + auto fileTypeIdx = requestedRowType->getChildIdxIfExists(childName); + if (fileTypeIdx.has_value()) { + childRequestedType = requestedRowType->childAt(*fileTypeIdx); + } + } else { + // Handle schema evolution. + if (i < requestedRowType->size()) { + childRequestedType = requestedRowType->childAt(i); + } else { + followChild = false; + } + } } } @@ -747,7 +774,7 @@ std::shared_ptr ReaderBase::getParquetColumnInfo( isOptional, isRepeated); } else { - // Row type + // Row type in a repeated field without LIST/MAP annotation. // To support list backward compatibility, need create a new row type // instance and set all the fields as its children. auto childrenRowType = diff --git a/bolt/dwio/parquet/tests/examples/nested_array_struct.parquet b/bolt/dwio/parquet/tests/examples/nested_array_struct.parquet new file mode 100644 index 0000000000000000000000000000000000000000..41a43fa35d39685e56ba4849a16cba4bb1aa86ae GIT binary patch literal 775 zcmaKr-%G+!6vvO#=8u;i;*JSE3{j~lAyWtuV%0Q3*U(Y)B(q&>u(@?NBZ;2+Px=dc z?6I>s%N6u+cQ4=bIp1@3cBjdsBLbvCDhGte15a`Q8~~)V;d2WY3V@LYX{-@GMj#!6 z`;fvdgDblto20oxMhs#hdKzt*4*3w}iuUE6PW?b*Zs1NAv%1WfvAnT@2NhLn_L#fy zHFWX={q7*H z93@^0*O&w#e53@lJ`hFEV2=wL)V**Pb(8vc%<=-4iJz&t;n22J{%<(t!px$!DZLaV zDaOCYR1UR;Go`F89pTwFrqpgr1NlrDOs+J&f2GO;)PtpmW%OH3neOEccXHoWyO`6Bl5({+ea14&qL7DtETw`(h_426$BxCYApN S1!5siKXe$p;U(Ab7x)6M)yB5~ literal 0 HcmV?d00001 diff --git a/bolt/dwio/parquet/tests/examples/proto_repeated_string.parquet b/bolt/dwio/parquet/tests/examples/proto_repeated_string.parquet new file mode 100644 index 0000000000000000000000000000000000000000..8a7eea601d0164b1177ab90efb7116c6e4c900da GIT binary patch literal 411 zcmY*W!D_-l5S^$E62wc{umKMtR4+{f-b!vM4Q)Y6&|G?w#H^aKansF;gi?C#*Yq1Z zv6e;{_C4Mk<_)t^FrN}2UmBK6hDddy19SkO`+9soFOY8;=b|A8A$itAvJoQdBBnKK zKy>)#|`4$W^3YtqL)WN5pTmWh1ZGv$>{f|s#sCG%1VN%LJ&FyD4siH@<(8PDu@ z!?sWEU$)ao`yyr1x2MQ?k}~ewv*0eAE$3kr261?gx~fYY8oxy0auLs;o*#@41L)=X h7Au}q6}>(e6`sLs-{PvZ8BpWYeN#xd)c_*=mLHLcZu|fM literal 0 HcmV?d00001 diff --git a/bolt/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/bolt/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 1b1592b69..3a55e6853 100644 --- a/bolt/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/bolt/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -637,20 +637,92 @@ TEST_F(ParquetTableScanTest, singleRowStruct) { assertSelectWithFilter({"s"}, {}, "", "SELECT (0, 1)"); } -// Core dump and incorrect result are fixed. -TEST_F(ParquetTableScanTest, DISABLED_array) { +TEST_F(ParquetTableScanTest, array) { + auto filePath = getExampleFilePath("old_repeated_int.parquet"); auto vector = makeArrayVector({}); loadData( - getExampleFilePath("old_repeated_int.parquet"), + filePath, ROW({"repeatedInt"}, {ARRAY(INTEGER())}), makeRowVector( {"repeatedInt"}, { vector, })); + assertSelectWithFilter({"repeatedInt"}, {}, "", "SELECT [1,2,3]"); - assertSelectWithFilter( - {"repeatedInt"}, {}, "", "SELECT UNNEST(array[array[1,2,3]])"); + // Set the requested type for unannotated array. + auto rowType = ROW({"repeatedInt"}, {ARRAY(INTEGER())}); + auto plan = + PlanBuilder(pool_.get()).tableScan(rowType, {}, "", rowType).planNode(); + + assertQuery(plan, splits(), "SELECT [1,2,3]"); + + // Throws when reading repeated values as scalar type. + rowType = ROW({"repeatedInt"}, {INTEGER()}); + plan = + PlanBuilder(pool_.get()).tableScan(rowType, {}, "", rowType).planNode(); + BOLT_ASSERT_THROW( + assertQuery(plan, splits(), "SELECT [1,2,3]"), + "Requested type must be array"); + + rowType = ROW({"mystring"}, {ARRAY(VARCHAR())}); + plan = + PlanBuilder(pool_.get()).tableScan(rowType, {}, "", rowType).planNode(); + + { + const auto protoFilePath = + getExampleFilePath("proto_repeated_string.parquet"); + AssertQueryBuilder(plan, duckDbQueryRunner_) + .connectorSessionProperty( + kHiveConnectorId, + connector::hive::HiveConfig::kParquetUseColumnNamesSession, + "true") + .connectorSessionProperty( + kHiveConnectorId, + connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCaseSession, + "true") + .split(makeSplit(protoFilePath)) + .assertResults( + "SELECT UNNEST(array[array['hello', 'world'], array['good','bye'], array['one', 'two', 'three']])"); + } + + rowType = + ROW({"primitive", "myComplex"}, + {INTEGER(), + ARRAY( + ROW({"id", "repeatedMessage"}, + {INTEGER(), ARRAY(ROW({"someId"}, {INTEGER()}))}))}); + plan = + PlanBuilder(pool_.get()).tableScan(rowType, {}, "", rowType).planNode(); + + // Construct the expected vector. + auto someIdVector = makeArrayOfRowVector( + ROW({"someId"}, {INTEGER()}), + { + {variant::row({3})}, + {variant::row({6})}, + {variant::row({9})}, + }); + auto rowVector = makeRowVector( + {"id", "repeatedMessage"}, + { + makeFlatVector({1, 4, 7}), + someIdVector, + }); + auto expected = makeRowVector( + {"primitive", "myComplex"}, + { + makeFlatVector({2, 5, 8}), + makeArrayVector({0, 1, 2}, rowVector), + }); + + AssertQueryBuilder(plan, duckDbQueryRunner_) + .connectorSessionProperty( + kHiveConnectorId, + connector::hive::HiveConfig::kParquetUseColumnNamesSession, + "true") + .split(makeSplit(getExampleFilePath("nested_array_struct.parquet"))) + .assertResults(expected); } // Optional array with required elements. From dc8a22dca3fb06c60b92d836cf7f47645a5019fd Mon Sep 17 00:00:00 2001 From: Gu Haiyan Date: Wed, 15 Apr 2026 19:20:37 +0800 Subject: [PATCH 2/3] fix code review comments --- bolt/dwio/parquet/reader/ParquetReader.cpp | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/bolt/dwio/parquet/reader/ParquetReader.cpp b/bolt/dwio/parquet/reader/ParquetReader.cpp index f047eafc2..5415d5867 100644 --- a/bolt/dwio/parquet/reader/ParquetReader.cpp +++ b/bolt/dwio/parquet/reader/ParquetReader.cpp @@ -502,16 +502,18 @@ std::shared_ptr ReaderBase::getParquetColumnInfo( } } - auto child = getParquetColumnInfo( - maxSchemaElementIdx, - maxRepeat, - maxDefine, - curSchemaIdx, - schemaIdx, - columnIdx, - childRequestedType, - requestedType); - children.push_back(std::move(child)); + if (followChild) { + auto child = getParquetColumnInfo( + maxSchemaElementIdx, + maxRepeat, + maxDefine, + curSchemaIdx, + schemaIdx, + columnIdx, + childRequestedType, + requestedType); + children.push_back(std::move(child)); + } } BOLT_CHECK(!children.empty()); From 0219040ccc0cbd5339e74eff889c433beb6ebcd6 Mon Sep 17 00:00:00 2001 From: guhaiyan Date: Thu, 16 Apr 2026 22:14:29 +0800 Subject: [PATCH 3/3] fix ut --- bolt/dwio/parquet/reader/ParquetReader.cpp | 39 +++++++++++++++++-- .../tests/reader/ParquetReaderTest.cpp | 1 - 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/bolt/dwio/parquet/reader/ParquetReader.cpp b/bolt/dwio/parquet/reader/ParquetReader.cpp index 5415d5867..38ea839d1 100644 --- a/bolt/dwio/parquet/reader/ParquetReader.cpp +++ b/bolt/dwio/parquet/reader/ParquetReader.cpp @@ -69,6 +69,19 @@ ::parquet::EncryptionAlgorithm FromThrift( } return algo; } + +bool isParquetReservedKeyword( + std::string_view name, + uint32_t parentSchemaIdx, + uint32_t curSchemaIdx) { + // We skip this for the top-level nodes. + return ( + (parentSchemaIdx == 0 && curSchemaIdx == 0) || + (parentSchemaIdx != 0 && + (name == "key_value" || name == "key" || name == "value" || + name == "list" || name == "element" || name == "bag" || + name == "array_element"))); +} } // namespace /// Metadata and options for reading Parquet. @@ -166,7 +179,8 @@ class ReaderBase { uint32_t& schemaIdx, uint32_t& columnIdx, const TypePtr& requestedType, - const TypePtr& parentRequestedType) const; + const TypePtr& parentRequestedType, + std::vector& columnNames) const; TypePtr convertType( const thrift::SchemaElement& schemaElement, @@ -374,6 +388,7 @@ void ReaderBase::initializeSchema() { uint32_t schemaIdx = 0; uint32_t columnIdx = 0; uint32_t maxSchemaElementIdx = fileMetaData_->schema.size() - 1; + std::vector columnNames; // Setting the parent schema index of the root("hive_schema") to be 0, which // is the root itself. This is ok because it's never required to check the // parent of the root in getParquetColumnInfo(). @@ -385,7 +400,8 @@ void ReaderBase::initializeSchema() { schemaIdx, columnIdx, options_.getFileSchema(), - nullptr); + nullptr, + columnNames); schema_ = createRowType( schemaWithId_->getChildren(), isFileColumnNamesReadAsLowerCase()); @@ -403,7 +419,8 @@ std::shared_ptr ReaderBase::getParquetColumnInfo( uint32_t& schemaIdx, uint32_t& columnIdx, const TypePtr& requestedType, - const TypePtr& parentRequestedType) const { + const TypePtr& parentRequestedType, + std::vector& columnNames) const { BOLT_CHECK(fileMetaData_ != nullptr); BOLT_CHECK_LT(schemaIdx, fileMetaData_->schema.size()); @@ -433,6 +450,15 @@ std::shared_ptr ReaderBase::getParquetColumnInfo( if (isFileColumnNamesReadAsLowerCase()) { folly::toLowerAscii(name); } + + if (!options_.isUseColumnNamesForColumnMapping() && + options_.getFileSchema()) { + if (isParquetReservedKeyword(name, parentSchemaIdx, curSchemaIdx)) { + columnNames.push_back(name); + } + } else { + columnNames.push_back(name); + } if (!schemaElement.__isset.type) { // inner node BOLT_CHECK( schemaElement.__isset.num_children && schemaElement.num_children > 0, @@ -479,6 +505,7 @@ std::shared_ptr ReaderBase::getParquetColumnInfo( } else { // Handle schema evolution. if (i < requestedRowType->size()) { + columnNames.push_back(requestedRowType->nameOf(i)); childRequestedType = requestedRowType->childAt(i); } else { followChild = false; @@ -511,12 +538,15 @@ std::shared_ptr ReaderBase::getParquetColumnInfo( schemaIdx, columnIdx, childRequestedType, - requestedType); + requestedType, + columnNames); children.push_back(std::move(child)); } } BOLT_CHECK(!children.empty()); + name = columnNames.at(curSchemaIdx); + // Detect Spark 4.0 Variant structure: STRUCT // Promote only when the requested logical type explicitly asks for // VARIANT. The raw Parquet schema alone is not specific enough because @@ -835,6 +865,7 @@ std::shared_ptr ReaderBase::getParquetColumnInfo( } } } else { // leaf node + name = columnNames.at(curSchemaIdx); const auto boltType = convertType(schemaElement, requestedType); int32_t precision = schemaElement.__isset.precision ? schemaElement.precision : 0; diff --git a/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp b/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp index 4edf71ada..615e4ab20 100644 --- a/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp +++ b/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp @@ -1290,7 +1290,6 @@ TEST_F(ParquetReaderTest, readEncryptedParquetWithProjection) { const std::string sample(getExampleFilePath("encrypted_sample.parquet")); bytedance::bolt::dwio::common::ReaderOptions readerOptions{leafPool_.get()}; - readerOptions.setFileSchema(projectedType); auto reader = createReader(sample, readerOptions); auto rowReaderOpts = getReaderOpts(projectedType);