diff --git a/bolt/dwio/parquet/reader/ParquetColumnReader.cpp b/bolt/dwio/parquet/reader/ParquetColumnReader.cpp index af1d57809..07df86616 100644 --- a/bolt/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/bolt/dwio/parquet/reader/ParquetColumnReader.cpp @@ -152,9 +152,12 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build( return std::make_unique<StructColumnReader>( requestedType, fileType, params, scanSpec); - case TypeKind::ARRAY: + case TypeKind::ARRAY: { + BOLT_CHECK( + requestedType->type()->isArray(), "Requested type must be array"); return std::make_unique<ListColumnReader>( columnReaderOptions, requestedType, fileType, params, scanSpec, pool); + } case TypeKind::MAP: return std::make_unique<MapColumnReader>( diff --git a/bolt/dwio/parquet/reader/ParquetReader.cpp b/bolt/dwio/parquet/reader/ParquetReader.cpp index 369b9b250..38ea839d1 100644 --- a/bolt/dwio/parquet/reader/ParquetReader.cpp +++ b/bolt/dwio/parquet/reader/ParquetReader.cpp @@ -69,6 +69,19 @@ ::parquet::EncryptionAlgorithm FromThrift( } return algo; } + +bool isParquetReservedKeyword( + std::string_view name, + uint32_t parentSchemaIdx, + uint32_t curSchemaIdx) { + // We skip this for the top-level nodes. + return ( + (parentSchemaIdx == 0 && curSchemaIdx == 0) || + (parentSchemaIdx != 0 && + (name == "key_value" || name == "key" || name == "value" || + name == "list" || name == "element" || name == "bag" || + name == "array_element"))); +} } // namespace /// Metadata and options for reading Parquet. 
@@ -166,7 +179,8 @@ class ReaderBase { uint32_t& schemaIdx, uint32_t& columnIdx, const TypePtr& requestedType, - const TypePtr& parentRequestedType) const; + const TypePtr& parentRequestedType, + std::vector<std::string>& columnNames) const; TypePtr convertType( const thrift::SchemaElement& schemaElement, @@ -374,6 +388,7 @@ void ReaderBase::initializeSchema() { uint32_t schemaIdx = 0; uint32_t columnIdx = 0; uint32_t maxSchemaElementIdx = fileMetaData_->schema.size() - 1; + std::vector<std::string> columnNames; // Setting the parent schema index of the root("hive_schema") to be 0, which // is the root itself. This is ok because it's never required to check the // parent of the root in getParquetColumnInfo(). @@ -385,7 +400,8 @@ void ReaderBase::initializeSchema() { schemaIdx, columnIdx, options_.getFileSchema(), - nullptr); + nullptr, + columnNames); schema_ = createRowType( schemaWithId_->getChildren(), isFileColumnNamesReadAsLowerCase()); @@ -403,7 +419,8 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo( uint32_t& schemaIdx, uint32_t& columnIdx, const TypePtr& requestedType, - const TypePtr& parentRequestedType) const { + const TypePtr& parentRequestedType, + std::vector<std::string>& columnNames) const { BOLT_CHECK(fileMetaData_ != nullptr); BOLT_CHECK_LT(schemaIdx, fileMetaData_->schema.size()); @@ -433,6 +450,15 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo( if (isFileColumnNamesReadAsLowerCase()) { folly::toLowerAscii(name); } + + if (!options_.isUseColumnNamesForColumnMapping() && + options_.getFileSchema()) { + if (isParquetReservedKeyword(name, parentSchemaIdx, curSchemaIdx)) { + columnNames.push_back(name); + } + } else { + columnNames.push_back(name); + } if (!schemaElement.__isset.type) { // inner node BOLT_CHECK( schemaElement.__isset.num_children && schemaElement.num_children > 0, @@ -452,11 +478,39 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo( } TypePtr childRequestedType = nullptr; - if (requestedType && requestedType->isRow()) { - auto fileTypeIdx = - 
requestedType->asRow().getChildIdxIfExists(childName); - if (fileTypeIdx.has_value()) { - childRequestedType = requestedType->asRow().childAt(*fileTypeIdx); + bool followChild = true; + + { + RowTypePtr requestedRowType = nullptr; + if (requestedType) { + if (requestedType->isRow()) { + requestedRowType = + std::dynamic_pointer_cast<const RowType>(requestedType); + } else if ( + requestedType->isArray() && isRepeated && + requestedType->asArray().elementType()->isRow()) { + // Handle the case of unannotated array of structs (repeated group + // without LIST annotation). + requestedRowType = std::dynamic_pointer_cast<const RowType>( + requestedType->asArray().elementType()); + } + } + + if (requestedRowType) { + if (options_.isUseColumnNamesForColumnMapping()) { + auto fileTypeIdx = requestedRowType->getChildIdxIfExists(childName); + if (fileTypeIdx.has_value()) { + childRequestedType = requestedRowType->childAt(*fileTypeIdx); + } + } else { + // Handle schema evolution. + if (i < requestedRowType->size()) { + columnNames.push_back(requestedRowType->nameOf(i)); + childRequestedType = requestedRowType->childAt(i); + } else { + followChild = false; + } + } } } @@ -475,19 +529,24 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo( } } - auto child = getParquetColumnInfo( - maxSchemaElementIdx, - maxRepeat, - maxDefine, - curSchemaIdx, - schemaIdx, - columnIdx, - childRequestedType, - requestedType); - children.push_back(std::move(child)); + if (followChild) { + auto child = getParquetColumnInfo( + maxSchemaElementIdx, + maxRepeat, + maxDefine, + curSchemaIdx, + schemaIdx, + columnIdx, + childRequestedType, + requestedType, + columnNames); + children.push_back(std::move(child)); + } } BOLT_CHECK(!children.empty()); + name = columnNames.at(curSchemaIdx); + // Detect Spark 4.0 Variant structure: STRUCT // Promote only when the requested logical type explicitly asks for // VARIANT. 
The raw Parquet schema alone is not specific enough because @@ -747,7 +806,7 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo( isOptional, isRepeated); } else { - // Row type + // Row type in a repeated field without LIST/MAP annotation. // To support list backward compatibility, need create a new row type // instance and set all the fields as its children. auto childrenRowType = @@ -806,6 +865,7 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo( } } } else { // leaf node + name = columnNames.at(curSchemaIdx); const auto boltType = convertType(schemaElement, requestedType); int32_t precision = schemaElement.__isset.precision ? schemaElement.precision : 0; diff --git a/bolt/dwio/parquet/tests/examples/nested_array_struct.parquet b/bolt/dwio/parquet/tests/examples/nested_array_struct.parquet new file mode 100644 index 000000000..41a43fa35 Binary files /dev/null and b/bolt/dwio/parquet/tests/examples/nested_array_struct.parquet differ diff --git a/bolt/dwio/parquet/tests/examples/proto_repeated_string.parquet b/bolt/dwio/parquet/tests/examples/proto_repeated_string.parquet new file mode 100644 index 000000000..8a7eea601 Binary files /dev/null and b/bolt/dwio/parquet/tests/examples/proto_repeated_string.parquet differ diff --git a/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp b/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp index 4edf71ada..615e4ab20 100644 --- a/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp +++ b/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp @@ -1290,7 +1290,6 @@ TEST_F(ParquetReaderTest, readEncryptedParquetWithProjection) { const std::string sample(getExampleFilePath("encrypted_sample.parquet")); bytedance::bolt::dwio::common::ReaderOptions readerOptions{leafPool_.get()}; - readerOptions.setFileSchema(projectedType); auto reader = createReader(sample, readerOptions); auto rowReaderOpts = getReaderOpts(projectedType); diff --git a/bolt/dwio/parquet/tests/reader/ParquetTableScanTest.cpp 
b/bolt/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 1b1592b69..3a55e6853 100644 --- a/bolt/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/bolt/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -637,20 +637,92 @@ TEST_F(ParquetTableScanTest, singleRowStruct) { assertSelectWithFilter({"s"}, {}, "", "SELECT (0, 1)"); } -// Core dump and incorrect result are fixed. -TEST_F(ParquetTableScanTest, DISABLED_array) { +TEST_F(ParquetTableScanTest, array) { + auto filePath = getExampleFilePath("old_repeated_int.parquet"); auto vector = makeArrayVector<int32_t>({}); loadData( - getExampleFilePath("old_repeated_int.parquet"), + filePath, ROW({"repeatedInt"}, {ARRAY(INTEGER())}), makeRowVector( {"repeatedInt"}, { vector, })); + assertSelectWithFilter({"repeatedInt"}, {}, "", "SELECT [1,2,3]"); - assertSelectWithFilter( - {"repeatedInt"}, {}, "", "SELECT UNNEST(array[array[1,2,3]])"); + // Set the requested type for unannotated array. + auto rowType = ROW({"repeatedInt"}, {ARRAY(INTEGER())}); + auto plan = + PlanBuilder(pool_.get()).tableScan(rowType, {}, "", rowType).planNode(); + + assertQuery(plan, splits(), "SELECT [1,2,3]"); + + // Throws when reading repeated values as scalar type. 
+ rowType = ROW({"repeatedInt"}, {INTEGER()}); + plan = + PlanBuilder(pool_.get()).tableScan(rowType, {}, "", rowType).planNode(); + BOLT_ASSERT_THROW( + assertQuery(plan, splits(), "SELECT [1,2,3]"), + "Requested type must be array"); + + rowType = ROW({"mystring"}, {ARRAY(VARCHAR())}); + plan = + PlanBuilder(pool_.get()).tableScan(rowType, {}, "", rowType).planNode(); + + { + const auto protoFilePath = + getExampleFilePath("proto_repeated_string.parquet"); + AssertQueryBuilder(plan, duckDbQueryRunner_) + .connectorSessionProperty( + kHiveConnectorId, + connector::hive::HiveConfig::kParquetUseColumnNamesSession, + "true") + .connectorSessionProperty( + kHiveConnectorId, + connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCaseSession, + "true") + .split(makeSplit(protoFilePath)) + .assertResults( + "SELECT UNNEST(array[array['hello', 'world'], array['good','bye'], array['one', 'two', 'three']])"); + } + + rowType = + ROW({"primitive", "myComplex"}, + {INTEGER(), + ARRAY( + ROW({"id", "repeatedMessage"}, + {INTEGER(), ARRAY(ROW({"someId"}, {INTEGER()}))}))}); + plan = + PlanBuilder(pool_.get()).tableScan(rowType, {}, "", rowType).planNode(); + + // Construct the expected vector. + auto someIdVector = makeArrayOfRowVector( + ROW({"someId"}, {INTEGER()}), + { + {variant::row({3})}, + {variant::row({6})}, + {variant::row({9})}, + }); + auto rowVector = makeRowVector( + {"id", "repeatedMessage"}, + { + makeFlatVector<int32_t>({1, 4, 7}), + someIdVector, + }); + auto expected = makeRowVector( + {"primitive", "myComplex"}, + { + makeFlatVector<int32_t>({2, 5, 8}), + makeArrayVector({0, 1, 2}, rowVector), + }); + + AssertQueryBuilder(plan, duckDbQueryRunner_) + .connectorSessionProperty( + kHiveConnectorId, + connector::hive::HiveConfig::kParquetUseColumnNamesSession, + "true") + .split(makeSplit(getExampleFilePath("nested_array_struct.parquet"))) + .assertResults(expected); } // Optional array with required elements.