Skip to content

Commit

Permalink
Merge pull request ClickHouse#61049 from hanfei1991/hanfei/check-limi…
Browse files Browse the repository at this point in the history
…t-periodically

Check cgroups memory limit update periodically
  • Loading branch information
hanfei1991 authored Mar 18, 2024
2 parents 5e5a39d + 9b9f409 commit 8a54c85
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 82 deletions.
3 changes: 0 additions & 3 deletions base/base/getMemoryAmount.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ std::optional<uint64_t> getCgroupsV2MemoryLimit()

}

/** Returns the size of physical memory (RAM) in bytes.
* Returns 0 on unsupported platform
*/
uint64_t getMemoryAmountOrZero()
{
int64_t num_pages = sysconf(_SC_PHYS_PAGES);
Expand Down
9 changes: 4 additions & 5 deletions base/base/getMemoryAmount.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

#include <cstdint>

/** Returns the size of physical memory (RAM) in bytes.
* Returns 0 on unsupported platform or if it cannot determine the size of physical memory.
*/
/// Returns the size in bytes of physical memory (RAM) available to the process. The value can
/// be smaller than the total available RAM available to the system due to cgroups settings.
/// Returns 0 on unsupported platform or if it cannot determine the size of physical memory.
uint64_t getMemoryAmountOrZero();

/** Throws exception if it cannot determine the size of physical memory.
*/
/// Throws exception if it cannot determine the size of physical memory.
uint64_t getMemoryAmount();
20 changes: 20 additions & 0 deletions programs/keeper/Keeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <IO/UseSSL.h>
#include <Core/ServerUUID.h>
#include <Common/logger_useful.h>
#include <Common/CgroupsMemoryUsageObserver.h>
#include <Common/ErrorHandlers.h>
#include <Common/assertProcessUserMatchesDataOwner.h>
#include <Common/makeSocketAddress.h>
Expand Down Expand Up @@ -623,6 +624,25 @@ try
buildLoggers(config(), logger());
main_config_reloader->start();

std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
try
{
auto wait_time = config().getUInt64("keeper_server.cgroups_memory_observer_wait_time", 15);
if (wait_time != 0)
{
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
/// Not calling cgroups_memory_usage_observer->setLimits() here (as for the normal ClickHouse server) because Keeper controls
/// its memory usage by other means (via setting 'max_memory_usage_soft_limit').
cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); });
cgroups_memory_usage_observer->startThread();
}
}
catch (Exception &)
{
tryLogCurrentException(log, "Disabling cgroup memory observer because of an error during initialization");
}


LOG_INFO(log, "Ready for connections.");

waitForTerminationRequest();
Expand Down
10 changes: 8 additions & 2 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1296,7 +1296,7 @@ try
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
try
{
UInt64 wait_time = server_settings.cgroups_memory_usage_observer_wait_time;
auto wait_time = server_settings.cgroups_memory_usage_observer_wait_time;
if (wait_time != 0)
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
}
Expand Down Expand Up @@ -1362,7 +1362,7 @@ try
{
double hard_limit_ratio = new_server_settings.cgroup_memory_watcher_hard_limit_ratio;
double soft_limit_ratio = new_server_settings.cgroup_memory_watcher_soft_limit_ratio;
cgroups_memory_usage_observer->setLimits(
cgroups_memory_usage_observer->setMemoryUsageLimits(
static_cast<uint64_t>(max_server_memory_usage * hard_limit_ratio),
static_cast<uint64_t>(max_server_memory_usage * soft_limit_ratio));
}
Expand Down Expand Up @@ -1720,6 +1720,12 @@ try
throw;
}

if (cgroups_memory_usage_observer)
{
cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); });
cgroups_memory_usage_observer->startThread();
}

/// Reload config in SYSTEM RELOAD CONFIG query.
global_context->setConfigReloadCallback([&]()
{
Expand Down
96 changes: 53 additions & 43 deletions src/Common/CgroupsMemoryUsageObserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <base/cgroupsv2.h>
#include <base/getMemoryAmount.h>
#include <base/sleep.h>

#include <filesystem>
Expand Down Expand Up @@ -36,7 +37,7 @@ namespace ErrorCodes
CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_)
: log(getLogger("CgroupsMemoryUsageObserver"))
, wait_time(wait_time_)
, file(log)
, memory_usage_file(log)
{
LOG_INFO(log, "Initialized cgroups memory limit observer, wait time is {} sec", wait_time.count());
}
Expand All @@ -46,13 +47,13 @@ CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver()
stopThread();
}

void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_limit_)
void CgroupsMemoryUsageObserver::setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_)
{
std::lock_guard<std::mutex> limit_lock(limit_mutex);

if (hard_limit_ == hard_limit && soft_limit_ == soft_limit)
return;

stopThread();

hard_limit = hard_limit_;
soft_limit = soft_limit_;

Expand Down Expand Up @@ -83,25 +84,24 @@ void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_l
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
#endif
/// Reset current usage in memory tracker. Expect zero for free_memory_in_allocator_arenas as we just purged them.
uint64_t current_usage = readMemoryUsage();
MemoryTracker::setRSS(current_usage, 0);
uint64_t memory_usage = memory_usage_file.readMemoryUsage();
MemoryTracker::setRSS(memory_usage, 0);

LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(current_usage));
LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(memory_usage));
}
else
{
LOG_INFO(log, "Dropped below soft memory limit ({})", ReadableSize(soft_limit_));
}
};

startThread();

LOG_INFO(log, "Set new limits, soft limit: {}, hard limit: {}", ReadableSize(soft_limit_), ReadableSize(hard_limit_));
}

uint64_t CgroupsMemoryUsageObserver::readMemoryUsage() const
void CgroupsMemoryUsageObserver::setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_)
{
return file.readMemoryUsage();
std::lock_guard<std::mutex> memory_amount_available_changed_lock(memory_amount_available_changed_mutex);
on_memory_amount_available_changed = on_memory_amount_available_changed_;
}

namespace
Expand Down Expand Up @@ -163,7 +163,7 @@ std::pair<std::string, CgroupsMemoryUsageObserver::CgroupsVersion> getCgroupsFil

}

CgroupsMemoryUsageObserver::File::File(LoggerPtr log_)
CgroupsMemoryUsageObserver::MemoryUsageFile::MemoryUsageFile(LoggerPtr log_)
: log(log_)
{
std::tie(file_name, version) = getCgroupsFileName();
Expand All @@ -177,7 +177,7 @@ CgroupsMemoryUsageObserver::File::File(LoggerPtr log_)
file_name, "Cannot open file '{}'", file_name);
}

CgroupsMemoryUsageObserver::File::~File()
CgroupsMemoryUsageObserver::MemoryUsageFile::~MemoryUsageFile()
{
assert(fd != -1);
if (::close(fd) != 0)
Expand All @@ -195,7 +195,7 @@ CgroupsMemoryUsageObserver::File::~File()
}
}

uint64_t CgroupsMemoryUsageObserver::File::readMemoryUsage() const
uint64_t CgroupsMemoryUsageObserver::MemoryUsageFile::readMemoryUsage() const
{
/// File read is probably not read is thread-safe, just to be sure
std::lock_guard lock(mutex);
Expand Down Expand Up @@ -278,6 +278,9 @@ void CgroupsMemoryUsageObserver::runThread()
{
setThreadName("CgrpMemUsgObsr");

last_available_memory_amount = getMemoryAmount();
LOG_INFO(log, "Memory amount initially available to the process is {}", ReadableSize(last_available_memory_amount));

std::unique_lock lock(thread_mutex);
while (true)
{
Expand All @@ -286,8 +289,42 @@ void CgroupsMemoryUsageObserver::runThread()

try
{
uint64_t memory_usage = file.readMemoryUsage();
processMemoryUsage(memory_usage);
uint64_t available_memory_amount = getMemoryAmount();
if (available_memory_amount != last_available_memory_amount)
{
LOG_INFO(log, "Memory amount available to the process changed from {} to {}", ReadableSize(last_available_memory_amount), ReadableSize(available_memory_amount));
last_available_memory_amount = available_memory_amount;
std::lock_guard<std::mutex> memory_amount_available_changed_lock(memory_amount_available_changed_mutex);
on_memory_amount_available_changed();
}

std::lock_guard<std::mutex> limit_lock(limit_mutex);
if (soft_limit > 0 && hard_limit > 0)
{
uint64_t memory_usage = memory_usage_file.readMemoryUsage();
if (memory_usage > hard_limit)
{
if (last_memory_usage <= hard_limit)
on_hard_limit(true);
}
else
{
if (last_memory_usage > hard_limit)
on_hard_limit(false);
}

if (memory_usage > soft_limit)
{
if (last_memory_usage <= soft_limit)
on_soft_limit(true);
}
else
{
if (last_memory_usage > soft_limit)
on_soft_limit(false);
}
last_memory_usage = memory_usage;
}
}
catch (...)
{
Expand All @@ -296,33 +333,6 @@ void CgroupsMemoryUsageObserver::runThread()
}
}

void CgroupsMemoryUsageObserver::processMemoryUsage(uint64_t current_usage)
{
if (current_usage > hard_limit)
{
if (last_usage <= hard_limit)
on_hard_limit(true);
}
else
{
if (last_usage > hard_limit)
on_hard_limit(false);
}

if (current_usage > soft_limit)
{
if (last_usage <= soft_limit)
on_soft_limit(true);
}
else
{
if (last_usage > soft_limit)
on_soft_limit(false);
}

last_usage = current_usage;
}

}

#endif
68 changes: 40 additions & 28 deletions src/Common/CgroupsMemoryUsageObserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,71 @@

#include <Common/ThreadPool.h>

#include <atomic>
#include <chrono>
#include <mutex>

namespace DB
{

/// Periodically reads the current memory usage from Linux cgroups.
/// You can specify soft or hard memory limits:
/// - When the soft memory limit is hit, drop jemalloc cache.
/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster.
/// Does two things:
/// 1. Periodically reads the memory usage of the process from Linux cgroups.
/// You can specify soft or hard memory limits:
/// - When the soft memory limit is hit, drop jemalloc cache.
/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster.
/// The goal of this is to avoid that the process hits the maximum allowed memory limit at which there is a good
/// chance that the Limux OOM killer terminates it. All of this is done is because internal memory tracking in
/// ClickHouse can unfortunately under-estimate the actually used memory.
/// 2. Periodically reads the the maximum memory available to the process (which can change due to cgroups settings).
/// You can specify a callback to react on changes. The callback typically reloads the configuration, i.e. Server
/// or Keeper configuration file. This reloads settings 'max_server_memory_usage' (Server) and 'max_memory_usage_soft_limit'
/// (Keeper) from which various other internal limits are calculated, including the soft and hard limits for (1.).
/// The goal of this is to provide elasticity when the container is scaled-up/scaled-down. The mechanism (polling
/// cgroups) is quite implicit, unfortunately there is currently no better way to communicate memory threshold changes
/// to the database.
#if defined(OS_LINUX)
class CgroupsMemoryUsageObserver
{
public:
using OnMemoryLimitFn = std::function<void(bool)>;
using OnMemoryAmountAvailableChangedFn = std::function<void()>;

enum class CgroupsVersion
{
V1,
V2

};

explicit CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_);
~CgroupsMemoryUsageObserver();

void setLimits(uint64_t hard_limit_, uint64_t soft_limit_);

size_t getHardLimit() const { return hard_limit; }
size_t getSoftLimit() const { return soft_limit; }
void setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_);
void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_);

uint64_t readMemoryUsage() const;
void startThread();

private:
LoggerPtr log;

std::atomic<size_t> hard_limit = 0;
std::atomic<size_t> soft_limit = 0;

const std::chrono::seconds wait_time;

using CallbackFn = std::function<void(bool)>;
CallbackFn on_hard_limit;
CallbackFn on_soft_limit;
std::mutex limit_mutex;
size_t hard_limit TSA_GUARDED_BY(limit_mutex) = 0;
size_t soft_limit TSA_GUARDED_BY(limit_mutex) = 0;
OnMemoryLimitFn on_hard_limit TSA_GUARDED_BY(limit_mutex);
OnMemoryLimitFn on_soft_limit TSA_GUARDED_BY(limit_mutex);

uint64_t last_usage = 0;
std::mutex memory_amount_available_changed_mutex;
OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed TSA_GUARDED_BY(memory_amount_available_changed_mutex);

uint64_t last_memory_usage = 0; /// how much memory does the process use
uint64_t last_available_memory_amount; /// how much memory can the process use

/// Represents the cgroup virtual file that shows the memory consumption of the process's cgroup.
struct File
struct MemoryUsageFile
{
public:
explicit File(LoggerPtr log_);
~File();
explicit MemoryUsageFile(LoggerPtr log_);
~MemoryUsageFile();
uint64_t readMemoryUsage() const;
private:
LoggerPtr log;
Expand All @@ -62,13 +76,11 @@ class CgroupsMemoryUsageObserver
std::string file_name;
};

File file;
MemoryUsageFile memory_usage_file;

void startThread();
void stopThread();

void runThread();
void processMemoryUsage(uint64_t usage);

std::mutex thread_mutex;
std::condition_variable cond;
Expand All @@ -79,13 +91,13 @@ class CgroupsMemoryUsageObserver
#else
class CgroupsMemoryUsageObserver
{
using OnMemoryAmountAvailableChangedFn = std::function<void()>;
public:
explicit CgroupsMemoryUsageObserver(std::chrono::seconds) {}

void setLimits(uint64_t, uint64_t) {}
size_t readMemoryUsage() { return 0; }
size_t getHardLimit() { return 0; }
size_t getSoftLimit() { return 0; }
void setMemoryUsageLimits(uint64_t, uint64_t) {}
void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn) {}
void startThread() {}
};
#endif

Expand Down
Loading

0 comments on commit 8a54c85

Please sign in to comment.