Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions bolt/buffer/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,14 @@ class Buffer {
sizeof(T), is_pod_like_v<T>, buffer, offset, length);
}

/// Transfers this buffer to 'pool'. Returns true if the transfer succeeds, or
/// false if the transfer fails. A buffer can be transferred to 'pool' if its
/// original pool and 'pool' are from the same MemoryAllocator and the buffer
/// is not a BufferView.
///
/// Base-class default: unsupported. Concrete subclasses (AlignedBuffer,
/// NonPODAlignedBuffer, BufferView) override this with real behavior; any
/// other subtype reaching this fallback raises a not-yet-implemented error.
virtual bool transferTo(bolt::memory::MemoryPool* /*pool*/) {
  // NOTE(review): no return statement after BOLT_NYI — presumably the macro
  // throws/aborts; confirm it is effectively [[noreturn]] so this does not
  // trip -Wreturn-type.
  BOLT_NYI("{} unsupported", __FUNCTION__);
}

protected:
// Writes a magic word at 'capacity_'. No-op for a BufferView. The actual
// logic is inside a separate virtual function, allowing override by derived
Expand Down Expand Up @@ -509,6 +517,42 @@ class AlignedBuffer : public Buffer {
return newBuffer;
}

/// Deep-copies 'buffer' into a new buffer allocated from 'pool'.
/// Returns nullptr when 'buffer' is null.
///
/// For T == bool the new buffer is allocated element-wise as uint8_t sized
/// by buffer->size(), because a bool (nulls) buffer reports its size in
/// bytes. The new memory is deliberately left uninitialized: copyFrom
/// overwrites every byte of it.
template <typename T>
static BufferPtr copy(
    const BufferPtr& buffer,
    bolt::memory::MemoryPool* pool) {
  if (!buffer) {
    return nullptr;
  }

  const auto byteSize = buffer->size();
  BufferPtr copied;
  if constexpr (std::is_same_v<T, bool>) {
    // size() of a bool buffer is already a byte count; allocate byte-wise.
    copied = AlignedBuffer::allocate<uint8_t>(byteSize, pool);
  } else {
    copied =
        AlignedBuffer::allocate<T>(checkedDivide(byteSize, sizeof(T)), pool);
  }

  copied->copyFrom(buffer.get(), copied->size());
  return copied;
}

/// Moves this buffer's memory accounting from its current pool to 'pool'.
/// Succeeds trivially when 'pool' already owns the buffer; otherwise asks
/// the current pool to transfer the full padded allocation and, on success,
/// repoints pool_ via setPool().
bool transferTo(bolt::memory::MemoryPool* pool) override {
  if (pool == pool_) {
    // Nothing to do: the target pool already owns this buffer.
    return true;
  }
  const auto transferBytes = checkedPlus<size_t>(kPaddedSize, capacity_);
  if (!pool_->transferTo(pool, this, transferBytes)) {
    return false;
  }
  setPool(pool);
  return true;
}

protected:
AlignedBuffer(bolt::memory::MemoryPool* pool, size_t capacity)
: Buffer(
Expand Down Expand Up @@ -549,6 +593,12 @@ class AlignedBuffer : public Buffer {
}
}

// Rebinds the const-declared member 'pool_' to 'pool' after a successful
// ownership transfer. Because pool_ is const-qualified in Buffer, its
// address must be const_cast before writing through it.
// NOTE(review): writing through a const_cast pointer is well-defined only
// if the member's underlying storage is not itself const — confirm pool_'s
// declaration in Buffer permits this.
void setPool(bolt::memory::MemoryPool* pool) {
  bolt::memory::MemoryPool** poolPtr =
      const_cast<bolt::memory::MemoryPool**>(&pool_);
  *poolPtr = pool;
}

protected:
void setEndGuardImpl() override {
*reinterpret_cast<uint64_t*>(data_ + capacity_) = kEndGuard;
Expand Down Expand Up @@ -680,6 +730,23 @@ class NonPODAlignedBuffer : public Buffer {
this, checkedPlus<size_t>(AlignedBuffer::kPaddedSize, capacity_));
}

// Moves this buffer's memory accounting from its current pool to 'pool'.
// Succeeds trivially when 'pool' already owns the buffer; otherwise asks
// the current pool to transfer the padded allocation (using AlignedBuffer's
// padding constant) and, on success, rebinds pool_.
// NOTE(review): the const_cast below duplicates AlignedBuffer::setPool,
// which is not reachable from this class — consider hoisting a shared
// protected helper onto Buffer to keep the two paths consistent.
bool transferTo(bolt::memory::MemoryPool* pool) override {
  if (pool_ == pool) {
    return true;
  }

  if (pool_->transferTo(
          pool,
          this,
          checkedPlus<size_t>(AlignedBuffer::kPaddedSize, capacity_))) {
    // pool_ is const-declared; rebind it through a const_cast.
    bolt::memory::MemoryPool** poolPtr =
        const_cast<bolt::memory::MemoryPool**>(&pool_);
    *poolPtr = pool;
    return true;
  }
  return false;
}

// Needs to use this class from static methods of AlignedBuffer
friend class AlignedBuffer;
};
Expand Down Expand Up @@ -707,6 +774,13 @@ class BufferView : public Buffer {
return true;
}

/// A BufferView does not own its memory, so it can never be moved between
/// pools; the only "successful" transfer is the no-op case where 'pool'
/// is already this view's pool.
bool transferTo(bolt::memory::MemoryPool* pool) override {
  return pool_ == pool;
}

private:
BufferView(const uint8_t* data, size_t size, Releaser releaser, bool podType)
// A BufferView must be created over the data held by a cache
Expand Down
6 changes: 3 additions & 3 deletions bolt/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ void* MemoryPoolImpl::allocate(
toString(),
allocator_->getAndClearFailureMessage()));
}
RECORD_ALLOC(buffer, size);
RECORD_ALLOC_SPEC(this, buffer, size);
return buffer;
}

Expand All @@ -630,7 +630,7 @@ void* MemoryPoolImpl::allocateZeroFilled(
toString(),
allocator_->getAndClearFailureMessage()));
}
RECORD_ALLOC(buffer, size);
RECORD_ALLOC_SPEC(this, buffer, size);
return buffer;
}

Expand Down Expand Up @@ -664,7 +664,7 @@ void* MemoryPoolImpl::reallocate(
if (p) {
RECORD_FREE(p, size);
}
RECORD_ALLOC(newP, newSize);
RECORD_ALLOC_SPEC(this, newP, newSize);
} else {
RECORD_GROW(p, newP, size, newSize);
}
Expand Down
207 changes: 207 additions & 0 deletions bolt/common/memory/tests/MemoryPoolTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3862,6 +3862,213 @@ TEST_P(MemoryPoolTest, allocationWithCoveredCollateral) {
pool->freeContiguous(contiguousAllocation);
}

TEST_P(MemoryPoolTest, transferTo) {
  MemoryManager::Options options;
  options.alignment = MemoryAllocator::kMinAlignment;
  options.allocatorCapacity = kDefaultCapacity;
  setupMemory(options);
  auto manager = getMemoryManager();

  auto largestSizeClass = manager->allocator()->largestSizeClass();
  // Page counts below, at, and above the largest size class, to exercise
  // both the non-contiguous and contiguous allocation paths.
  std::vector<MachinePageCount> pageCounts{
      largestSizeClass,
      largestSizeClass + 1,
      largestSizeClass / 10,
      1,
      largestSizeClass * 2,
      largestSizeClass * 3 + 1};

  // Asserts a pool's used/peak/reserved byte counters all match expectations.
  auto assertEqualBytes = [](const memory::MemoryPool* pool,
                             int64_t usedBytes,
                             int64_t peakBytes,
                             int64_t reservedBytes) {
    EXPECT_EQ(pool->currentBytes(), usedBytes);
    EXPECT_EQ(pool->peakBytes(), peakBytes);
    EXPECT_EQ(pool->reservedBytes(), reservedBytes);
  };

  // Asserts a pool holds no memory (peak is intentionally not checked).
  auto assertZeroByte = [](const memory::MemoryPool* pool) {
    EXPECT_EQ(pool->currentBytes(), 0);
    EXPECT_EQ(pool->reservedBytes(), 0);
  };

  // Snapshots a pool's (used, peak, reserved) byte counters.
  auto getMemoryBytes = [](const memory::MemoryPool* pool) {
    return std::make_tuple(
        pool->currentBytes(), pool->peakBytes(), pool->reservedBytes());
  };

  // Creates 'from'/'to' leaf pools, under one shared root or two roots.
  auto createPools = [&manager](bool betweenDifferentRoots) {
    auto root1 = manager->addRootPool("root1");
    auto root2 = manager->addRootPool("root2");
    std::shared_ptr<MemoryPool> from;
    std::shared_ptr<MemoryPool> to;
    if (betweenDifferentRoots) {
      from = root1->addLeafChild("from");
      to = root2->addLeafChild("to");
    } else {
      from = root1->addLeafChild("from");
      to = root1->addLeafChild("to");
    }
    return std::make_tuple(root1, root2, from, to);
  };

  auto testTransferAllocate = [&assertZeroByte,
                               &assertEqualBytes,
                               &getMemoryBytes,
                               &createPools](bool betweenDifferentRoots) {
    auto [root1, root2, from, to] = createPools(betweenDifferentRoots);
    assertZeroByte(from.get());
    assertZeroByte(to.get());
    assertZeroByte(from->root());
    assertZeroByte(to->root());

    const auto kSize = 1024;
    auto buffer = from->allocate(kSize);
    // Transferring between non-leaf pools is not allowed.
    EXPECT_FALSE(from->root()->transferTo(to.get(), buffer, kSize));
    EXPECT_FALSE(from->transferTo(to->root(), buffer, kSize));

    auto [usedBytes, peakBytes, reservedBytes] = getMemoryBytes(from.get());
    auto [rootUsedBytes, rootPeakBytes, rootReservedBytes] =
        getMemoryBytes(from->root());
    // Leaf-to-leaf transfer must succeed; assert it explicitly so a failure
    // here doesn't surface only as confusing byte-count mismatches below.
    EXPECT_TRUE(from->transferTo(to.get(), buffer, kSize));
    assertEqualBytes(to.get(), usedBytes, peakBytes, reservedBytes);
    if (from->root() == to->root()) {
      // Same-root transfer reserves at the destination before releasing the
      // source, so the root's peak doubles.
      rootPeakBytes *= 2;
    }
    assertEqualBytes(
        to->root(), rootUsedBytes, rootPeakBytes, rootReservedBytes);
    to->free(buffer, kSize);
    assertZeroByte(from.get());
    assertZeroByte(to.get());
    assertZeroByte(from->root());
    assertZeroByte(to->root());
  };

  auto testTransferAllocateZeroFilled =
      [&assertZeroByte, &assertEqualBytes, &getMemoryBytes, &createPools](
          bool betweenDifferentRoots) {
        auto [root1, root2, from, to] = createPools(betweenDifferentRoots);
        assertZeroByte(from.get());
        assertZeroByte(to.get());
        assertZeroByte(from->root());
        assertZeroByte(to->root());

        const auto kSize = 1024;
        auto buffer = from->allocateZeroFilled(8, kSize / 8);
        auto [usedBytes, peakBytes, reservedBytes] =
            getMemoryBytes(from.get());
        auto [rootUsedBytes, rootPeakBytes, rootReservedBytes] =
            getMemoryBytes(from->root());
        EXPECT_TRUE(from->transferTo(to.get(), buffer, kSize));
        assertEqualBytes(to.get(), usedBytes, peakBytes, reservedBytes);
        if (from->root() == to->root()) {
          rootPeakBytes *= 2;
        }
        assertEqualBytes(
            to->root(), rootUsedBytes, rootPeakBytes, rootReservedBytes);
        to->free(buffer, kSize);
        assertZeroByte(from.get());
        assertZeroByte(to.get());
        assertZeroByte(from->root());
        assertZeroByte(to->root());
      };

  auto testTransferAllocateContiguous =
      [&assertZeroByte, &assertEqualBytes, &getMemoryBytes, &createPools](
          uint64_t pageCount, bool betweenDifferentRoots) {
        auto [root1, root2, from, to] = createPools(betweenDifferentRoots);
        assertZeroByte(from.get());
        assertZeroByte(to.get());
        assertZeroByte(from->root());
        assertZeroByte(to->root());

        ContiguousAllocation out;
        from->allocateContiguous(pageCount, out);
        auto [usedBytes, peakBytes, reservedBytes] =
            getMemoryBytes(from.get());
        auto [rootUsedBytes, rootPeakBytes, rootReservedBytes] =
            getMemoryBytes(from->root());
        EXPECT_TRUE(from->transferTo(to.get(), out.data(), out.size()));
        assertEqualBytes(to.get(), usedBytes, peakBytes, reservedBytes);
        if (from->root() == to->root()) {
          rootPeakBytes *= 2;
        }
        assertEqualBytes(
            to->root(), rootUsedBytes, rootPeakBytes, rootReservedBytes);
        to->freeContiguous(out);
        assertZeroByte(from.get());
        assertZeroByte(to.get());
        assertZeroByte(from->root());
        assertZeroByte(to->root());
      };

  auto testTransferAllocateNonContiguous =
      [&assertZeroByte, &assertEqualBytes, &getMemoryBytes, &createPools](
          uint64_t pageCount, bool betweenDifferentRoots) {
        auto [root1, root2, from, to] = createPools(betweenDifferentRoots);
        assertZeroByte(from.get());
        assertZeroByte(to.get());
        assertZeroByte(from->root());
        assertZeroByte(to->root());

        Allocation out;
        from->allocateNonContiguous(pageCount, out);
        auto [usedBytes, peakBytes, reservedBytes] =
            getMemoryBytes(from.get());
        auto [rootUsedBytes, rootPeakBytes, rootReservedBytes] =
            getMemoryBytes(from->root());
        for (auto i = 0; i < out.numRuns(); ++i) {
          const auto& run = out.runAt(i);
          EXPECT_TRUE(from->transferTo(to.get(), run.data(), run.numBytes()));
        }
        assertEqualBytes(to.get(), usedBytes, peakBytes, reservedBytes);
        if (from->root() == to->root()) {
          EXPECT_EQ(to->root()->currentBytes(), rootUsedBytes);
          // We reserve and release memory run-by-run, so the peak bytes would
          // be no greater than twice of the original peak bytes.
          EXPECT_LE(to->root()->peakBytes(), rootPeakBytes * 2);
          EXPECT_EQ(to->root()->reservedBytes(), rootReservedBytes);
        } else {
          assertEqualBytes(
              to->root(), rootUsedBytes, rootPeakBytes, rootReservedBytes);
        }
        to->freeNonContiguous(out);
        assertZeroByte(from.get());
        assertZeroByte(to.get());
        assertZeroByte(from->root());
        assertZeroByte(to->root());
      };

  // Test transfer between siblings of the same root pool.
  testTransferAllocate(false);
  testTransferAllocateZeroFilled(false);
  for (auto pageCount : pageCounts) {
    testTransferAllocateContiguous(pageCount, false);
    testTransferAllocateNonContiguous(pageCount, false);
  }

  // Test transfer between different root pools.
  testTransferAllocate(true);
  testTransferAllocateZeroFilled(true);
  for (auto pageCount : pageCounts) {
    testTransferAllocateContiguous(pageCount, true);
    testTransferAllocateNonContiguous(pageCount, true);
  }
}

BOLT_INSTANTIATE_TEST_SUITE_P(
MemoryPoolTestSuite,
MemoryPoolTest,
Expand Down
12 changes: 12 additions & 0 deletions bolt/vector/BaseVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,18 @@ void BaseVector::copy(
copyRanges(source, ranges);
}

// Rehomes this vector's buffers to 'pool': first attempts a zero-copy
// ownership transfer of each buffer, falling back to a deep copy when the
// transfer fails (per Buffer::transferTo, e.g. the buffer is a BufferView
// or the pools belong to different allocators). No-op when 'pool' already
// owns the vector.
// NOTE(review): only nulls_ is handled here while pool_ is reassigned
// unconditionally — presumably derived vector classes transfer their
// values/children buffers; confirm callers/overrides cover those.
void BaseVector::transferOrCopyTo(bolt::memory::MemoryPool* pool) {
  if (pool == pool_) {
    return;
  }

  if (nulls_ && !nulls_->transferTo(pool)) {
    nulls_ = AlignedBuffer::copy<bool>(nulls_, pool);
    // The fallback copy reallocated the buffer, so refresh the cached raw
    // pointer into it.
    rawNulls_ = nulls_->as<uint64_t>();
  }
  pool_ = pool;
}

namespace {

template <TypeKind kind>
Expand Down
Loading