diff --git a/bolt/buffer/Buffer.h b/bolt/buffer/Buffer.h index 64a1af529..ff8efd272 100644 --- a/bolt/buffer/Buffer.h +++ b/bolt/buffer/Buffer.h @@ -212,6 +212,14 @@ class Buffer { sizeof(T), is_pod_like_v, buffer, offset, length); } + /// Transfers this buffer to 'pool'. Returns true if the transfer succeeds, or + /// false if the transfer fails. A buffer can be transferred to 'pool' if its + /// original pool and 'pool' are from the same MemoryAllocator and the buffer + /// is not a BufferView. + virtual bool transferTo(bolt::memory::MemoryPool* /*pool*/) { + BOLT_NYI("{} unsupported", __FUNCTION__); + } + protected: // Writes a magic word at 'capacity_'. No-op for a BufferView. The actual // logic is inside a separate virtual function, allowing override by derived @@ -509,6 +517,42 @@ class AlignedBuffer : public Buffer { return newBuffer; } + template + static BufferPtr copy( + const BufferPtr& buffer, + bolt::memory::MemoryPool* pool) { + if (buffer == nullptr) { + return nullptr; + } + + // The reason we use uint8_t is because mutableNulls()->size() will return + // in byte count. We also don't bother initializing since copyFrom will be + // overwriting anyway. 
+ BufferPtr newBuffer; + if constexpr (std::is_same_v) { + newBuffer = AlignedBuffer::allocate(buffer->size(), pool); + } else { + const auto numElements = checkedDivide(buffer->size(), sizeof(T)); + newBuffer = AlignedBuffer::allocate(numElements, pool); + } + + newBuffer->copyFrom(buffer.get(), newBuffer->size()); + + return newBuffer; + } + + bool transferTo(bolt::memory::MemoryPool* pool) override { + if (pool_ == pool) { + return true; + } + if (pool_->transferTo( + pool, this, checkedPlus(kPaddedSize, capacity_))) { + setPool(pool); + return true; + } + return false; + } + protected: AlignedBuffer(bolt::memory::MemoryPool* pool, size_t capacity) : Buffer( @@ -549,6 +593,12 @@ class AlignedBuffer : public Buffer { } } + void setPool(bolt::memory::MemoryPool* pool) { + bolt::memory::MemoryPool** poolPtr = + const_cast(&pool_); + *poolPtr = pool; + } + protected: void setEndGuardImpl() override { *reinterpret_cast(data_ + capacity_) = kEndGuard; @@ -680,6 +730,23 @@ class NonPODAlignedBuffer : public Buffer { this, checkedPlus(AlignedBuffer::kPaddedSize, capacity_)); } + bool transferTo(bolt::memory::MemoryPool* pool) override { + if (pool_ == pool) { + return true; + } + + if (pool_->transferTo( + pool, + this, + checkedPlus(AlignedBuffer::kPaddedSize, capacity_))) { + bolt::memory::MemoryPool** poolPtr = + const_cast(&pool_); + *poolPtr = pool; + return true; + } + return false; + } + // Needs to use this class from static methods of AlignedBuffer friend class AlignedBuffer; }; @@ -707,6 +774,13 @@ class BufferView : public Buffer { return true; } + bool transferTo(bolt::memory::MemoryPool* pool) override { + if (pool_ == pool) { + return true; + } + return false; + } + private: BufferView(const uint8_t* data, size_t size, Releaser releaser, bool podType) // A BufferView must be created over the data held by a cache diff --git a/bolt/common/memory/MemoryPool.cpp b/bolt/common/memory/MemoryPool.cpp index 4e3a068bf..958089dd1 100644 --- 
a/bolt/common/memory/MemoryPool.cpp +++ b/bolt/common/memory/MemoryPool.cpp @@ -606,7 +606,7 @@ void* MemoryPoolImpl::allocate( toString(), allocator_->getAndClearFailureMessage())); } - RECORD_ALLOC(buffer, size); + RECORD_ALLOC_SPEC(this, buffer, size); return buffer; } @@ -630,7 +630,7 @@ void* MemoryPoolImpl::allocateZeroFilled( toString(), allocator_->getAndClearFailureMessage())); } - RECORD_ALLOC(buffer, size); + RECORD_ALLOC_SPEC(this, buffer, size); return buffer; } @@ -664,7 +664,7 @@ void* MemoryPoolImpl::reallocate( if (p) { RECORD_FREE(p, size); } - RECORD_ALLOC(newP, newSize); + RECORD_ALLOC_SPEC(this, newP, newSize); } else { RECORD_GROW(p, newP, size, newSize); } diff --git a/bolt/common/memory/tests/MemoryPoolTest.cpp b/bolt/common/memory/tests/MemoryPoolTest.cpp index 9f100b5be..3fa7a31f0 100644 --- a/bolt/common/memory/tests/MemoryPoolTest.cpp +++ b/bolt/common/memory/tests/MemoryPoolTest.cpp @@ -3862,6 +3862,213 @@ TEST_P(MemoryPoolTest, allocationWithCoveredCollateral) { pool->freeContiguous(contiguousAllocation); } +TEST_P(MemoryPoolTest, transferTo) { + MemoryManager::Options options; + options.alignment = MemoryAllocator::kMinAlignment; + options.allocatorCapacity = kDefaultCapacity; + setupMemory(options); + auto manager = getMemoryManager(); + + auto largestSizeClass = manager->allocator()->largestSizeClass(); + std::vector pageCounts{ + largestSizeClass, + largestSizeClass + 1, + largestSizeClass / 10, + 1, + largestSizeClass * 2, + largestSizeClass * 3 + 1}; + + auto assertEqualBytes = [](const memory::MemoryPool* pool, + int64_t usedBytes, + int64_t peakBytes, + int64_t reservedBytes) { + EXPECT_EQ(pool->currentBytes(), usedBytes); + EXPECT_EQ(pool->peakBytes(), peakBytes); + EXPECT_EQ(pool->reservedBytes(), reservedBytes); + }; + + auto assertZeroByte = [](const memory::MemoryPool* pool) { + EXPECT_EQ(pool->currentBytes(), 0); + EXPECT_EQ(pool->reservedBytes(), 0); + }; + + auto getMemoryBytes = [](const memory::MemoryPool* pool) { + 
return std::make_tuple( + pool->currentBytes(), pool->peakBytes(), pool->reservedBytes()); + }; + + auto createPools = [&manager](bool betweenDifferentRoots) { + auto root1 = manager->addRootPool("root1"); + auto root2 = manager->addRootPool("root2"); + std::shared_ptr from; + std::shared_ptr to; + if (betweenDifferentRoots) { + from = root1->addLeafChild("from"); + to = root2->addLeafChild("to"); + } else { + from = root1->addLeafChild("from"); + to = root1->addLeafChild("to"); + } + return std::make_tuple(root1, root2, from, to); + }; + + auto testTransferAllocate = [&assertZeroByte, + &assertEqualBytes, + &getMemoryBytes, + &createPools](bool betweenDifferentRoots) { + auto [root1, root2, from, to] = createPools(betweenDifferentRoots); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + + const auto kSize = 1024; + int64_t usedBytes, rootUsedBytes; + int64_t peakBytes, rootPeakBytes; + int64_t reservedBytes, rootReservedBytes; + auto buffer = from->allocate(kSize); + // Transferring between non-leaf pools is not allowed. 
+ EXPECT_FALSE(from->root()->transferTo(to.get(), buffer, kSize)); + EXPECT_FALSE(from->transferTo(to->root(), buffer, kSize)); + + std::tie(usedBytes, peakBytes, reservedBytes) = getMemoryBytes(from.get()); + std::tie(rootUsedBytes, rootPeakBytes, rootReservedBytes) = + getMemoryBytes(from->root()); + from->transferTo(to.get(), buffer, kSize); + assertEqualBytes(to.get(), usedBytes, peakBytes, reservedBytes); + if (from->root() == to->root()) { + rootPeakBytes *= 2; + } + assertEqualBytes( + to->root(), rootUsedBytes, rootPeakBytes, rootReservedBytes); + to->free(buffer, kSize); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + }; + + auto testTransferAllocateZeroFilled = + [&assertZeroByte, &assertEqualBytes, &getMemoryBytes, &createPools]( + bool betweenDifferentRoots) { + auto [root1, root2, from, to] = createPools(betweenDifferentRoots); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + + const auto kSize = 1024; + int64_t usedBytes, rootUsedBytes; + int64_t peakBytes, rootPeakBytes; + int64_t reservedBytes, rootReservedBytes; + auto buffer = from->allocateZeroFilled(8, kSize / 8); + std::tie(usedBytes, peakBytes, reservedBytes) = + getMemoryBytes(from.get()); + std::tie(rootUsedBytes, rootPeakBytes, rootReservedBytes) = + getMemoryBytes(from->root()); + from->transferTo(to.get(), buffer, kSize); + assertEqualBytes(to.get(), usedBytes, peakBytes, reservedBytes); + if (from->root() == to->root()) { + rootPeakBytes *= 2; + } + assertEqualBytes( + to->root(), rootUsedBytes, rootPeakBytes, rootReservedBytes); + to->free(buffer, kSize); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + }; + + auto testTransferAllocateContiguous = + [&assertZeroByte, &assertEqualBytes, &getMemoryBytes, &createPools]( + uint64_t pageCount, bool betweenDifferentRoots) { + 
auto [root1, root2, from, to] = createPools(betweenDifferentRoots); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + + int64_t usedBytes, rootUsedBytes; + int64_t peakBytes, rootPeakBytes; + int64_t reservedBytes, rootReservedBytes; + ContiguousAllocation out; + from->allocateContiguous(pageCount, out); + std::tie(usedBytes, peakBytes, reservedBytes) = + getMemoryBytes(from.get()); + std::tie(rootUsedBytes, rootPeakBytes, rootReservedBytes) = + getMemoryBytes(from->root()); + from->transferTo(to.get(), out.data(), out.size()); + assertEqualBytes(to.get(), usedBytes, peakBytes, reservedBytes); + if (from->root() == to->root()) { + rootPeakBytes *= 2; + } + assertEqualBytes( + to->root(), rootUsedBytes, rootPeakBytes, rootReservedBytes); + to->freeContiguous(out); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + }; + + auto testTransferAllocateNonContiguous = + [&assertZeroByte, &assertEqualBytes, &getMemoryBytes, &createPools]( + uint64_t pageCount, bool betweenDifferentRoots) { + auto [root1, root2, from, to] = createPools(betweenDifferentRoots); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + + int64_t usedBytes, rootUsedBytes; + int64_t peakBytes, rootPeakBytes; + int64_t reservedBytes, rootReservedBytes; + Allocation out; + from->allocateNonContiguous(pageCount, out); + std::tie(usedBytes, peakBytes, reservedBytes) = + getMemoryBytes(from.get()); + std::tie(rootUsedBytes, rootPeakBytes, rootReservedBytes) = + getMemoryBytes(from->root()); + for (auto i = 0; i < out.numRuns(); ++i) { + const auto& run = out.runAt(i); + from->transferTo(to.get(), run.data(), run.numBytes()); + } + assertEqualBytes(to.get(), usedBytes, peakBytes, reservedBytes); + if (from->root() == to->root()) { + EXPECT_EQ(to->root()->currentBytes(), rootUsedBytes); + // We 
reserve and release memory run-by-run, so the peak bytes would + // be no greater than twice of the original peak bytes. + EXPECT_LE(to->root()->peakBytes(), rootPeakBytes * 2); + EXPECT_EQ(to->root()->reservedBytes(), rootReservedBytes); + } else { + assertEqualBytes( + to->root(), rootUsedBytes, rootPeakBytes, rootReservedBytes); + } + to->freeNonContiguous(out); + assertZeroByte(from.get()); + assertZeroByte(to.get()); + assertZeroByte(from->root()); + assertZeroByte(to->root()); + }; + + // Test transfer between siblings of the same root pool. + testTransferAllocate(false); + testTransferAllocateZeroFilled(false); + for (auto pageCount : pageCounts) { + testTransferAllocateContiguous(pageCount, false); + testTransferAllocateNonContiguous(pageCount, false); + } + + // Test transfer between different root pools. + testTransferAllocate(true); + testTransferAllocateZeroFilled(true); + for (auto pageCount : pageCounts) { + testTransferAllocateContiguous(pageCount, true); + testTransferAllocateNonContiguous(pageCount, true); + } +} + BOLT_INSTANTIATE_TEST_SUITE_P( MemoryPoolTestSuite, MemoryPoolTest, diff --git a/bolt/vector/BaseVector.cpp b/bolt/vector/BaseVector.cpp index b422e095d..8cbd65c45 100644 --- a/bolt/vector/BaseVector.cpp +++ b/bolt/vector/BaseVector.cpp @@ -756,6 +756,18 @@ void BaseVector::copy( copyRanges(source, ranges); } +void BaseVector::transferOrCopyTo(bolt::memory::MemoryPool* pool) { + if (pool == pool_) { + return; + } + + if (nulls_ && !nulls_->transferTo(pool)) { + nulls_ = AlignedBuffer::copy(nulls_, pool); + rawNulls_ = nulls_->as(); + } + pool_ = pool; +} + namespace { template diff --git a/bolt/vector/BaseVector.h b/bolt/vector/BaseVector.h index ef707bf8d..b3d32463e 100644 --- a/bolt/vector/BaseVector.h +++ b/bolt/vector/BaseVector.h @@ -501,9 +501,11 @@ class BaseVector { bool canCopyAll); // Utility for making a deep copy of a whole vector. 
- static VectorPtr copy(const BaseVector& vector) { - auto result = - BaseVector::create(vector.type(), vector.size(), vector.pool()); + static VectorPtr copy( + const BaseVector& vector, + bolt::memory::MemoryPool* pool = nullptr) { + auto result = BaseVector::create( + vector.type(), vector.size(), pool ? pool : vector.pool()); result->copy(&vector, 0, 0, vector.size()); return result; } @@ -533,8 +535,15 @@ class BaseVector { BOLT_UNSUPPORTED("Can only copy into flat or complex vectors"); } - // Construct a zero-copy slice of the vector with the indicated offset and - // length. + /// Transfer or copy this vector and all its buffers recursively to 'pool'. + /// The transfer of a buffer is allowed if its original pool and 'pool' are + /// from the same MemoryAllocator and the buffer is not a BufferView. If a + /// buffer is not allowed to be transferred, it is copied to pool. After this + /// call, this vector and all its buffers are owned by 'pool'. + virtual void transferOrCopyTo(bolt::memory::MemoryPool* pool); + + /// Construct a zero-copy slice of the vector with the indicated offset and + /// length. 
virtual VectorPtr slice(vector_size_t offset, vector_size_t length) const = 0; // Returns a vector of the type of 'source' where 'indices' contains diff --git a/bolt/vector/BiasVector.h b/bolt/vector/BiasVector.h index bf8de9694..f978f7aea 100644 --- a/bolt/vector/BiasVector.h +++ b/bolt/vector/BiasVector.h @@ -181,6 +181,10 @@ class BiasVector : public SimpleVector { BOLT_NYI(); } + void transferOrCopyTo(bolt::memory::MemoryPool* /*pool*/) override { + BOLT_UNSUPPORTED("transferTo not defined for BiasVector"); + } + private: template inline xsimd::batch loadSIMDInternal(size_t byteOffset) const { diff --git a/bolt/vector/ComplexVector.cpp b/bolt/vector/ComplexVector.cpp index 4dcab3e9e..8e86c8d32 100644 --- a/bolt/vector/ComplexVector.cpp +++ b/bolt/vector/ComplexVector.cpp @@ -451,6 +451,13 @@ bool RowVector::isWritable() const { return isNullsWritable(); } +void RowVector::transferOrCopyTo(bolt::memory::MemoryPool* pool) { + BaseVector::transferOrCopyTo(pool); + for (auto& child : children_) { + child->transferOrCopyTo(pool); + } +} + uint64_t RowVector::estimateFlatSize() const { uint64_t total = BaseVector::retainedSize(); for (const auto& child : children_) { @@ -726,6 +733,18 @@ void ArrayVectorBase::checkRanges() const { } } +void ArrayVectorBase::transferOrCopyTo(bolt::memory::MemoryPool* pool) { + BaseVector::transferOrCopyTo(pool); + if (!offsets_->transferTo(pool)) { + offsets_ = AlignedBuffer::copy(offsets_, pool); + rawOffsets_ = offsets_->as(); + } + if (!sizes_->transferTo(pool)) { + sizes_ = AlignedBuffer::copy(sizes_, pool); + rawSizes_ = sizes_->as(); + } +} + void ArrayVectorBase::validateArrayVectorBase( const VectorValidateOptions& options, vector_size_t minChildVectorSize) const { @@ -996,6 +1015,11 @@ bool ArrayVector::isWritable() const { return isNullsWritable() && BaseVector::isVectorWritable(elements_); } +void ArrayVector::transferOrCopyTo(bolt::memory::MemoryPool* pool) { + ArrayVectorBase::transferOrCopyTo(pool); + 
elements_->transferOrCopyTo(pool); +} + uint64_t ArrayVector::estimateFlatSize() const { return BaseVector::retainedSize() + offsets_->capacity() + sizes_->capacity() + elements_->estimateFlatSize(); @@ -1306,6 +1330,12 @@ bool MapVector::isWritable() const { BaseVector::isVectorWritable(values_); } +void MapVector::transferOrCopyTo(bolt::memory::MemoryPool* pool) { + ArrayVectorBase::transferOrCopyTo(pool); + keys_->transferOrCopyTo(pool); + values_->transferOrCopyTo(pool); +} + uint64_t MapVector::estimateFlatSize() const { return BaseVector::retainedSize() + offsets_->capacity() + sizes_->capacity() + keys_->estimateFlatSize() + diff --git a/bolt/vector/ComplexVector.h b/bolt/vector/ComplexVector.h index 14b659641..bf2db594c 100644 --- a/bolt/vector/ComplexVector.h +++ b/bolt/vector/ComplexVector.h @@ -184,6 +184,8 @@ class RowVector : public BaseVector { const BaseVector* source, const folly::Range& ranges) override; + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override; + uint64_t retainedSize() const override { auto size = BaseVector::retainedSize(); for (auto& child : children_) { @@ -613,6 +615,8 @@ struct ArrayVectorBase : BaseVector { sizes_->asMutable()[i] = size; } + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override; + /// Verify that an ArrayVector/MapVector does not contain overlapping [offset, /// size] ranges. Throws in case overlaps are found. 
void checkRanges() const; @@ -728,6 +732,8 @@ class ArrayVector : public ArrayVectorBase { const BaseVector* source, const folly::Range& ranges) override; + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override; + uint64_t retainedSize() const override { return BaseVector::retainedSize() + offsets_->capacity() + sizes_->capacity() + elements_->retainedSize(); @@ -867,6 +873,8 @@ class MapVector : public ArrayVectorBase { const BaseVector* source, const folly::Range& ranges) override; + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override; + uint64_t retainedSize() const override { return BaseVector::retainedSize() + offsets_->capacity() + sizes_->capacity() + keys_->retainedSize() + values_->retainedSize(); diff --git a/bolt/vector/ConstantVector.h b/bolt/vector/ConstantVector.h index 51a033192..567004e6e 100644 --- a/bolt/vector/ConstantVector.h +++ b/bolt/vector/ConstantVector.h @@ -397,6 +397,22 @@ class ConstantVector final : public SimpleVector { } } + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override { + BaseVector::transferOrCopyTo(pool); + if (valueVector_) { + valueVector_->transferOrCopyTo(pool); + } + if constexpr (std::is_same_v) { + if (stringBuffer_ && !stringBuffer_->transferTo(pool)) { + auto newBuffer = AlignedBuffer::copy(stringBuffer_, pool); + auto offset = value_.data() - stringBuffer_->template as(); + value_ = + StringView(newBuffer->template as() + offset, value_.size()); + stringBuffer_ = std::move(newBuffer); + } + } + } + protected: std::string toSummaryString() const override { std::stringstream out; diff --git a/bolt/vector/DictionaryVector.h b/bolt/vector/DictionaryVector.h index b670206dd..12f0b446a 100644 --- a/bolt/vector/DictionaryVector.h +++ b/bolt/vector/DictionaryVector.h @@ -253,6 +253,15 @@ class DictionaryVector : public SimpleVector { void validate(const VectorValidateOptions& options) const override; + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override { + 
BaseVector::transferOrCopyTo(pool); + dictionaryValues_->transferOrCopyTo(pool); + if (!indices_->transferTo(pool)) { + indices_ = AlignedBuffer::copy(indices_, pool); + rawIndices_ = indices_->as(); + } + } + private: // return the dictionary index for the specified vector index. inline vector_size_t getDictionaryIndex(vector_size_t idx) const { diff --git a/bolt/vector/FlatVector-inl.h b/bolt/vector/FlatVector-inl.h index a54d725f5..7bdd6f79a 100644 --- a/bolt/vector/FlatVector-inl.h +++ b/bolt/vector/FlatVector-inl.h @@ -716,5 +716,87 @@ inline void FlatVector::resizeValues( values_ = std::move(newValues); rawValues_ = values_->asMutable(); } + +template +void FlatVector::transferAndUpdateStringBuffers( + bolt::memory::MemoryPool* /*pool*/) { + BOLT_CHECK(stringBuffers_.empty()); +} + +template <> +inline void FlatVector::transferAndUpdateStringBuffers( + bolt::memory::MemoryPool* pool) { + struct StringBufferRemapping { + const char* oldStart; + const char* newStart; + size_t size; + }; + std::vector stringBufferRemapping; + for (auto& buffer : stringBuffers_) { + if (!buffer->transferTo(pool)) { + BOLT_CHECK_NE( + stringBufferSet_.erase(buffer.get()), + 0, + "Erasure of existing string buffer should always succeed."); + auto newBuffer = AlignedBuffer::copy(buffer, pool); + stringBufferRemapping.push_back( + {buffer->as(), newBuffer->as(), buffer->size()}); + buffer = std::move(newBuffer); + BOLT_CHECK(stringBufferSet_.insert(buffer.get()).second); + } + } + if (stringBufferRemapping.empty()) { + return; + } + + std::sort( + stringBufferRemapping.begin(), + stringBufferRemapping.end(), + [](const StringBufferRemapping& lhs, const StringBufferRemapping& rhs) { + return lhs.oldStart < rhs.oldStart; + }); + auto rawValues = values_->asMutable(); + for (auto i = 0; i < BaseVector::length_; ++i) { + if (BaseVector::isNullAt(i)) { + continue; + } + auto& stringView = rawValues[i]; + if (stringView.isInline()) { + continue; + } + // Find the first remapping whose 
oldStart is strictly greater than + // stringView.data(). The remapping before it is the candidate that + // contains stringView.data(). + auto remapping = std::upper_bound( + stringBufferRemapping.cbegin(), + stringBufferRemapping.cend(), + stringView.data(), + [](const char* lhs, const StringBufferRemapping& rhs) { + return lhs < rhs.oldStart; + }); + // There is no remapping whose oldStart is smaller than or equal to + // stringView.data(). + if (remapping == stringBufferRemapping.begin()) { + continue; + } + remapping--; + if (stringView.data() >= remapping->oldStart && + stringView.data() < remapping->oldStart + remapping->size) { + auto offset = stringView.data() - remapping->oldStart; + stringView = StringView(remapping->newStart + offset, stringView.size()); + } + } +} + +template +void FlatVector::transferOrCopyTo(bolt::memory::MemoryPool* pool) { + BaseVector::transferOrCopyTo(pool); + if (values_ && !values_->transferTo(pool)) { + values_ = AlignedBuffer::copy(values_, pool); + rawValues_ = const_cast(values_->as()); + } + transferAndUpdateStringBuffers(pool); +} + } // namespace bolt } // namespace bytedance diff --git a/bolt/vector/FlatVector.h b/bolt/vector/FlatVector.h index 7ddd592d2..d8326a9a5 100644 --- a/bolt/vector/FlatVector.h +++ b/bolt/vector/FlatVector.h @@ -322,6 +322,8 @@ class FlatVector final : public SimpleVector { const BaseVector* source, const folly::Range& ranges) override; + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override; + void resize(vector_size_t newSize, bool setNotNull = true) override; VectorPtr slice(vector_size_t offset, vector_size_t length) const override; @@ -622,6 +624,11 @@ class FlatVector final : public SimpleVector { } } + // Transfer or copy string buffers to 'pool'. Update StringViews in values_ to + // reference addresses in the new buffers. Non-StringView-typed FlatVector + // should not have string buffers. 
+ void transferAndUpdateStringBuffers(bolt::memory::MemoryPool* pool); + // Contiguous values. // If strings, these are bolt::StringViews into memory held by // 'stringBuffers_' diff --git a/bolt/vector/FunctionVector.h b/bolt/vector/FunctionVector.h index 38b5e84db..b01269376 100644 --- a/bolt/vector/FunctionVector.h +++ b/bolt/vector/FunctionVector.h @@ -207,6 +207,10 @@ class FunctionVector : public BaseVector { BOLT_NYI(); } + void transferOrCopyTo(bolt::memory::MemoryPool* /*pool*/) override { + BOLT_UNSUPPORTED("transferTo not defined for FunctionVector"); + } + private: std::vector> functions_; std::vector rowSets_; diff --git a/bolt/vector/LazyVector.h b/bolt/vector/LazyVector.h index 6a49ce066..f644527cc 100644 --- a/bolt/vector/LazyVector.h +++ b/bolt/vector/LazyVector.h @@ -282,6 +282,13 @@ class LazyVector : public BaseVector { void validate(const VectorValidateOptions& options) const override; + void transferOrCopyTo(bolt::memory::MemoryPool* pool) override { + BaseVector::transferOrCopyTo(pool); + if (vector_) { + vector_->transferOrCopyTo(pool); + } + } + private: static void ensureLoadedRowsImpl( const VectorPtr& vector, diff --git a/bolt/vector/SequenceVector.h b/bolt/vector/SequenceVector.h index 8fd3515b2..f9f9f47b1 100644 --- a/bolt/vector/SequenceVector.h +++ b/bolt/vector/SequenceVector.h @@ -213,6 +213,10 @@ class SequenceVector : public SimpleVector { return false; } + void transferOrCopyTo(bolt::memory::MemoryPool* /*pool*/) override { + BOLT_NYI("{} unsupported", __FUNCTION__); + } + private: // Prepares for use after construction. 
void setInternalState(); diff --git a/bolt/vector/fuzzer/VectorFuzzer.cpp b/bolt/vector/fuzzer/VectorFuzzer.cpp index 15e0e69a1..18e39d247 100644 --- a/bolt/vector/fuzzer/VectorFuzzer.cpp +++ b/bolt/vector/fuzzer/VectorFuzzer.cpp @@ -140,6 +140,17 @@ T rand(FuzzerGenerator& rng) { } } +template < + typename T, + typename std::enable_if_t, int> = 0> +inline T rand(FuzzerGenerator& rng, T min, T max) { + if constexpr (std::is_integral_v) { + return boost::random::uniform_int_distribution(min, max)(rng); + } else { + return boost::random::uniform_real_distribution(min, max)(rng); + } +} + // Generate special values for the different supported types. // Special values include NaN, MIN, MAX, 9, 99, 999, etc. template @@ -1431,6 +1442,10 @@ RowTypePtr VectorFuzzer::randRowType( return bolt::randRowType(rng_, scalarTypes, maxDepth); } +size_t VectorFuzzer::randInRange(size_t min, size_t max) { + return rand(rng_, min, max); +} + VectorPtr VectorFuzzer::wrapInLazyVector(VectorPtr baseVector) { if (hasNestedDictionaryLayers(baseVector)) { auto indices = baseVector->wrapInfo(); diff --git a/bolt/vector/fuzzer/VectorFuzzer.h b/bolt/vector/fuzzer/VectorFuzzer.h index 7031934fc..b95d6ebb1 100644 --- a/bolt/vector/fuzzer/VectorFuzzer.h +++ b/bolt/vector/fuzzer/VectorFuzzer.h @@ -311,6 +311,9 @@ class VectorFuzzer { const std::vector& scalarTypes, int maxDepth = 5); + /// Returns a random integer between min and max inclusive + size_t randInRange(size_t min, size_t max); + // Generates short decimal TypePtr with random precision and scale. 
TypePtr randShortDecimalType(); diff --git a/bolt/vector/tests/VectorTest.cpp b/bolt/vector/tests/VectorTest.cpp index c5a44bfc6..037421066 100644 --- a/bolt/vector/tests/VectorTest.cpp +++ b/bolt/vector/tests/VectorTest.cpp @@ -3717,5 +3717,167 @@ TEST_F(VectorTest, sliceUnknownVector) { EXPECT_NE(v->slice(0, 512), nullptr); } +TEST_F(VectorTest, transferOrCopyTo) { + auto rootPool = memory::memoryManager()->addRootPool("long-living"); + auto pool = rootPool->addLeafChild("long-living leaf"); + + VectorPtr vector; + VectorPtr expected; + + // Test primitive type. + { + auto localRootPool = memory::memoryManager()->addRootPool("short-living"); + auto localPool = localRootPool->addLeafChild("short-living leaf"); + test::VectorMaker maker{localPool.get()}; + vector = maker.flatVector( + 3, [](auto row) { return row; }, [](auto row) { return row == 2; }); + expected = BaseVector::copy(*vector, pool.get()); + vector->transferOrCopyTo(pool.get()); + } + ASSERT_EQ(vector->pool(), pool.get()); + test::assertEqualVectors(expected, vector); + + // Test complex type. 
+ { + auto localRootPool = memory::memoryManager()->addRootPool("short-living"); + auto localPool = localRootPool->addLeafChild("short-living leaf"); + test::VectorMaker maker{localPool.get()}; + vector = maker.rowVector( + {maker.flatVector( + 3, + [](auto row) { return row; }, + [](auto row) { return row == 2; }), + maker.constantVector({true, true, true}), + maker.dictionaryVector({1, std::nullopt, 1}), + maker.arrayVector( + 3, + [](auto row) { return row; }, + [](auto row) { return row; }, + [](auto row) { return row == 0; }, + [](auto row) { return row == 1; }), + maker.mapVector( + 3, + [](auto row) { return row; }, + [](auto row) { return row; }, + [](auto row) { return row + 1; }, + [](auto row) { return row == 1; }, + [](auto row) { return row == 2; })}); + expected = BaseVector::copy(*vector, pool.get()); + vector->transferOrCopyTo(pool.get()); + } + ASSERT_EQ(vector->pool(), pool.get()); + test::assertEqualVectors(expected, vector); + + // Test with fuzzing. + // TODO: FlatMapVector doesn't support copy() yet. Add it later when it + // supports. + VectorFuzzer::Options options{ + .nullRatio = 0.2, .stringVariableLength = true, .allowLazyVector = true}; + const int kNumIterations = 500; + for (auto i = 0; i < kNumIterations; ++i) { + { + auto localRootPool = memory::memoryManager()->addRootPool("short-living"); + auto localPool = localRootPool->addLeafChild("short-living leaf"); + + VectorFuzzer fuzzer{options, localPool.get(), 123}; + auto type = fuzzer.randType(); + vector = fuzzer.fuzz(type); + expected = BaseVector::copy(*vector, pool.get()); + vector->transferOrCopyTo(pool.get()); + } + ASSERT_EQ(vector->pool(), pool.get()); + test::assertEqualVectors(expected, vector); + } + + // Test complex-typed vectors with buffers from different pools. 
+ VectorFuzzer fuzzer{options, pool.get(), 123}; + for (auto i = 0; i < kNumIterations; ++i) { + { + auto localRootPool = memory::memoryManager()->addRootPool("short-living"); + auto localPool = localRootPool->addLeafChild("short-living leaf"); + VectorFuzzer localFuzzer{options, localPool.get(), 123}; + + auto type = fuzzer.randType(); + auto elements = localFuzzer.fuzz(type); + auto arrays = fuzzer.fuzzArray(elements, 70); + fuzzer.setOptions({}); + auto keys = fuzzer.fuzz(BIGINT()); + fuzzer.setOptions(options); + auto maps = localFuzzer.fuzzMap(keys, arrays, 50); + vector = localFuzzer.fuzzRow({maps}, 50); + + expected = BaseVector::copy(*vector, pool.get()); + vector->transferOrCopyTo(pool.get()); + } + ASSERT_EQ(vector->pool(), pool.get()); + test::assertEqualVectors(expected, vector); + } + + // Test memory pool with different allocator. + { + memory::MemoryManager anotherManager; + auto anotherRootPool = anotherManager.addRootPool("another root pool"); + auto anotherPool = anotherRootPool->addLeafChild("another leaf pool"); + VectorFuzzer localFuzzer{options, anotherPool.get(), 789}; + + auto type = fuzzer.randType(); + vector = fuzzer.fuzz(type); + expected = BaseVector::copy(*vector, pool.get()); + vector->transferOrCopyTo(pool.get()); + } + ASSERT_EQ(vector->pool(), pool.get()); + test::assertEqualVectors(expected, vector); + + // Test opaque vector. 
+ { + auto localRootPool = memory::memoryManager()->addRootPool("short-living"); + auto localPool = localRootPool->addLeafChild("short-living leaf"); + + auto type = OPAQUE(); + auto size = 100; + vector = BaseVector::create(type, size, localPool.get()); + auto opaqueObj = std::make_shared(); + for (auto i = 0; i < size; ++i) { + vector->as>>()->set(i, opaqueObj); + } + expected = BaseVector::copy(*vector, pool.get()); + vector->transferOrCopyTo(pool.get()); + } + ASSERT_EQ(vector->pool(), pool.get()); + test::assertEqualVectors(expected, vector); + + auto testMemoryStats = [&options](size_t seed) { + auto localRoot1 = memory::memoryManager()->addRootPool("local root 1"); + auto localLeaf1 = localRoot1->addLeafChild("local leaf 1"); + auto localRoot2 = memory::memoryManager()->addRootPool("local root 2"); + auto localLeaf2 = localRoot2->addLeafChild("local leaf 2"); + + EXPECT_EQ(localLeaf1->currentBytes(), 0); + EXPECT_EQ(localLeaf1->peakBytes(), 0); + EXPECT_EQ(localLeaf2->currentBytes(), 0); + EXPECT_EQ(localLeaf2->peakBytes(), 0); + + VectorFuzzer fuzzer{options, localLeaf1.get(), seed}; + auto size = fuzzer.randInRange(0, 10000); + auto type = fuzzer.randType(); + auto vector = fuzzer.fuzz(type, size); + auto usedBytes = localLeaf1->currentBytes(); + auto peakBytes = localLeaf1->peakBytes(); + + vector->transferOrCopyTo(localLeaf2.get()); + EXPECT_LE(localLeaf2->currentBytes(), usedBytes); + EXPECT_LE(localLeaf2->peakBytes(), peakBytes); + if (localLeaf2->currentBytes() == 0) { + auto tmp = bytedance::bolt::isLazyNotLoaded(*vector) || + vector->isConstantEncoding(); + EXPECT_TRUE(tmp); + } + vector = nullptr; + }; + + for (auto i = 0; i < kNumIterations; ++i) { + testMemoryStats(kNumIterations * i + i); + } +} } // namespace } // namespace bytedance::bolt