Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion bolt/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,12 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
return std::make_unique<StringColumnReader>(
requestedType, fileType, params, scanSpec);

case TypeKind::ARRAY:
case TypeKind::ARRAY: {
BOLT_CHECK(
requestedType->type()->isArray(), "Requested type must be array");
return std::make_unique<ListColumnReader>(
columnReaderOptions, requestedType, fileType, params, scanSpec, pool);
}

case TypeKind::MAP:
return std::make_unique<MapColumnReader>(
Expand Down
98 changes: 79 additions & 19 deletions bolt/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ ::parquet::EncryptionAlgorithm FromThrift(
}
return algo;
}

/// Decides whether a schema node's name is treated as a Parquet reserved
/// (structural) keyword.
///
/// @param name Name of the schema node being inspected.
/// @param parentSchemaIdx Schema index of the node's parent.
/// @param curSchemaIdx Schema index of the node itself.
/// @return true when both indices are 0, or when parentSchemaIdx is non-zero
/// and name equals one of the structural names listed below; false otherwise.
bool isParquetReservedKeyword(
    std::string_view name,
    uint32_t parentSchemaIdx,
    uint32_t curSchemaIdx) {
  // Nodes whose parent index is 0 are never matched against the keyword
  // list: the answer depends solely on whether the node's own index is 0 too.
  if (parentSchemaIdx == 0) {
    return curSchemaIdx == 0;
  }

  // Names that are compared against for nodes below the top level.
  static constexpr std::string_view kStructuralNames[] = {
      "key_value",
      "key",
      "value",
      "list",
      "element",
      "bag",
      "array_element",
  };

  for (const std::string_view structuralName : kStructuralNames) {
    if (name == structuralName) {
      return true;
    }
  }
  return false;
}
} // namespace

/// Metadata and options for reading Parquet.
Expand Down Expand Up @@ -166,7 +179,8 @@ class ReaderBase {
uint32_t& schemaIdx,
uint32_t& columnIdx,
const TypePtr& requestedType,
const TypePtr& parentRequestedType) const;
const TypePtr& parentRequestedType,
std::vector<std::string>& columnNames) const;

TypePtr convertType(
const thrift::SchemaElement& schemaElement,
Expand Down Expand Up @@ -374,6 +388,7 @@ void ReaderBase::initializeSchema() {
uint32_t schemaIdx = 0;
uint32_t columnIdx = 0;
uint32_t maxSchemaElementIdx = fileMetaData_->schema.size() - 1;
std::vector<std::string> columnNames;
// Setting the parent schema index of the root("hive_schema") to be 0, which
// is the root itself. This is ok because it's never required to check the
// parent of the root in getParquetColumnInfo().
Expand All @@ -385,7 +400,8 @@ void ReaderBase::initializeSchema() {
schemaIdx,
columnIdx,
options_.getFileSchema(),
nullptr);
nullptr,
columnNames);
schema_ = createRowType(
schemaWithId_->getChildren(), isFileColumnNamesReadAsLowerCase());

Expand All @@ -403,7 +419,8 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
uint32_t& schemaIdx,
uint32_t& columnIdx,
const TypePtr& requestedType,
const TypePtr& parentRequestedType) const {
const TypePtr& parentRequestedType,
std::vector<std::string>& columnNames) const {
BOLT_CHECK(fileMetaData_ != nullptr);
BOLT_CHECK_LT(schemaIdx, fileMetaData_->schema.size());

Expand Down Expand Up @@ -433,6 +450,15 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
if (isFileColumnNamesReadAsLowerCase()) {
folly::toLowerAscii(name);
}

if (!options_.isUseColumnNamesForColumnMapping() &&
options_.getFileSchema()) {
if (isParquetReservedKeyword(name, parentSchemaIdx, curSchemaIdx)) {
columnNames.push_back(name);
}
} else {
columnNames.push_back(name);
}
if (!schemaElement.__isset.type) { // inner node
BOLT_CHECK(
schemaElement.__isset.num_children && schemaElement.num_children > 0,
Expand All @@ -452,11 +478,39 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
}

TypePtr childRequestedType = nullptr;
if (requestedType && requestedType->isRow()) {
auto fileTypeIdx =
requestedType->asRow().getChildIdxIfExists(childName);
if (fileTypeIdx.has_value()) {
childRequestedType = requestedType->asRow().childAt(*fileTypeIdx);
bool followChild = true;
Comment thread
guhaiyan0221 marked this conversation as resolved.

{
RowTypePtr requestedRowType = nullptr;
if (requestedType) {
if (requestedType->isRow()) {
requestedRowType =
std::dynamic_pointer_cast<const RowType>(requestedType);
} else if (
requestedType->isArray() && isRepeated &&
requestedType->asArray().elementType()->isRow()) {
// Handle the case of unannotated array of structs (repeated group
// without LIST annotation).
requestedRowType = std::dynamic_pointer_cast<const RowType>(
requestedType->asArray().elementType());
}
}

if (requestedRowType) {
if (options_.isUseColumnNamesForColumnMapping()) {
auto fileTypeIdx = requestedRowType->getChildIdxIfExists(childName);
if (fileTypeIdx.has_value()) {
childRequestedType = requestedRowType->childAt(*fileTypeIdx);
}
} else {
// Handle schema evolution.
if (i < requestedRowType->size()) {
columnNames.push_back(requestedRowType->nameOf(i));
childRequestedType = requestedRowType->childAt(i);
} else {
followChild = false;
}
}
}
}

Expand All @@ -475,19 +529,24 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
}
}

auto child = getParquetColumnInfo(
maxSchemaElementIdx,
maxRepeat,
maxDefine,
curSchemaIdx,
schemaIdx,
columnIdx,
childRequestedType,
requestedType);
children.push_back(std::move(child));
if (followChild) {
auto child = getParquetColumnInfo(
maxSchemaElementIdx,
maxRepeat,
maxDefine,
curSchemaIdx,
schemaIdx,
columnIdx,
childRequestedType,
requestedType,
columnNames);
children.push_back(std::move(child));
}
}
BOLT_CHECK(!children.empty());

name = columnNames.at(curSchemaIdx);

// Detect Spark 4.0 Variant structure: STRUCT<value BINARY, metadata BINARY>
// Promote only when the requested logical type explicitly asks for
// VARIANT. The raw Parquet schema alone is not specific enough because
Expand Down Expand Up @@ -747,7 +806,7 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
isOptional,
isRepeated);
} else {
// Row type
// Row type in a repeated field without LIST/MAP annotation.
// To support list backward compatibility, we need to create a new row
// type instance and set all the fields as its children.
auto childrenRowType =
Expand Down Expand Up @@ -806,6 +865,7 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
}
}
} else { // leaf node
name = columnNames.at(curSchemaIdx);
const auto boltType = convertType(schemaElement, requestedType);
int32_t precision =
schemaElement.__isset.precision ? schemaElement.precision : 0;
Expand Down
Binary file not shown.
Binary file not shown.
1 change: 0 additions & 1 deletion bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,6 @@ TEST_F(ParquetReaderTest, readEncryptedParquetWithProjection) {
const std::string sample(getExampleFilePath("encrypted_sample.parquet"));

bytedance::bolt::dwio::common::ReaderOptions readerOptions{leafPool_.get()};
readerOptions.setFileSchema(projectedType);
auto reader = createReader(sample, readerOptions);

auto rowReaderOpts = getReaderOpts(projectedType);
Expand Down
82 changes: 77 additions & 5 deletions bolt/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,20 +637,92 @@ TEST_F(ParquetTableScanTest, singleRowStruct) {
assertSelectWithFilter({"s"}, {}, "", "SELECT (0, 1)");
}

// Core dump and incorrect result are fixed.
TEST_F(ParquetTableScanTest, DISABLED_array) {
TEST_F(ParquetTableScanTest, array) {
auto filePath = getExampleFilePath("old_repeated_int.parquet");
auto vector = makeArrayVector<int32_t>({});
loadData(
getExampleFilePath("old_repeated_int.parquet"),
filePath,
ROW({"repeatedInt"}, {ARRAY(INTEGER())}),
makeRowVector(
{"repeatedInt"},
{
vector,
}));
assertSelectWithFilter({"repeatedInt"}, {}, "", "SELECT [1,2,3]");

assertSelectWithFilter(
{"repeatedInt"}, {}, "", "SELECT UNNEST(array[array[1,2,3]])");
// Set the requested type for unannotated array.
auto rowType = ROW({"repeatedInt"}, {ARRAY(INTEGER())});
auto plan =
PlanBuilder(pool_.get()).tableScan(rowType, {}, "", rowType).planNode();

assertQuery(plan, splits(), "SELECT [1,2,3]");

// Throws when reading repeated values as scalar type.
rowType = ROW({"repeatedInt"}, {INTEGER()});
plan =
PlanBuilder(pool_.get()).tableScan(rowType, {}, "", rowType).planNode();
BOLT_ASSERT_THROW(
assertQuery(plan, splits(), "SELECT [1,2,3]"),
"Requested type must be array");

rowType = ROW({"mystring"}, {ARRAY(VARCHAR())});
plan =
PlanBuilder(pool_.get()).tableScan(rowType, {}, "", rowType).planNode();

{
const auto protoFilePath =
getExampleFilePath("proto_repeated_string.parquet");
AssertQueryBuilder(plan, duckDbQueryRunner_)
.connectorSessionProperty(
kHiveConnectorId,
connector::hive::HiveConfig::kParquetUseColumnNamesSession,
"true")
.connectorSessionProperty(
kHiveConnectorId,
connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCaseSession,
"true")
.split(makeSplit(protoFilePath))
.assertResults(
"SELECT UNNEST(array[array['hello', 'world'], array['good','bye'], array['one', 'two', 'three']])");
}

rowType =
ROW({"primitive", "myComplex"},
{INTEGER(),
ARRAY(
ROW({"id", "repeatedMessage"},
{INTEGER(), ARRAY(ROW({"someId"}, {INTEGER()}))}))});
plan =
PlanBuilder(pool_.get()).tableScan(rowType, {}, "", rowType).planNode();

// Construct the expected vector.
auto someIdVector = makeArrayOfRowVector(
ROW({"someId"}, {INTEGER()}),
{
{variant::row({3})},
{variant::row({6})},
{variant::row({9})},
});
auto rowVector = makeRowVector(
{"id", "repeatedMessage"},
{
makeFlatVector<int32_t>({1, 4, 7}),
someIdVector,
});
auto expected = makeRowVector(
{"primitive", "myComplex"},
{
makeFlatVector<int32_t>({2, 5, 8}),
makeArrayVector({0, 1, 2}, rowVector),
});

AssertQueryBuilder(plan, duckDbQueryRunner_)
.connectorSessionProperty(
kHiveConnectorId,
connector::hive::HiveConfig::kParquetUseColumnNamesSession,
"true")
.split(makeSplit(getExampleFilePath("nested_array_struct.parquet")))
.assertResults(expected);
}

// Optional array with required elements.
Expand Down
Loading