Skip to content

Commit

Permalink
Clean up legacy code from Buffer::reallocate and reduce window where …
Browse files Browse the repository at this point in the history
…Buffer is invalid (facebookincubator#9755)

Summary:
Pull Request resolved: facebookincubator#9755

When running Velox with the save input on expression setting enabled, I'm occasionally seeing
SIGSEGVs writing out the Vectors when the exception was thrown while resizing the offset or length
Buffer in a MapVector or an ArrayVector, e.g. because of an OOM.  When this happens in
Buffer::reallocate it can leave the Buffer in an invalid state before the exception is caught (when the
exception message is being generated) triggering the crash.

While I doubt we can 100% guarantee VectorSaver won't trigger a crash we can at least fix this case.

Instead of detaching buffer before reallocating it, if we detach it after we still avoid a double free and
the buffer is (almost) always valid. There are windows after the old buffer is freed before MemoryPool::reallocate returns, and while setting up newBuffer where buffer would still be invalid,
but these are much less likely to run into issues (these would mainly be bugs in the code).

While digging through this I also noticed that the comments on MemoryPool::reallocate have gotten
out of date and Buffer::reallocate is handling some unnecessary cases (newPtr can never be equal to
old), so I've cleaned those up.

Reviewed By: xiaoxmeng

Differential Revision: D57139357

fbshipit-source-id: b2a0391a77d6ccbdee9e96f3a85bde9ab815b2e9
  • Loading branch information
Kevin Wilfong authored and facebook-github-bot committed May 11, 2024
1 parent ed0ecdd commit 2c98308
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 26 deletions.
31 changes: 8 additions & 23 deletions velox/buffer/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,34 +381,19 @@ class AlignedBuffer : public Buffer {
auto oldCapacity = checkedPlus<size_t>(old->capacity(), kPaddedSize);
auto preferredSize =
pool->preferredSize(checkedPlus<size_t>(size, kPaddedSize));
// Make the buffer no longer owned by '*buffer' because reallocate
// may free the old buffer. Reassigning the new buffer to
// '*buffer' would be a double free.

void* newPtr = pool->reallocate(old, oldCapacity, preferredSize);

// Make the old buffer no longer owned by '*buffer' because reallocate
// freed the old buffer. Reassigning the new buffer to
// '*buffer' would be a double free if we didn't do this.
buffer->detach();
// Decrement the reference count. No need to check, we just
// checked old->unique().
old->referenceCount_.fetch_sub(1);
void* newPtr;
try {
newPtr = pool->reallocate(old, oldCapacity, preferredSize);
} catch (const std::exception&) {
*buffer = old;
throw;
}
if (newPtr == reinterpret_cast<void*>(old)) {
// The pointer did not change. Put the old pointer back in the
// smart pointer and adjust capacity.
*buffer = old;
(*buffer)->setCapacity(preferredSize - kPaddedSize);
(*buffer)->setSize(size);
reinterpret_cast<AlignedBuffer*>(buffer->get())
->fillNewMemory<T>(oldSize, size, initValue);
return;
}

auto newBuffer =
new (newPtr) AlignedBuffer(pool, preferredSize - kPaddedSize);
newBuffer->setSize(size);
newBuffer->fillNewMemory<T>(oldSize, size, initValue);

*buffer = newBuffer;
}

Expand Down
113 changes: 113 additions & 0 deletions velox/buffer/tests/BufferTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "velox/buffer/Buffer.h"

#include "folly/Range.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/type/StringView.h"

#include <glog/logging.h>
Expand Down Expand Up @@ -182,6 +184,117 @@ TEST_F(BufferTest, testReallocate) {
EXPECT_GT(numMoved, 0);
}

TEST_F(BufferTest, testReallocateNoReuse) {
// This test checks that regardless of how we resize a Buffer in reallocate
// (up, down, the same) as long as we hit MemoryPool::reallocate, the Buffer
// always points to a new location in memory. If this test fails, it's not
// necessarily a problem, but it's worth looking at optimizations we could do
// in reallocate when the pointer doesn't change.

enum BufferResizeOption {
BIGGER,
SMALLER,
SAME,
};

auto test = [&](BufferResizeOption bufferResizeOption,
bool useMmapAllocator) {
memory::MemoryManagerOptions options;
options.useMmapAllocator = useMmapAllocator;
options.allocatorCapacity = 1024 * 1024;
memory::MemoryManager memoryManager(options);

auto pool = memoryManager.addLeafPool("testReallocateNoReuse");

const size_t originalBufferSize = 10;
auto buffer = AlignedBuffer::allocate<char>(originalBufferSize, pool.get());
auto* originalBufferPtr = buffer.get();

size_t newSize;
switch (bufferResizeOption) {
case SMALLER:
newSize = originalBufferSize - 1;
break;
case SAME:
newSize = originalBufferSize;
break;
case BIGGER:
// Make sure the new size is large enough that we hit
// MemoryPoolImpl::reallocate.
newSize = buffer->capacity() + 1;
break;
default:
VELOX_FAIL("Unexpected buffer resize option");
}

AlignedBuffer::reallocate<char>(&buffer, newSize);

EXPECT_NE(buffer.get(), originalBufferPtr);
};

test(SMALLER, true);
test(SAME, true);
test(BIGGER, true);

test(SMALLER, false);
test(SAME, false);
test(BIGGER, false);
}

DEBUG_ONLY_TEST_F(BufferTest, testReallocateFails) {
// Reallocating a buffer can cause an exception to be thrown e.g. if we
// run out of memory. If the buffer is left in an invalid state this can
// cause crahses, e.g. if VectorSaver attempts to write out a Vector that
// was in the midst of resizing. This test verifies the buffer is valid at
// different points in the exception's lifecycle.

const size_t bufferSize = 10;
auto buffer = AlignedBuffer::allocate<char>(bufferSize, pool_.get());

::memset(buffer->asMutable<char>(), 'a', bufferSize);

common::testutil::TestValue::enable();

const std::string kErrorMessage = "Expected out of memory exception";
SCOPED_TESTVALUE_SET(
"facebook::velox::memory::MemoryPoolImpl::reserveThreadSafe",
std::function<void(memory::MemoryPool*)>([&](memory::MemoryPool*) {
VELOX_MEM_POOL_CAP_EXCEEDED(kErrorMessage);
}));

{
ExceptionContextSetter setter(
{.messageFunc = [](VeloxException::Type,
void* untypedArg) -> std::string {
// Validate that the buffer is still valid at the point
// the exception is thrown.
auto bufferArg = *static_cast<BufferPtr*>(untypedArg);

const auto* bufferContents = bufferArg->as<char>();
VELOX_CHECK_EQ(bufferArg->size(), 10);
for (int i = 0; i < 10; i++) {
VELOX_CHECK_EQ(bufferContents[i], 'a');
}

return "Exception context message func called.";
},
.arg = &buffer});

VELOX_ASSERT_THROW_CODE(
AlignedBuffer::reallocate<char>(
&buffer, pool_->availableReservation() + 1),
error_code::kMemCapExceeded,
kErrorMessage);
}

// Validate the buffer is valid after the exception is caught.
const auto* bufferContents = buffer->as<char>();
VELOX_CHECK_EQ(buffer->size(), bufferSize);
for (int i = 0; i < bufferSize; i++) {
VELOX_CHECK_EQ(bufferContents[i], 'a');
}
}

struct MockCachePin {
void addRef() {
++pinCount;
Expand Down
4 changes: 1 addition & 3 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,7 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
virtual void* allocateZeroFilled(int64_t numEntries, int64_t sizeEach) = 0;

/// Re-allocates from an existing buffer with 'newSize' and update memory
/// usage counting accordingly. If 'newSize' is larger than the current buffer
/// 'size', the function will allocate a new buffer and free the old buffer.
/// If the new allocation fails, this method will throw and not free 'p'.
/// usage counting accordingly.
virtual void* reallocate(void* p, int64_t size, int64_t newSize) = 0;

/// Frees an allocated buffer.
Expand Down

0 comments on commit 2c98308

Please sign in to comment.