From b44c7d2322a0659eb23f3bcb7a592961f6ed095f Mon Sep 17 00:00:00 2001 From: "xuweixin.rex" Date: Wed, 8 Apr 2026 16:10:22 +0800 Subject: [PATCH 1/3] [feat][cp] Allow transferring a vector to a given memory pool Summary: This diff adds an API `BaseVector::transferOrCopyTo(MemoryPool* pool)` to allow transferring the ownership of a vector to a target memory pool. If a buffer in this vector cannot be transferred (e.g., BufferView), it is copied to the target pool. After the call, the vector is entirely owned by the new pool such that the original pool can be destructed even if this vector is not released yet. (Note that this API transfers a buffer to the target pool even if it's multiply referenced.) from https://github.com/facebookincubator/velox/pull/13814 --- bolt/buffer/Buffer.h | 70 +++++++ bolt/common/memory/MemoryPool.cpp | 6 +- bolt/common/memory/tests/MemoryPoolTest.cpp | 207 ++++++++++++++++++++ bolt/vector/BaseVector.cpp | 12 ++ bolt/vector/BaseVector.h | 10 +- bolt/vector/BiasVector.h | 4 + bolt/vector/ComplexVector.cpp | 30 +++ bolt/vector/ComplexVector.h | 8 + bolt/vector/ConstantVector.h | 10 + bolt/vector/DictionaryVector.h | 9 + bolt/vector/FlatVector.h | 18 ++ bolt/vector/FunctionVector.h | 4 + bolt/vector/LazyVector.h | 7 + bolt/vector/SequenceVector.h | 4 + bolt/vector/fuzzer/VectorFuzzer.cpp | 15 ++ bolt/vector/fuzzer/VectorFuzzer.h | 3 + bolt/vector/tests/VectorTest.cpp | 162 +++++++++++++++ 17 files changed, 573 insertions(+), 6 deletions(-) diff --git a/bolt/buffer/Buffer.h b/bolt/buffer/Buffer.h index 64a1af529..1c7102b8b 100644 --- a/bolt/buffer/Buffer.h +++ b/bolt/buffer/Buffer.h @@ -212,6 +212,10 @@ class Buffer { sizeof(T), is_pod_like_v, buffer, offset, length); } + virtual bool transferTo(bolt::memory::MemoryPool* /*pool*/) { + BOLT_NYI("{} unsupported", __FUNCTION__); + } + protected: // Writes a magic word at 'capacity_'. No-op for a BufferView. 
The actual // logic is inside a separate virtual function, allowing override by derived @@ -509,6 +513,42 @@ class AlignedBuffer : public Buffer { return newBuffer; } + template + static BufferPtr copy( + const BufferPtr& buffer, + bolt::memory::MemoryPool* pool) { + if (buffer == nullptr) { + return nullptr; + } + + // The reason we use uint8_t is because mutableNulls()->size() will return + // in byte count. We also don't bother initializing since copyFrom will be + // overwriting anyway. + BufferPtr newBuffer; + if constexpr (std::is_same_v) { + newBuffer = AlignedBuffer::allocate(buffer->size(), pool); + } else { + const auto numElements = checkedDivide(buffer->size(), sizeof(T)); + newBuffer = AlignedBuffer::allocate(numElements, pool); + } + + newBuffer->copyFrom(buffer.get(), newBuffer->size()); + + return newBuffer; + } + + bool transferTo(bolt::memory::MemoryPool* pool) override { + if (pool_ == pool) { + return true; + } + if (pool_->transferTo( + pool, this, checkedPlus(kPaddedSize, capacity_))) { + setPool(pool); + return true; + } + return false; + } + protected: AlignedBuffer(bolt::memory::MemoryPool* pool, size_t capacity) : Buffer( @@ -549,6 +589,12 @@ class AlignedBuffer : public Buffer { } } + void setPool(bolt::memory::MemoryPool* pool) { + bolt::memory::MemoryPool** poolPtr = + const_cast(&pool_); + *poolPtr = pool; + } + protected: void setEndGuardImpl() override { *reinterpret_cast(data_ + capacity_) = kEndGuard; @@ -680,6 +726,23 @@ class NonPODAlignedBuffer : public Buffer { this, checkedPlus(AlignedBuffer::kPaddedSize, capacity_)); } + bool transferTo(bolt::memory::MemoryPool* pool) override { + if (pool_ == pool) { + return true; + } + + if (pool_->transferTo( + pool, + this, + checkedPlus(AlignedBuffer::kPaddedSize, capacity_))) { + bolt::memory::MemoryPool** poolPtr = + const_cast(&pool_); + *poolPtr = pool; + return true; + } + return false; + } + // Needs to use this class from static methods of AlignedBuffer friend class 
AlignedBuffer; }; @@ -707,6 +770,13 @@ class BufferView : public Buffer { return true; } + bool transferTo(bolt::memory::MemoryPool* pool) override { + if (pool_ == pool) { + return true; + } + return false; + } + private: BufferView(const uint8_t* data, size_t size, Releaser releaser, bool podType) // A BufferView must be created over the data held by a cache diff --git a/bolt/common/memory/MemoryPool.cpp b/bolt/common/memory/MemoryPool.cpp index 4e3a068bf..958089dd1 100644 --- a/bolt/common/memory/MemoryPool.cpp +++ b/bolt/common/memory/MemoryPool.cpp @@ -606,7 +606,7 @@ void* MemoryPoolImpl::allocate( toString(), allocator_->getAndClearFailureMessage())); } - RECORD_ALLOC(buffer, size); + RECORD_ALLOC_SPEC(this, buffer, size); return buffer; } @@ -630,7 +630,7 @@ void* MemoryPoolImpl::allocateZeroFilled( toString(), allocator_->getAndClearFailureMessage())); } - RECORD_ALLOC(buffer, size); + RECORD_ALLOC_SPEC(this, buffer, size); return buffer; } @@ -664,7 +664,7 @@ void* MemoryPoolImpl::reallocate( if (p) { RECORD_FREE(p, size); } - RECORD_ALLOC(newP, newSize); + RECORD_ALLOC_SPEC(this, newP, newSize); } else { RECORD_GROW(p, newP, size, newSize); } diff --git a/bolt/common/memory/tests/MemoryPoolTest.cpp b/bolt/common/memory/tests/MemoryPoolTest.cpp index 9f100b5be..3fa7a31f0 100644 --- a/bolt/common/memory/tests/MemoryPoolTest.cpp +++ b/bolt/common/memory/tests/MemoryPoolTest.cpp @@ -3862,6 +3862,213 @@ TEST_P(MemoryPoolTest, allocationWithCoveredCollateral) { pool->freeContiguous(contiguousAllocation); } +TEST_P(MemoryPoolTest, transferTo) { + MemoryManager::Options options; + options.alignment = MemoryAllocator::kMinAlignment; + options.allocatorCapacity = kDefaultCapacity; + setupMemory(options); + auto manager = getMemoryManager(); + + auto largestSizeClass = manager->allocator()->largestSizeClass(); + std::vector pageCounts{ + largestSizeClass, + largestSizeClass + 1, + largestSizeClass / 10, + 1, + largestSizeClass * 2, + largestSizeClass * 3 + 1}; + + 
auto assertEqualBytes = [](const memory::MemoryPool* pool, + int64_t usedBytes, + int64_t peakBytes, + int64_t reservedBytes) { + EXPECT_EQ(pool->currentBytes(), usedBytes); + EXPECT_EQ(pool->peakBytes(), peakBytes); + EXPECT_EQ(pool->reservedBytes(), reservedBytes); + }; + + auto assertZeroByte = [](const memory::MemoryPool* pool) { + EXPECT_EQ(pool->currentBytes(), 0); + EXPECT_EQ(pool->reservedBytes(), 0); + }; + + auto getMemoryBytes = [](const memory::MemoryPool* pool) { + return std::make_tuple( + pool->currentBytes(), pool->peakBytes(), pool->reservedBytes()); + }; + + auto createPools = [&manager](bool betweenDifferentRoots) { + auto root1 = manager->addRootPool("root1"); + auto root2 = manager->addRootPool("root2"); + std::shared_ptr from; + std::shared_ptr to; + if (betweenDifferentRoots) { + from = root1->addLeafChild("from"); + to = root2->addLeafChild("to"); + } else { + from = root1->addLeafChild("from"); + to = root1->addLeafChild("to"); + } + return std::make_tuple(root1, root2, from, to); + }; + + auto testTransferAllocate = [&assertZeroByte, + &assertEqualBytes, + &getMemoryBytes, + &createPools](bool betweenDifferentRoots) { + auto [root1, root2, from, to] = createPools(betweenDifferentRoots); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + + const auto kSize = 1024; + int64_t usedBytes, rootUsedBytes; + int64_t peakBytes, rootPeakBytes; + int64_t reservedBytes, rootReservedBytes; + auto buffer = from->allocate(kSize); + // Transferring between non-leaf pools is not allowed. 
+ EXPECT_FALSE(from->root()->transferTo(to.get(), buffer, kSize)); + EXPECT_FALSE(from->transferTo(to->root(), buffer, kSize)); + + std::tie(usedBytes, peakBytes, reservedBytes) = getMemoryBytes(from.get()); + std::tie(rootUsedBytes, rootPeakBytes, rootReservedBytes) = + getMemoryBytes(from->root()); + from->transferTo(to.get(), buffer, kSize); + assertEqualBytes(to.get(), usedBytes, peakBytes, reservedBytes); + if (from->root() == to->root()) { + rootPeakBytes *= 2; + } + assertEqualBytes( + to->root(), rootUsedBytes, rootPeakBytes, rootReservedBytes); + to->free(buffer, kSize); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + }; + + auto testTransferAllocateZeroFilled = + [&assertZeroByte, &assertEqualBytes, &getMemoryBytes, &createPools]( + bool betweenDifferentRoots) { + auto [root1, root2, from, to] = createPools(betweenDifferentRoots); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + + const auto kSize = 1024; + int64_t usedBytes, rootUsedBytes; + int64_t peakBytes, rootPeakBytes; + int64_t reservedBytes, rootReservedBytes; + auto buffer = from->allocateZeroFilled(8, kSize / 8); + std::tie(usedBytes, peakBytes, reservedBytes) = + getMemoryBytes(from.get()); + std::tie(rootUsedBytes, rootPeakBytes, rootReservedBytes) = + getMemoryBytes(from->root()); + from->transferTo(to.get(), buffer, kSize); + assertEqualBytes(to.get(), usedBytes, peakBytes, reservedBytes); + if (from->root() == to->root()) { + rootPeakBytes *= 2; + } + assertEqualBytes( + to->root(), rootUsedBytes, rootPeakBytes, rootReservedBytes); + to->free(buffer, kSize); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + }; + + auto testTransferAllocateContiguous = + [&assertZeroByte, &assertEqualBytes, &getMemoryBytes, &createPools]( + uint64_t pageCount, bool betweenDifferentRoots) { + 
auto [root1, root2, from, to] = createPools(betweenDifferentRoots); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + + int64_t usedBytes, rootUsedBytes; + int64_t peakBytes, rootPeakBytes; + int64_t reservedBytes, rootReservedBytes; + ContiguousAllocation out; + from->allocateContiguous(pageCount, out); + std::tie(usedBytes, peakBytes, reservedBytes) = + getMemoryBytes(from.get()); + std::tie(rootUsedBytes, rootPeakBytes, rootReservedBytes) = + getMemoryBytes(from->root()); + from->transferTo(to.get(), out.data(), out.size()); + assertEqualBytes(to.get(), usedBytes, peakBytes, reservedBytes); + if (from->root() == to->root()) { + rootPeakBytes *= 2; + } + assertEqualBytes( + to->root(), rootUsedBytes, rootPeakBytes, rootReservedBytes); + to->freeContiguous(out); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + }; + + auto testTransferAllocateNonContiguous = + [&assertZeroByte, &assertEqualBytes, &getMemoryBytes, &createPools]( + uint64_t pageCount, bool betweenDifferentRoots) { + auto [root1, root2, from, to] = createPools(betweenDifferentRoots); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + + int64_t usedBytes, rootUsedBytes; + int64_t peakBytes, rootPeakBytes; + int64_t reservedBytes, rootReservedBytes; + Allocation out; + from->allocateNonContiguous(pageCount, out); + std::tie(usedBytes, peakBytes, reservedBytes) = + getMemoryBytes(from.get()); + std::tie(rootUsedBytes, rootPeakBytes, rootReservedBytes) = + getMemoryBytes(from->root()); + for (auto i = 0; i < out.numRuns(); ++i) { + const auto& run = out.runAt(i); + from->transferTo(to.get(), run.data(), run.numBytes()); + } + assertEqualBytes(to.get(), usedBytes, peakBytes, reservedBytes); + if (from->root() == to->root()) { + EXPECT_EQ(to->root()->currentBytes(), rootUsedBytes); + // We 
reserve and release memory run-by-run, so the peak bytes would + // be no greater than twice of the original peak bytes. + EXPECT_LE(to->root()->peakBytes(), rootPeakBytes * 2); + EXPECT_EQ(to->root()->reservedBytes(), rootReservedBytes); + } else { + assertEqualBytes( + to->root(), rootUsedBytes, rootPeakBytes, rootReservedBytes); + } + to->freeNonContiguous(out); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + }; + + // Test transfer between siblings of the same root pool. + testTransferAllocate(false); + testTransferAllocateZeroFilled(false); + for (auto pageCount : pageCounts) { + testTransferAllocateContiguous(pageCount, false); + testTransferAllocateNonContiguous(pageCount, false); + } + + // Test transfer between different root pools. + testTransferAllocate(true); + testTransferAllocateZeroFilled(true); + for (auto pageCount : pageCounts) { + testTransferAllocateContiguous(pageCount, true); + testTransferAllocateNonContiguous(pageCount, true); + } +} + BOLT_INSTANTIATE_TEST_SUITE_P( MemoryPoolTestSuite, MemoryPoolTest, diff --git a/bolt/vector/BaseVector.cpp b/bolt/vector/BaseVector.cpp index b422e095d..8cbd65c45 100644 --- a/bolt/vector/BaseVector.cpp +++ b/bolt/vector/BaseVector.cpp @@ -756,6 +756,18 @@ void BaseVector::copy( copyRanges(source, ranges); } +void BaseVector::transferOrCopyTo(bolt::memory::MemoryPool* pool) { + if (pool == pool_) { + return; + } + + if (nulls_ && !nulls_->transferTo(pool)) { + nulls_ = AlignedBuffer::copy(nulls_, pool); + rawNulls_ = nulls_->as(); + } + pool_ = pool; +} + namespace { template diff --git a/bolt/vector/BaseVector.h b/bolt/vector/BaseVector.h index ef707bf8d..72cdcef9e 100644 --- a/bolt/vector/BaseVector.h +++ b/bolt/vector/BaseVector.h @@ -501,9 +501,11 @@ class BaseVector { bool canCopyAll); // Utility for making a deep copy of a whole vector. 
- static VectorPtr copy(const BaseVector& vector) { - auto result = - BaseVector::create(vector.type(), vector.size(), vector.pool()); + static VectorPtr copy( + const BaseVector& vector, + bolt::memory::MemoryPool* pool = nullptr) { + auto result = BaseVector::create( + vector.type(), vector.size(), pool ? pool : vector.pool()); result->copy(&vector, 0, 0, vector.size()); return result; } @@ -533,6 +535,8 @@ class BaseVector { BOLT_UNSUPPORTED("Can only copy into flat or complex vectors"); } + virtual void transferOrCopyTo(bolt::memory::MemoryPool* pool); + // Construct a zero-copy slice of the vector with the indicated offset and // length. virtual VectorPtr slice(vector_size_t offset, vector_size_t length) const = 0; diff --git a/bolt/vector/BiasVector.h b/bolt/vector/BiasVector.h index bf8de9694..f978f7aea 100644 --- a/bolt/vector/BiasVector.h +++ b/bolt/vector/BiasVector.h @@ -181,6 +181,10 @@ class BiasVector : public SimpleVector { BOLT_NYI(); } + void transferOrCopyTo(bolt::memory::MemoryPool* /*pool*/) override { + BOLT_UNSUPPORTED("transferTo not defined for BiasVector"); + } + private: template inline xsimd::batch loadSIMDInternal(size_t byteOffset) const { diff --git a/bolt/vector/ComplexVector.cpp b/bolt/vector/ComplexVector.cpp index 4dcab3e9e..8e86c8d32 100644 --- a/bolt/vector/ComplexVector.cpp +++ b/bolt/vector/ComplexVector.cpp @@ -451,6 +451,13 @@ bool RowVector::isWritable() const { return isNullsWritable(); } +void RowVector::transferOrCopyTo(bolt::memory::MemoryPool* pool) { + BaseVector::transferOrCopyTo(pool); + for (auto& child : children_) { + child->transferOrCopyTo(pool); + } +} + uint64_t RowVector::estimateFlatSize() const { uint64_t total = BaseVector::retainedSize(); for (const auto& child : children_) { @@ -726,6 +733,18 @@ void ArrayVectorBase::checkRanges() const { } } +void ArrayVectorBase::transferOrCopyTo(bolt::memory::MemoryPool* pool) { + BaseVector::transferOrCopyTo(pool); + if (!offsets_->transferTo(pool)) { + offsets_ = 
AlignedBuffer::copy(offsets_, pool); + rawOffsets_ = offsets_->as(); + } + if (!sizes_->transferTo(pool)) { + sizes_ = AlignedBuffer::copy(sizes_, pool); + rawSizes_ = sizes_->as(); + } +} + void ArrayVectorBase::validateArrayVectorBase( const VectorValidateOptions& options, vector_size_t minChildVectorSize) const { @@ -996,6 +1015,11 @@ bool ArrayVector::isWritable() const { return isNullsWritable() && BaseVector::isVectorWritable(elements_); } +void ArrayVector::transferOrCopyTo(bolt::memory::MemoryPool* pool) { + ArrayVectorBase::transferOrCopyTo(pool); + elements_->transferOrCopyTo(pool); +} + uint64_t ArrayVector::estimateFlatSize() const { return BaseVector::retainedSize() + offsets_->capacity() + sizes_->capacity() + elements_->estimateFlatSize(); @@ -1306,6 +1330,12 @@ bool MapVector::isWritable() const { BaseVector::isVectorWritable(values_); } +void MapVector::transferOrCopyTo(bolt::memory::MemoryPool* pool) { + ArrayVectorBase::transferOrCopyTo(pool); + keys_->transferOrCopyTo(pool); + values_->transferOrCopyTo(pool); +} + uint64_t MapVector::estimateFlatSize() const { return BaseVector::retainedSize() + offsets_->capacity() + sizes_->capacity() + keys_->estimateFlatSize() + diff --git a/bolt/vector/ComplexVector.h b/bolt/vector/ComplexVector.h index 14b659641..bf2db594c 100644 --- a/bolt/vector/ComplexVector.h +++ b/bolt/vector/ComplexVector.h @@ -184,6 +184,8 @@ class RowVector : public BaseVector { const BaseVector* source, const folly::Range& ranges) override; + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override; + uint64_t retainedSize() const override { auto size = BaseVector::retainedSize(); for (auto& child : children_) { @@ -613,6 +615,8 @@ struct ArrayVectorBase : BaseVector { sizes_->asMutable()[i] = size; } + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override; + /// Verify that an ArrayVector/MapVector does not contain overlapping [offset, /// size] ranges. Throws in case overlaps are found. 
void checkRanges() const; @@ -728,6 +732,8 @@ class ArrayVector : public ArrayVectorBase { const BaseVector* source, const folly::Range& ranges) override; + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override; + uint64_t retainedSize() const override { return BaseVector::retainedSize() + offsets_->capacity() + sizes_->capacity() + elements_->retainedSize(); @@ -867,6 +873,8 @@ class MapVector : public ArrayVectorBase { const BaseVector* source, const folly::Range& ranges) override; + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override; + uint64_t retainedSize() const override { return BaseVector::retainedSize() + offsets_->capacity() + sizes_->capacity() + keys_->retainedSize() + values_->retainedSize(); diff --git a/bolt/vector/ConstantVector.h b/bolt/vector/ConstantVector.h index 51a033192..28fa8ddfb 100644 --- a/bolt/vector/ConstantVector.h +++ b/bolt/vector/ConstantVector.h @@ -397,6 +397,16 @@ class ConstantVector final : public SimpleVector { } } + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override { + BaseVector::transferOrCopyTo(pool); + if (valueVector_) { + valueVector_->transferOrCopyTo(pool); + } + if (stringBuffer_ && !stringBuffer_->transferTo(pool)) { + stringBuffer_ = AlignedBuffer::copy(stringBuffer_, pool); + } + } + protected: std::string toSummaryString() const override { std::stringstream out; diff --git a/bolt/vector/DictionaryVector.h b/bolt/vector/DictionaryVector.h index b670206dd..12f0b446a 100644 --- a/bolt/vector/DictionaryVector.h +++ b/bolt/vector/DictionaryVector.h @@ -253,6 +253,15 @@ class DictionaryVector : public SimpleVector { void validate(const VectorValidateOptions& options) const override; + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override { + BaseVector::transferOrCopyTo(pool); + dictionaryValues_->transferOrCopyTo(pool); + if (!indices_->transferTo(pool)) { + indices_ = AlignedBuffer::copy(indices_, pool); + rawIndices_ = indices_->as(); + } + } + private: // return the 
dictionary index for the specified vector index. inline vector_size_t getDictionaryIndex(vector_size_t idx) const { diff --git a/bolt/vector/FlatVector.h b/bolt/vector/FlatVector.h index 7ddd592d2..f56cbe2f2 100644 --- a/bolt/vector/FlatVector.h +++ b/bolt/vector/FlatVector.h @@ -322,6 +322,24 @@ class FlatVector final : public SimpleVector { const BaseVector* source, const folly::Range& ranges) override; + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override { + BaseVector::transferOrCopyTo(pool); + if (values_ && !values_->transferTo(pool)) { + values_ = AlignedBuffer::copy(values_, pool); + rawValues_ = const_cast(values_->as()); + } + for (auto& buffer : stringBuffers_) { + if (!buffer->transferTo(pool)) { + BOLT_CHECK_NE( + stringBufferSet_.erase(buffer.get()), + 0, + "Easure of existing string buffer should always succeed."); + buffer = AlignedBuffer::copy(buffer, pool); + BOLT_CHECK(stringBufferSet_.insert(buffer.get()).second); + } + } + } + void resize(vector_size_t newSize, bool setNotNull = true) override; VectorPtr slice(vector_size_t offset, vector_size_t length) const override; diff --git a/bolt/vector/FunctionVector.h b/bolt/vector/FunctionVector.h index 38b5e84db..b01269376 100644 --- a/bolt/vector/FunctionVector.h +++ b/bolt/vector/FunctionVector.h @@ -207,6 +207,10 @@ class FunctionVector : public BaseVector { BOLT_NYI(); } + void transferOrCopyTo(bolt::memory::MemoryPool* /*pool*/) override { + BOLT_UNSUPPORTED("transferTo not defined for FunctionVector"); + } + private: std::vector> functions_; std::vector rowSets_; diff --git a/bolt/vector/LazyVector.h b/bolt/vector/LazyVector.h index 6a49ce066..f644527cc 100644 --- a/bolt/vector/LazyVector.h +++ b/bolt/vector/LazyVector.h @@ -282,6 +282,13 @@ class LazyVector : public BaseVector { void validate(const VectorValidateOptions& options) const override; + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override { + BaseVector::transferOrCopyTo(pool); + if (vector_) { + 
vector_->transferOrCopyTo(pool); + } + } + private: static void ensureLoadedRowsImpl( const VectorPtr& vector, diff --git a/bolt/vector/SequenceVector.h b/bolt/vector/SequenceVector.h index 8fd3515b2..705479f01 100644 --- a/bolt/vector/SequenceVector.h +++ b/bolt/vector/SequenceVector.h @@ -213,6 +213,10 @@ class SequenceVector : public SimpleVector { return false; } + void transferOrCopyTo(bolt::memory::MemoryPool* /*pool*/) override { + BOLT_UNSUPPORTED("transferTo not defined for SequenceVector"); + } + private: // Prepares for use after construction. void setInternalState(); diff --git a/bolt/vector/fuzzer/VectorFuzzer.cpp b/bolt/vector/fuzzer/VectorFuzzer.cpp index 15e0e69a1..18e39d247 100644 --- a/bolt/vector/fuzzer/VectorFuzzer.cpp +++ b/bolt/vector/fuzzer/VectorFuzzer.cpp @@ -140,6 +140,17 @@ T rand(FuzzerGenerator& rng) { } } +template < + typename T, + typename std::enable_if_t, int> = 0> +inline T rand(FuzzerGenerator& rng, T min, T max) { + if constexpr (std::is_integral_v) { + return boost::random::uniform_int_distribution(min, max)(rng); + } else { + return boost::random::uniform_real_distribution(min, max)(rng); + } +} + // Generate special values for the different supported types. // Special values include NaN, MIN, MAX, 9, 99, 999, etc. 
template @@ -1431,6 +1442,10 @@ RowTypePtr VectorFuzzer::randRowType( return bolt::randRowType(rng_, scalarTypes, maxDepth); } +size_t VectorFuzzer::randInRange(size_t min, size_t max) { + return rand(rng_, min, max); +} + VectorPtr VectorFuzzer::wrapInLazyVector(VectorPtr baseVector) { if (hasNestedDictionaryLayers(baseVector)) { auto indices = baseVector->wrapInfo(); diff --git a/bolt/vector/fuzzer/VectorFuzzer.h b/bolt/vector/fuzzer/VectorFuzzer.h index 7031934fc..b95d6ebb1 100644 --- a/bolt/vector/fuzzer/VectorFuzzer.h +++ b/bolt/vector/fuzzer/VectorFuzzer.h @@ -311,6 +311,9 @@ class VectorFuzzer { const std::vector& scalarTypes, int maxDepth = 5); + /// Returns a random integer between min and max inclusive + size_t randInRange(size_t min, size_t max); + // Generates short decimal TypePtr with random precision and scale. TypePtr randShortDecimalType(); diff --git a/bolt/vector/tests/VectorTest.cpp b/bolt/vector/tests/VectorTest.cpp index c5a44bfc6..037421066 100644 --- a/bolt/vector/tests/VectorTest.cpp +++ b/bolt/vector/tests/VectorTest.cpp @@ -3717,5 +3717,167 @@ TEST_F(VectorTest, sliceUnknownVector) { EXPECT_NE(v->slice(0, 512), nullptr); } +TEST_F(VectorTest, transferOrCopyTo) { + auto rootPool = memory::memoryManager()->addRootPool("long-living"); + auto pool = rootPool->addLeafChild("long-living leaf"); + + VectorPtr vector; + VectorPtr expected; + + // Test primitive type. + { + auto localRootPool = memory::memoryManager()->addRootPool("short-living"); + auto localPool = localRootPool->addLeafChild("short-living leaf"); + test::VectorMaker maker{localPool.get()}; + vector = maker.flatVector( + 3, [](auto row) { return row; }, [](auto row) { return row == 2; }); + expected = BaseVector::copy(*vector, pool.get()); + vector->transferOrCopyTo(pool.get()); + } + ASSERT_EQ(vector->pool(), pool.get()); + test::assertEqualVectors(expected, vector); + + // Test complex type. 
+ { + auto localRootPool = memory::memoryManager()->addRootPool("short-living"); + auto localPool = localRootPool->addLeafChild("short-living leaf"); + test::VectorMaker maker{localPool.get()}; + vector = maker.rowVector( + {maker.flatVector( + 3, + [](auto row) { return row; }, + [](auto row) { return row == 2; }), + maker.constantVector({true, true, true}), + maker.dictionaryVector({1, std::nullopt, 1}), + maker.arrayVector( + 3, + [](auto row) { return row; }, + [](auto row) { return row; }, + [](auto row) { return row == 0; }, + [](auto row) { return row == 1; }), + maker.mapVector( + 3, + [](auto row) { return row; }, + [](auto row) { return row; }, + [](auto row) { return row + 1; }, + [](auto row) { return row == 1; }, + [](auto row) { return row == 2; })}); + expected = BaseVector::copy(*vector, pool.get()); + vector->transferOrCopyTo(pool.get()); + } + ASSERT_EQ(vector->pool(), pool.get()); + test::assertEqualVectors(expected, vector); + + // Test with fuzzing. + // TODO: FlatMapVector doesn't support copy() yet. Add it later when it + // supports. + VectorFuzzer::Options options{ + .nullRatio = 0.2, .stringVariableLength = true, .allowLazyVector = true}; + const int kNumIterations = 500; + for (auto i = 0; i < kNumIterations; ++i) { + { + auto localRootPool = memory::memoryManager()->addRootPool("short-living"); + auto localPool = localRootPool->addLeafChild("short-living leaf"); + + VectorFuzzer fuzzer{options, localPool.get(), 123}; + auto type = fuzzer.randType(); + vector = fuzzer.fuzz(type); + expected = BaseVector::copy(*vector, pool.get()); + vector->transferOrCopyTo(pool.get()); + } + ASSERT_EQ(vector->pool(), pool.get()); + test::assertEqualVectors(expected, vector); + } + + // Test complex-typed vectors with buffers from different pools. 
+ VectorFuzzer fuzzer{options, pool.get(), 123}; + for (auto i = 0; i < kNumIterations; ++i) { + { + auto localRootPool = memory::memoryManager()->addRootPool("short-living"); + auto localPool = localRootPool->addLeafChild("short-living leaf"); + VectorFuzzer localFuzzer{options, localPool.get(), 123}; + + auto type = fuzzer.randType(); + auto elements = localFuzzer.fuzz(type); + auto arrays = fuzzer.fuzzArray(elements, 70); + fuzzer.setOptions({}); + auto keys = fuzzer.fuzz(BIGINT()); + fuzzer.setOptions(options); + auto maps = localFuzzer.fuzzMap(keys, arrays, 50); + vector = localFuzzer.fuzzRow({maps}, 50); + + expected = BaseVector::copy(*vector, pool.get()); + vector->transferOrCopyTo(pool.get()); + } + ASSERT_EQ(vector->pool(), pool.get()); + test::assertEqualVectors(expected, vector); + } + + // Test memory pool with different allocator. + { + memory::MemoryManager anotherManager; + auto anotherRootPool = anotherManager.addRootPool("another root pool"); + auto anotherPool = anotherRootPool->addLeafChild("another leaf pool"); + VectorFuzzer localFuzzer{options, anotherPool.get(), 789}; + + auto type = fuzzer.randType(); + vector = fuzzer.fuzz(type); + expected = BaseVector::copy(*vector, pool.get()); + vector->transferOrCopyTo(pool.get()); + } + ASSERT_EQ(vector->pool(), pool.get()); + test::assertEqualVectors(expected, vector); + + // Test opaque vector. 
+ { + auto localRootPool = memory::memoryManager()->addRootPool("short-living"); + auto localPool = localRootPool->addLeafChild("short-living leaf"); + + auto type = OPAQUE(); + auto size = 100; + vector = BaseVector::create(type, size, localPool.get()); + auto opaqueObj = std::make_shared(); + for (auto i = 0; i < size; ++i) { + vector->as>>()->set(i, opaqueObj); + } + expected = BaseVector::copy(*vector, pool.get()); + vector->transferOrCopyTo(pool.get()); + } + ASSERT_EQ(vector->pool(), pool.get()); + test::assertEqualVectors(expected, vector); + + auto testMemoryStats = [&options](size_t seed) { + auto localRoot1 = memory::memoryManager()->addRootPool("local root 1"); + auto localLeaf1 = localRoot1->addLeafChild("local leaf 1"); + auto localRoot2 = memory::memoryManager()->addRootPool("local root 2"); + auto localLeaf2 = localRoot2->addLeafChild("local leaf 2"); + + EXPECT_EQ(localLeaf1->currentBytes(), 0); + EXPECT_EQ(localLeaf1->peakBytes(), 0); + EXPECT_EQ(localLeaf2->currentBytes(), 0); + EXPECT_EQ(localLeaf2->peakBytes(), 0); + + VectorFuzzer fuzzer{options, localLeaf1.get(), seed}; + auto size = fuzzer.randInRange(0, 10000); + auto type = fuzzer.randType(); + auto vector = fuzzer.fuzz(type, size); + auto usedBytes = localLeaf1->currentBytes(); + auto peakBytes = localLeaf1->peakBytes(); + + vector->transferOrCopyTo(localLeaf2.get()); + EXPECT_LE(localLeaf2->currentBytes(), usedBytes); + EXPECT_LE(localLeaf2->peakBytes(), peakBytes); + if (localLeaf2->currentBytes() == 0) { + auto tmp = bytedance::bolt::isLazyNotLoaded(*vector) || + vector->isConstantEncoding(); + EXPECT_TRUE(tmp); + } + vector = nullptr; + }; + + for (auto i = 0; i < kNumIterations; ++i) { + testMemoryStats(kNumIterations * i + i); + } +} } // namespace } // namespace bytedance::bolt From af86a09fbd90868836bf0f64764e0fe78db0281d Mon Sep 17 00:00:00 2001 From: "xuweixin.rex" Date: Wed, 8 Apr 2026 20:13:45 +0800 Subject: [PATCH 2/3] [fix][cp] Fix StringView remapping in transferOrCopyTo() 
when string buffers are copied Summary: FlatVector::transferOrCopyTo() and ConstantVector::transferOrCopyTo() may copy the string buffers to new ones. However, there was a bug that they didn't udpate the StringViews in the vectors afterwards. StringViews must be updated if a string buffer is replaced by its copy, because StringViews contain pointers to addresses inside the string buffers. This diff updates StringViews in FlatVector::transferOrCopyTo() and ConstantVector::transferOrCopyTo() when a copy of a string buffer occurs. It also updated the unit test to cover these situations. In addition, this diff adds documentation of the BaseVector::transferOrCopyTo() API. It also removes FlatMapVector::transferOrCopyTo() since it's not covered in the unit test yet. from https://github.com/facebookincubator/velox/pull/15489 --- bolt/buffer/Buffer.h | 4 ++ bolt/vector/BaseVector.h | 9 +++- bolt/vector/ConstantVector.h | 10 ++++- bolt/vector/FlatVector-inl.h | 83 ++++++++++++++++++++++++++++++++++++ bolt/vector/FlatVector.h | 23 +++------- bolt/vector/SequenceVector.h | 2 +- 6 files changed, 109 insertions(+), 22 deletions(-) diff --git a/bolt/buffer/Buffer.h b/bolt/buffer/Buffer.h index 1c7102b8b..ff8efd272 100644 --- a/bolt/buffer/Buffer.h +++ b/bolt/buffer/Buffer.h @@ -212,6 +212,10 @@ class Buffer { sizeof(T), is_pod_like_v, buffer, offset, length); } + /// Transfers this buffer to 'pool'. Returns true if the transfer succeeds, or + /// false if the transfer fails. A buffer can be transferred to 'pool' if its + /// original pool and 'pool' are from the same MemoryAllocator and the buffer + /// is not a BufferView. 
virtual bool transferTo(bolt::memory::MemoryPool* /*pool*/) { BOLT_NYI("{} unsupported", __FUNCTION__); } diff --git a/bolt/vector/BaseVector.h b/bolt/vector/BaseVector.h index 72cdcef9e..b3d32463e 100644 --- a/bolt/vector/BaseVector.h +++ b/bolt/vector/BaseVector.h @@ -535,10 +535,15 @@ class BaseVector { BOLT_UNSUPPORTED("Can only copy into flat or complex vectors"); } + /// Transfer or copy this vector and all its buffers recursively to 'pool'. + /// The transfer of a buffer is allowed if its original pool and 'pool' are + /// from the same MemoryAllocator and the buffer is not a BufferView. If a + /// buffer is not allowed to be transferred, it is copied to pool. After this + /// call, this vector and all its buffers are owned by 'pool'. virtual void transferOrCopyTo(bolt::memory::MemoryPool* pool); - // Construct a zero-copy slice of the vector with the indicated offset and - // length. + /// Construct a zero-copy slice of the vector with the indicated offset and + /// length. virtual VectorPtr slice(vector_size_t offset, vector_size_t length) const = 0; // Returns a vector of the type of 'source' where 'indices' contains diff --git a/bolt/vector/ConstantVector.h b/bolt/vector/ConstantVector.h index 28fa8ddfb..567004e6e 100644 --- a/bolt/vector/ConstantVector.h +++ b/bolt/vector/ConstantVector.h @@ -402,8 +402,14 @@ class ConstantVector final : public SimpleVector { if (valueVector_) { valueVector_->transferOrCopyTo(pool); } - if (stringBuffer_ && !stringBuffer_->transferTo(pool)) { - stringBuffer_ = AlignedBuffer::copy(stringBuffer_, pool); + if constexpr (std::is_same_v) { + if (stringBuffer_ && !stringBuffer_->transferTo(pool)) { + auto newBuffer = AlignedBuffer::copy(stringBuffer_, pool); + auto offset = value_.data() - stringBuffer_->template as(); + value_ = + StringView(newBuffer->template as() + offset, value_.size()); + stringBuffer_ = std::move(newBuffer); + } } } diff --git a/bolt/vector/FlatVector-inl.h b/bolt/vector/FlatVector-inl.h index 
a54d725f5..b8a6f5589 100644 --- a/bolt/vector/FlatVector-inl.h +++ b/bolt/vector/FlatVector-inl.h @@ -716,5 +716,88 @@ inline void FlatVector::resizeValues( values_ = std::move(newValues); rawValues_ = values_->asMutable(); } + + +template +void FlatVector::transferAndUpdateStringBuffers( + bolt::memory::MemoryPool* /*pool*/) { + BOLT_CHECK(stringBuffers_.empty()); +} + +template <> +inline void FlatVector::transferAndUpdateStringBuffers( + bolt::memory::MemoryPool* pool) { + struct StringBufferRemapping { + const char* oldStart; + const char* newStart; + size_t size; + }; + std::vector stringBufferRemapping; + for (auto& buffer : stringBuffers_) { + if (!buffer->transferTo(pool)) { + BOLT_CHECK_NE( + stringBufferSet_.erase(buffer.get()), + 0, + "Erasure of existing string buffer should always succeed."); + auto newBuffer = AlignedBuffer::copy(buffer, pool); + stringBufferRemapping.push_back( + {buffer->as(), newBuffer->as(), buffer->size()}); + buffer = std::move(newBuffer); + BOLT_CHECK(stringBufferSet_.insert(buffer.get()).second); + } + } + if (stringBufferRemapping.empty()) { + return; + } + + std::sort( + stringBufferRemapping.begin(), + stringBufferRemapping.end(), + [](const StringBufferRemapping& lhs, const StringBufferRemapping& rhs) { + return lhs.oldStart < rhs.oldStart; + }); + auto rawValues = values_->asMutable(); + for (auto i = 0; i < BaseVector::length_; ++i) { + if (BaseVector::isNullAt(i)) { + continue; + } + auto& stringView = rawValues[i]; + if (stringView.isInline()) { + continue; + } + // Find the first remapping whose oldStart is strictly greater than + // stringView.data(). The remapping before it is the candidate that + // contains stringView.data(). 
+ auto remapping = std::upper_bound( + stringBufferRemapping.cbegin(), + stringBufferRemapping.cend(), + stringView.data(), + [](const char* lhs, const StringBufferRemapping& rhs) { + return lhs < rhs.oldStart; + }); + // There is no remapping whose oldStart is smaller than or equal to + // stringView.data(). + if (remapping == stringBufferRemapping.begin()) { + continue; + } + remapping--; + if (stringView.data() >= remapping->oldStart && + stringView.data() < remapping->oldStart + remapping->size) { + auto offset = stringView.data() - remapping->oldStart; + stringView = StringView(remapping->newStart + offset, stringView.size()); + } + } +} + +template +void FlatVector::transferOrCopyTo(bolt::memory::MemoryPool* pool) { + BaseVector::transferOrCopyTo(pool); + if (values_ && !values_->transferTo(pool)) { + values_ = AlignedBuffer::copy(values_, pool); + rawValues_ = const_cast(values_->as()); + } + transferAndUpdateStringBuffers(pool); +} + } // namespace bolt } // namespace bytedance diff --git a/bolt/vector/FlatVector.h b/bolt/vector/FlatVector.h index f56cbe2f2..d8326a9a5 100644 --- a/bolt/vector/FlatVector.h +++ b/bolt/vector/FlatVector.h @@ -322,23 +322,7 @@ class FlatVector final : public SimpleVector { const BaseVector* source, const folly::Range& ranges) override; - void transferOrCopyTo(bolt::memory::MemoryPool* pool) override { - BaseVector::transferOrCopyTo(pool); - if (values_ && !values_->transferTo(pool)) { - values_ = AlignedBuffer::copy(values_, pool); - rawValues_ = const_cast(values_->as()); - } - for (auto& buffer : stringBuffers_) { - if (!buffer->transferTo(pool)) { - BOLT_CHECK_NE( - stringBufferSet_.erase(buffer.get()), - 0, - "Easure of existing string buffer should always succeed."); - buffer = AlignedBuffer::copy(buffer, pool); - BOLT_CHECK(stringBufferSet_.insert(buffer.get()).second); - } - } - } + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override; void resize(vector_size_t newSize, bool setNotNull = true) override; @@ 
-640,6 +624,11 @@ class FlatVector final : public SimpleVector { } } + // Transfer or copy string buffers to 'pool'. Update StringViews in values_ to + // reference addresses in the new buffers. Non-StringView-typed FlatVector + // should not have string buffers. + void transferAndUpdateStringBuffers(bolt::memory::MemoryPool* pool); + // Contiguous values. // If strings, these are bolt::StringViews into memory held by // 'stringBuffers_' diff --git a/bolt/vector/SequenceVector.h b/bolt/vector/SequenceVector.h index 705479f01..f9f9f47b1 100644 --- a/bolt/vector/SequenceVector.h +++ b/bolt/vector/SequenceVector.h @@ -214,7 +214,7 @@ class SequenceVector : public SimpleVector { } void transferOrCopyTo(bolt::memory::MemoryPool* /*pool*/) override { - BOLT_UNSUPPORTED("transferTo not defined for SequenceVector"); + BOLT_NYI("{} unsupported", __FUNCTION__); } private: From 3dd4289889b7319362082521c0e2e9936d3a0748 Mon Sep 17 00:00:00 2001 From: "xuweixin.rex" Date: Wed, 15 Apr 2026 14:57:19 +0800 Subject: [PATCH 3/3] format code --- bolt/vector/FlatVector-inl.h | 1 - 1 file changed, 1 deletion(-) diff --git a/bolt/vector/FlatVector-inl.h b/bolt/vector/FlatVector-inl.h index b8a6f5589..7bdd6f79a 100644 --- a/bolt/vector/FlatVector-inl.h +++ b/bolt/vector/FlatVector-inl.h @@ -717,7 +717,6 @@ inline void FlatVector::resizeValues( rawValues_ = values_->asMutable(); } - template void FlatVector::transferAndUpdateStringBuffers( bolt::memory::MemoryPool* /*pool*/) {