Skip to content

Commit

Permalink
Remove memory usage type from MemoryUsageTracker (#3554)
Browse files Browse the repository at this point in the history
Summary:
Remove memory usage type from MemoryUsageTracker and
add a multi-threading test for memory tracker.

Followups: make leaf tracker update thread-safe.

Pull Request resolved: #3554

Reviewed By: HuamengJiang, oerling

Differential Revision: D42162405

Pulled By: xiaoxmeng

fbshipit-source-id: 738eb441c2b89fe6e9355ebde6612e5cac7e7527
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 21, 2022
1 parent 8fec289 commit bacce93
Show file tree
Hide file tree
Showing 21 changed files with 490 additions and 542 deletions.
8 changes: 5 additions & 3 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,14 +341,16 @@ class MemoryPoolImpl : public MemoryPool {

~MemoryPoolImpl() {
if (const auto& tracker = getMemoryUsageTracker()) {
auto remainingBytes = tracker->getCurrentUserBytes();
// TODO: change to check reserved bytes which including the unused
// reservation.
auto remainingBytes = tracker->currentBytes();
VELOX_CHECK_EQ(
0,
remainingBytes,
"Memory pool should be destroyed only after all allocated memory has been freed. Remaining bytes allocated: {}, cumulative bytes allocated: {}, number of allocations: {}",
remainingBytes,
tracker->getCumulativeBytes(),
tracker->getNumAllocs());
tracker->cumulativeBytes(),
tracker->numAllocs());
}
}

Expand Down
177 changes: 84 additions & 93 deletions velox/common/memory/MemoryUsageTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,47 +20,41 @@
namespace facebook::velox::memory {
std::shared_ptr<MemoryUsageTracker> MemoryUsageTracker::create(
const std::shared_ptr<MemoryUsageTracker>& parent,
MemoryUsageTracker::UsageType type,
const MemoryUsageConfig& config) {
struct SharedMemoryUsageTracker : public MemoryUsageTracker {
SharedMemoryUsageTracker(
const std::shared_ptr<MemoryUsageTracker>& parent,
MemoryUsageTracker::UsageType type,
const MemoryUsageConfig& config)
: MemoryUsageTracker(parent, type, config) {}
};

return std::make_shared<SharedMemoryUsageTracker>(parent, type, config);
int64_t maxMemory) {
auto* tracker = new MemoryUsageTracker(parent, maxMemory);
return std::shared_ptr<MemoryUsageTracker>(tracker);
}

void MemoryUsageTracker::update(int64_t size) {
if (size > 0) {
int64_t increment = 0;
++usage(numAllocs_, UsageType::kTotalMem);
++numAllocs_;
{
std::lock_guard<std::mutex> l(mutex_);
if (usedReservation_ + size > reservation_) {
if (usedReservationBytes_ + size > reservationBytes_) {
increment = reserveLocked(size);
}
}
checkAndPropagateReservationIncrement(increment, false);
usedReservation_.fetch_add(size);
usedReservationBytes_.fetch_add(size);
return;
}

// Decreasing usage. See if need to propagate upward.
int64_t decrement = 0;
{
std::lock_guard<std::mutex> l(mutex_);
auto newUsed = usedReservation_ += size;
auto newCap = std::max(minReservation_.load(), newUsed);
usedReservationBytes_ += size;
auto newUsed = usedReservationBytes_.load();
auto newCap = std::max(minReservationBytes_.load(), newUsed);
auto newQuantized = quantizedSize(newCap);
if (newQuantized != reservation_) {
decrement = reservation_ - newQuantized;
reservation_ = newQuantized;
if (newQuantized != reservationBytes_) {
decrement = reservationBytes_ - newQuantized;
reservationBytes_ = newQuantized;
}
}
if (decrement) {
decrementUsage(type_, decrement);
if (decrement > 0) {
decrementUsage(decrement);
}
}

Expand All @@ -69,128 +63,118 @@ void MemoryUsageTracker::reserve(int64_t size) {
{
std::lock_guard<std::mutex> l(mutex_);
increment = reserveLocked(size);
minReservation_ = reservation_.load();
}
if (increment) {
checkAndPropagateReservationIncrement(increment, true);
minReservationBytes_ = reservationBytes_.load();
}
checkAndPropagateReservationIncrement(increment, true);
}

void MemoryUsageTracker::release() {
if (!minReservation_) {
return;
}
int64_t freeable;
int64_t freeable = 0;
{
std::lock_guard<std::mutex> l(mutex_);
auto reservationByUsage = quantizedSize(usedReservation_);
freeable = reservation_ - reservationByUsage;
if (minReservationBytes_ == 0) {
return;
}
const auto quantizedUsedBytes = quantizedSize(usedReservationBytes_);
VELOX_CHECK_GE(reservationBytes_, quantizedUsedBytes);
freeable = reservationBytes_ - quantizedUsedBytes;
if (freeable > 0) {
reservation_ = reservationByUsage;
reservationBytes_ = quantizedUsedBytes;
}
minReservation_ = 0;
minReservationBytes_ = 0;
}
if (freeable) {
decrementUsage(type_, freeable);
if (freeable > 0) {
decrementUsage(freeable);
}
}

void MemoryUsageTracker::checkAndPropagateReservationIncrement(
int64_t increment,
bool updateMinReservation) {
if (!increment) {
if (increment == 0) {
return;
}
std::exception_ptr exception;
try {
incrementUsage(type_, increment);
incrementUsage(increment);
} catch (std::exception& e) {
exception = std::current_exception();
}
// Process the exception outside of catch so as not to kill the process if
// 'mutex_' throws deadlock.
if (exception) {
if (exception != nullptr) {
// Compensate for the increase after exceeding limit.
std::lock_guard<std::mutex> l(mutex_);
reservation_ -= increment;
reservationBytes_ -= increment;
if (updateMinReservation) {
minReservation_ -= increment;
minReservationBytes_ -= increment;
}
std::rethrow_exception(exception);
}
}

void MemoryUsageTracker::incrementUsage(UsageType type, int64_t size) {
// Update parent first. If one of the ancestor's limits are exceeded, it
// will throw MEM_CAP_EXCEEDED exception. This exception will be caught
// and re-thrown with an additional message appended to it if a
void MemoryUsageTracker::incrementUsage(int64_t size) {
// Update parent first. If one of the ancestor's limits are exceeded, it will
// throw MEM_CAP_EXCEEDED exception. This exception will be caught and
// re-thrown with an additional message appended to it if a
// makeMemoryCapExceededMessage_ is set.
if (parent_) {
if (parent_ != nullptr) {
try {
parent_->incrementUsage(type, size);
parent_->incrementUsage(size);
} catch (const VeloxRuntimeError& e) {
if (!makeMemoryCapExceededMessage_) {
if (makeMemoryCapExceededMessage_ == nullptr) {
throw;
}
auto errorMessage =
e.message() + ". " + makeMemoryCapExceededMessage_(*this);
VELOX_MEM_CAP_EXCEEDED(errorMessage);
}
}
auto newUsage = usage(currentUsageInBytes_, type)
.fetch_add(size, std::memory_order_relaxed) +
size;

// We track the peak usage of total memory independent of user and
// system memory since freed user memory can be reallocated as system
// memory and vice versa.
auto otherUsageType =
type == UsageType::kUserMem ? UsageType::kSystemMem : UsageType::kUserMem;
int64_t totalBytes = newUsage + usage(currentUsageInBytes_, otherUsageType);

const auto newUsage =
currentBytes_.fetch_add(size, std::memory_order_relaxed) + size;
// Enforce the limit.
if (newUsage > usage(maxMemory_, type) || totalBytes > total(maxMemory_)) {
if (!growCallback_ || !growCallback_(type, size, *this)) {
if (newUsage > maxMemory_) {
if ((growCallback_ == nullptr) || !growCallback_(size, *this)) {
// Exceeded the limit. revert the change to current usage.
decrementUsage(type, size);
decrementUsage(size);
checkNonNegativeSizes("after exceeding cap");
auto errorMessage = fmt::format(
MEM_CAP_EXCEEDED_ERROR_FORMAT,
succinctBytes(std::min(total(maxMemory_), usage(maxMemory_, type))),
succinctBytes(maxMemory_),
succinctBytes(size));
if (makeMemoryCapExceededMessage_) {
errorMessage += ". " + makeMemoryCapExceededMessage_(*this);
}
VELOX_MEM_CAP_EXCEEDED(errorMessage);
}
}
usage(cumulativeBytes_, type) += size;
maySetMax(type, newUsage);
maySetMax(UsageType::kTotalMem, totalBytes);
cumulativeBytes_ += size;
maybeUpdatePeakBytes(newUsage);
checkNonNegativeSizes("after update");
}

void MemoryUsageTracker::decrementUsage(UsageType type, int64_t size) noexcept {
if (parent_) {
parent_->decrementUsage(type, size);
void MemoryUsageTracker::decrementUsage(int64_t size) noexcept {
if (parent_ != nullptr) {
parent_->decrementUsage(size);
}
usage(currentUsageInBytes_, type).fetch_sub(size);
currentBytes_.fetch_sub(size);
}

bool MemoryUsageTracker::maybeReserve(int64_t increment) {
constexpr int32_t kGrowthQuantum = 8 << 20;
auto addedReservation = bits::roundUp(increment, kGrowthQuantum);
auto candidate = this;
while (candidate) {
auto limit = candidate->maxTotalBytes();
const auto reservationToAdd = bits::roundUp(increment, kGrowthQuantum);
MemoryUsageTracker* candidate = this;
while (candidate != nullptr) {
auto limit = candidate->maxMemory();
// If this tracker has no limit, proceed to its parent.
if (limit == memory::kMaxMemory && candidate->parent_) {
if (limit == kMaxMemory && candidate->parent_ != nullptr) {
candidate = candidate->parent_.get();
continue;
}
if (limit - candidate->getCurrentTotalBytes() > addedReservation) {
if (limit - candidate->currentBytes() >= reservationToAdd) {
try {
reserve(addedReservation);
reserve(reservationToAdd);
} catch (const std::exception& e) {
return false;
}
Expand All @@ -201,34 +185,41 @@ bool MemoryUsageTracker::maybeReserve(int64_t increment) {
return false;
}

void MemoryUsageTracker::maySetMax(UsageType type, int64_t newPeak) {
auto& peakUsage = peakUsageInBytes_[static_cast<int>(type)];
int64_t oldPeak = peakUsage;
void MemoryUsageTracker::maybeUpdatePeakBytes(int64_t newPeak) {
int64_t oldPeak = peakBytes_;
while (oldPeak < newPeak &&
!peakUsage.compare_exchange_weak(oldPeak, newPeak)) {
oldPeak = peakUsage;
!peakBytes_.compare_exchange_weak(oldPeak, newPeak)) {
oldPeak = peakBytes_;
}
}

void MemoryUsageTracker::checkNonNegativeSizes(
const char* FOLLY_NONNULL message) const {
if (user(currentUsageInBytes_) < 0 || system(currentUsageInBytes_) < 0 ||
total(currentUsageInBytes_) < 0) {
LOG_EVERY_N(ERROR, 100)
<< "MEMR: Negative usage " << message << user(currentUsageInBytes_)
<< " " << system(currentUsageInBytes_) << " "
<< total(currentUsageInBytes_);
}
void MemoryUsageTracker::checkNonNegativeSizes(const char* errmsg) const {
VELOX_CHECK_GE(
currentBytes_,
0,
"Negative reserved bytes {}: {}",
currentBytes_,
errmsg);
}

std::string MemoryUsageTracker::toString() const {
std::stringstream out;
out << "<tracker total " << (getCurrentTotalBytes() >> 20) << " available "
<< (getAvailableReservation() >> 20);
if (maxTotalBytes() != kMaxMemory) {
out << "limit " << (maxTotalBytes() >> 20);
out << "<tracker used " << (currentBytes() >> 20) << " available "
<< (availableReservation() >> 20);
if (maxMemory() != kMaxMemory) {
out << " limit " << (maxMemory() >> 20);
}
out << ">";
return out.str();
}

int64_t MemoryUsageTracker::reserveLocked(int64_t size) {
const int64_t neededSize = size - (reservationBytes_ - usedReservationBytes_);
if (neededSize <= 0) {
return 0;
}
const auto increment = roundedDelta(reservationBytes_, neededSize);
reservationBytes_ += increment;
return increment;
}
} // namespace facebook::velox::memory
Loading

0 comments on commit bacce93

Please sign in to comment.