chore: reenable evictions upon insertion to avoid OOM rejections (#3387)
* chore: reenable evictions upon insertion to avoid OOM rejections

Before: when running dragonfly with --cache_mode, we could get OOM rejections
even though the eviction policy allowed evicting items to free memory.
Ideally, dragonfly in cache mode should not respond with an OOM error at all.

This PR reuses the same Eviction step we have in the Heartbeat and conditionally applies it
during insertion. In my test the OOM errors went from 500K to 0 and the server
still respected the memory limit.
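
In essence, the insertion path now does the following (a condensed sketch of the
db_slice.cc hunk below; the surrounding insertion and error handling are elided):

    if (evp.mem_budget() < 0 && apply_memory_limit) {
      // Free at least 256 bytes or ~3% (1/32) of the memory debt; repeated
      // insertions then converge toward the configured limit.
      size_t evict_goal = std::max<size_t>(256, (-evp.mem_budget()) / 32);
      FreeMemWithEvictionStep(cntx.db_index, it.segment_id(), evict_goal);
    }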

Also, remove the old heuristic that was never used.

Test:

./dfly_bench --key_prefix=bar: -d 1024 --ratio=1:0 --qps=200 -n 3000
./dragonfly --dbfilename=  --proactor_threads=2 --maxmemory=600M --cache_mode

---------

Signed-off-by: Roman Gershman <[email protected]>
romange authored Jul 25, 2024
1 parent fb4222d commit e2d65a0
Showing 7 changed files with 67 additions and 141 deletions.
4 changes: 2 additions & 2 deletions src/core/compact_object.h
@@ -403,6 +403,8 @@ class CompactObj {
// Precondition: the object is a non-inline string.
StringOrView GetRawString() const;

bool HasAllocated() const;

private:
void EncodeString(std::string_view str);
size_t DecodedLen(size_t sz) const;
@@ -412,8 +414,6 @@ class CompactObj {
// Requires: HasAllocated() - true.
void Free();

bool HasAllocated() const;

bool CmpEncoded(std::string_view sv) const;

void SetMeta(uint8_t taglen, uint8_t mask = 0) {
2 changes: 1 addition & 1 deletion src/server/acl/validator.h
@@ -12,7 +12,7 @@

namespace dfly::acl {

class AclKeys;
struct AclKeys;

std::pair<bool, AclLog::Reason> IsUserAllowedToInvokeCommandGeneric(
const std::vector<uint64_t>& acl_commands, const AclKeys& keys, facade::CmdArgList tail_args,
169 changes: 38 additions & 131 deletions src/server/db_slice.cc
@@ -20,9 +20,6 @@
#include "util/fibers/fibers.h"
#include "util/fibers/stacktrace.h"

ABSL_FLAG(bool, enable_heartbeat_eviction, true,
"Enable eviction during heartbeat when memory is under pressure.");

ABSL_FLAG(uint32_t, max_eviction_per_heartbeat, 100,
"The maximum number of key-value pairs that will be deleted in each eviction "
"when heartbeat based eviction is triggered under memory pressure.");
@@ -484,8 +481,7 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
if (caching_mode_ && IsValid(res.it)) {
if (!change_cb_.empty()) {
auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
CallChangeCallbacks(cntx.db_index, bit);
CallChangeCallbacks(cntx.db_index, key, bit);
};
db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb);
}
@@ -565,8 +561,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status;

// It's a new entry.
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
CallChangeCallbacks(cntx.db_index, key);
CallChangeCallbacks(cntx.db_index, key, {key});

// In case we are loading from rdb file or replicating we want to disable conservative memory
// checks (inside PrimeEvictionPolicy::CanGrow) and reject insertions only after we pass max
@@ -598,8 +593,8 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
CompactObj co_key{key};
PrimeIterator it;

// I try/catch just for sake of having a convenient place to set a breakpoint.
size_t table_before = db.prime.mem_usage();
size_t table_before = db.table_memory();

try {
it = db.prime.InsertNew(std::move(co_key), PrimeValue{}, evp);
} catch (bad_alloc& e) {
@@ -608,19 +603,20 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
return OpStatus::OUT_OF_MEMORY;
}

table_memory_ += (db.prime.mem_usage() - table_before);
size_t evicted_obj_bytes = 0;

// We may still reach the state when our memory usage is above the limit even if we
// do not add new segments. For example, we have half full segments
// and we add new objects or update the existing ones and our memory usage grows.
if (evp.mem_budget() < 0) {
// TODO(roman): EvictObjects is too aggressive and it's messing with cache hit-rate.
// The regular eviction policy does a decent job though it may cross the passed limit
// a little bit. I do not consider it as a serious bug.
// evicted_obj_bytes = EvictObjects(-evp.mem_budget(), it, &db);
if (evp.mem_budget() < 0 && apply_memory_limit) {
// We may reach the state when our memory usage is above the limit even if we
// do not add new segments. For example, we have half full segments
// and we add new objects or update the existing ones and our memory usage grows.
// We do not require a single operation to unload the whole negative debt.
// Instead, we create a positive, converging force that should help with freeing enough memory.
// Free at least 256 bytes or 3% of the total debt.
size_t evict_goal = std::max<size_t>(256, (-evp.mem_budget()) / 32);
evicted_obj_bytes = FreeMemWithEvictionStep(cntx.db_index, it.segment_id(), evict_goal);
}

table_memory_ += (db.table_memory() - table_before);

db.stats.inline_keys += it->first.IsInline();
AccountObjectMemory(key, it->first.ObjType(), it->first.MallocUsed(), &db); // Account for key
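
A quick sanity check on the converging force described above (an editorial sketch,
assuming each eviction step frees exactly its goal and no new debt accrues): with an
initial debt of d_0 bytes, the remaining debt after n insertions is

    d_n = d_0 * (31/32)^n,

so the debt halves roughly every ln 2 / ln(32/31) ≈ 22 insertions, and the 256-byte
floor clears the final stretch in finitely many steps.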

@@ -709,7 +705,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
string_view key = get<string_view>(req.change);
table->CVCUponInsert(
next_version, key,
[this, db_index, next_version, iterate_bucket](PrimeTable::bucket_iterator it) {
[db_index, next_version, iterate_bucket](PrimeTable::bucket_iterator it) {
DCHECK_LT(it.GetVersion(), next_version);
iterate_bucket(db_index, it);
});
@@ -762,7 +758,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
}

CHECK(fetched_items_.empty());
auto cb = [this, indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
auto cb = [indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
flush_db_arr.clear();
ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap |
ServerState::kGlibcmalloc);
@@ -1023,9 +1019,7 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const
}

void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
DVLOG(2) << "Running callbacks in dbid " << db_ind;
CallChangeCallbacks(db_ind, ChangeReq{it.GetInnerIt()});

CallChangeCallbacks(db_ind, key, ChangeReq{it.GetInnerIt()});
it.GetInnerIt().SetVersion(NextVersion());
}

@@ -1217,25 +1211,22 @@ int32_t DbSlice::GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) c
db_arr_[db_ind]->prime.GetSegmentCount();
}

void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes) {
size_t DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t starting_segment_id,
size_t increase_goal_bytes) {
DCHECK(!owner_->IsReplica());
if ((!caching_mode_) || !expire_allowed_ || !GetFlag(FLAGS_enable_heartbeat_eviction))
return;
if ((!caching_mode_) || !expire_allowed_)
return 0;

auto max_eviction_per_hb = GetFlag(FLAGS_max_eviction_per_heartbeat);
auto max_segment_to_consider = GetFlag(FLAGS_max_segment_to_consider);

auto time_start = absl::GetCurrentTimeNanos();
auto& db_table = db_arr_[db_ind];
int32_t num_segments = db_table->prime.GetSegmentCount();
int32_t num_buckets = PrimeTable::Segment_t::kTotalBuckets;
int32_t num_slots = PrimeTable::Segment_t::kSlotNum;
constexpr int32_t num_buckets = PrimeTable::Segment_t::kTotalBuckets;
constexpr int32_t num_slots = PrimeTable::Segment_t::kSlotNum;

size_t used_memory_after;
size_t evicted = 0;
size_t evicted_items = 0, evicted_bytes = 0;
string tmp;
int32_t starting_segment_id = rand() % num_segments;
size_t used_memory_before = owner_->UsedMemory();

bool record_keys = owner_->journal() != nullptr || expired_keys_events_recording_;
vector<string> keys_to_journal;
@@ -1257,7 +1248,7 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes
continue;

auto evict_it = db_table->prime.GetIterator(segment_id, bucket_id, slot_id);
if (evict_it->first.IsSticky())
if (evict_it->first.IsSticky() || !evict_it->second.HasAllocated())
continue;

// check if the key is locked by looking up transaction table.
@@ -1269,13 +1260,12 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes
if (record_keys)
keys_to_journal.emplace_back(key);

evicted_bytes += evict_it->second.MallocUsed();
++evicted_items;
PerformDeletion(Iterator(evict_it, StringOrView::FromView(key)), db_table.get());
++evicted;

used_memory_after = owner_->UsedMemory();
// stop when whichever condition is met first
if ((evicted == max_eviction_per_hb) ||
(used_memory_before - used_memory_after >= increase_goal_bytes))
if ((evicted_items == max_eviction_per_hb) || (evicted_bytes >= increase_goal_bytes))
goto finish;
}
}
@@ -1294,12 +1284,12 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes
}

auto time_finish = absl::GetCurrentTimeNanos();
events_.evicted_keys += evicted;
DVLOG(2) << "Memory usage before eviction: " << used_memory_before;
DVLOG(2) << "Memory usage after eviction: " << used_memory_after;
DVLOG(2) << "Number of keys evicted / max eviction per hb: " << evicted << "/"
events_.evicted_keys += evicted_items;
DVLOG(2) << "Evicted: " << evicted_bytes;
DVLOG(2) << "Number of keys evicted / max eviction per hb: " << evicted_items << "/"
<< max_eviction_per_hb;
DVLOG(2) << "Eviction time (us): " << (time_finish - time_start) / 1000;
return evicted_bytes;
}
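
As a standalone illustration of the loop above, here is a toy (hypothetical sizes,
not Dragonfly's types) demonstrating its two stop conditions: it halts after
max_eviction_per_heartbeat items or once the evicted bytes reach the goal,
whichever comes first.

    #include <algorithm>
    #include <cstdio>
    #include <vector>

    int main() {
      // Toy table: 1000 entries of 300 allocated bytes each (hypothetical numbers).
      std::vector<size_t> item_sizes(1000, 300);
      const size_t max_items = 100;  // mirrors the max_eviction_per_heartbeat default
      // A 64 KiB memory debt yields an eviction goal of max(256, 65536 / 32) = 2048 bytes.
      const size_t goal = std::max<size_t>(256, (64u << 10) / 32);
      size_t evicted_items = 0, evicted_bytes = 0;
      for (size_t sz : item_sizes) {
        evicted_bytes += sz;
        ++evicted_items;
        // Stop at whichever cap is hit first, as FreeMemWithEvictionStep does.
        if (evicted_items == max_items || evicted_bytes >= goal)
          break;
      }
      std::printf("evicted %zu items, %zu bytes\n", evicted_items, evicted_bytes);  // 7 items, 2100 bytes
    }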

void DbSlice::CreateDb(DbIndex db_ind) {
@@ -1310,93 +1300,6 @@ void DbSlice::CreateDb(DbIndex db_ind) {
}
}

// "it" is the iterator that we just added/updated and it should not be deleted.
// "table" is the instance where we should delete the objects from.
size_t DbSlice::EvictObjects(size_t memory_to_free, Iterator it, DbTable* table) {
if (owner_->IsReplica()) {
return 0;
}
PrimeTable::Segment_t* segment = table->prime.GetSegment(it.GetInnerIt().segment_id());
DCHECK(segment);

constexpr unsigned kNumStashBuckets = PrimeTable::Segment_t::kStashBucketNum;
constexpr unsigned kNumRegularBuckets = PrimeTable::Segment_t::kBucketNum;

PrimeTable::bucket_iterator it2(it.GetInnerIt());
unsigned evicted = 0;
bool evict_succeeded = false;

EngineShard* shard = owner_;
size_t used_memory_start = shard->UsedMemory();

auto freed_memory_fun = [&] {
size_t current = shard->UsedMemory();
return current < used_memory_start ? used_memory_start - current : 0;
};

for (unsigned i = 0; !evict_succeeded && i < kNumStashBuckets; ++i) {
unsigned stash_bid = i + kNumRegularBuckets;
const auto& bucket = segment->GetBucket(stash_bid);
if (bucket.IsEmpty())
continue;

for (int slot_id = PrimeTable::Segment_t::kSlotNum - 1; slot_id >= 0; --slot_id) {
if (!bucket.IsBusy(slot_id))
continue;

auto evict_it = table->prime.GetIterator(it.GetInnerIt().segment_id(), stash_bid, slot_id);
// skip the iterator that we must keep or the sticky items.
if (evict_it == it.GetInnerIt() || evict_it->first.IsSticky())
continue;

PerformDeletion(evict_it, table);
++evicted;

if (freed_memory_fun() > memory_to_free) {
evict_succeeded = true;
break;
}
}
}

if (evicted) {
DVLOG(1) << "Evicted " << evicted << " stashed items, freed " << freed_memory_fun() << " bytes";
}

// Try normal buckets now. We iterate from largest slot to smallest across the whole segment.
for (int slot_id = PrimeTable::Segment_t::kSlotNum - 1; !evict_succeeded && slot_id >= 0;
--slot_id) {
for (unsigned i = 0; i < kNumRegularBuckets; ++i) {
unsigned bid = (it.GetInnerIt().bucket_id() + i) % kNumRegularBuckets;
const auto& bucket = segment->GetBucket(bid);
if (!bucket.IsBusy(slot_id))
continue;

auto evict_it = table->prime.GetIterator(it.GetInnerIt().segment_id(), bid, slot_id);
if (evict_it == it.GetInnerIt() || evict_it->first.IsSticky())
continue;

PerformDeletion(evict_it, table);
++evicted;

if (freed_memory_fun() > memory_to_free) {
evict_succeeded = true;
break;
}
}
}

if (evicted) {
DVLOG(1) << "Evicted total: " << evicted << " items, freed " << freed_memory_fun() << " bytes "
<< "success: " << evict_succeeded;

events_.evicted_keys += evicted;
events_.hard_evictions += evicted;
}

return freed_memory_fun();
};

void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key,
ConnectionState::ExecInfo* exec_info) {
db_arr_[db_indx]->watched_keys[key].push_back(exec_info);
@@ -1566,7 +1469,11 @@ void DbSlice::OnCbFinish() {
fetched_items_.clear();
}

void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const {
void DbSlice::CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const {
if (change_cb_.empty())
return;

DVLOG(2) << "Running callbacks for key " << key << " in dbid " << id;
FetchedItemsRestorer fetched_restorer(&fetched_items_);
std::unique_lock<LocalBlockingCounter> lk(block_counter_);

12 changes: 9 additions & 3 deletions src/server/db_slice.h
@@ -437,7 +437,12 @@ class DbSlice {

// Deletes some amount of possible expired items.
DeleteExpiredStats DeleteExpiredStep(const Context& cntx, unsigned count);
void FreeMemWithEvictionStep(DbIndex db_indx, size_t increase_goal_bytes);

// Evicts items with dynamically allocated data from the primary table.
// Does not shrink tables.
// Returns the number of bytes freed due to evictions.
size_t FreeMemWithEvictionStep(DbIndex db_indx, size_t starting_segment_id,
size_t increase_goal_bytes);
void ScheduleForOffloadStep(DbIndex db_indx, size_t increase_goal_bytes);

int32_t GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) const;
@@ -469,6 +474,8 @@
// Resets events_ member. Used by CONFIG RESETSTAT
void ResetEvents();

// Controls the expiry/eviction state. The server may enter states where
// both evictions and expiries are stopped for a short period of time.
void SetExpireAllowed(bool is_allowed) {
expire_allowed_ = is_allowed;
}
@@ -508,7 +515,6 @@
void SendInvalidationTrackingMessage(std::string_view key);

void CreateDb(DbIndex index);
size_t EvictObjects(size_t memory_to_free, Iterator it, DbTable* table);

enum class UpdateStatsMode {
kReadStats,
@@ -534,7 +540,7 @@
return version_++;
}

void CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const;
void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const;

class LocalBlockingCounter {
public:
17 changes: 15 additions & 2 deletions src/server/engine_shard_set.cc
@@ -74,6 +74,9 @@ ABSL_FLAG(string, shard_round_robin_prefix, "",
ABSL_FLAG(uint32_t, mem_defrag_check_sec_interval, 10,
"Number of seconds between every defragmentation necessity check");

ABSL_FLAG(bool, enable_heartbeat_eviction, true,
"Enable eviction during heartbeat when memory is under pressure.");

namespace dfly {

using namespace tiering::literals;
@@ -641,8 +644,10 @@ void EngineShard::Heartbeat() {
}

// if our budget is below the limit
if (db_slice.memory_budget() < eviction_redline) {
db_slice.FreeMemWithEvictionStep(i, eviction_redline - db_slice.memory_budget());
if (db_slice.memory_budget() < eviction_redline && GetFlag(FLAGS_enable_heartbeat_eviction)) {
uint32_t starting_segment_id = rand() % pt->GetSegmentCount();
db_slice.FreeMemWithEvictionStep(i, starting_segment_id,
eviction_redline - db_slice.memory_budget());
}

if (UsedMemory() > tiering_offload_threshold) {
@@ -658,17 +663,25 @@
}

void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {
VLOG(1) << "RunPeriodic with period " << period_ms.count() << "ms";

bool runs_global_periodic = (shard_id() == 0); // Only shard 0 runs global periodic.
unsigned global_count = 0;
int64_t last_stats_time = time(nullptr);
int64_t last_heartbeat_ms = INT64_MAX;

while (true) {
if (fiber_periodic_done_.WaitFor(period_ms)) {
VLOG(2) << "finished running engine shard periodic task";
return;
}

int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
if (now_ms - 5 * period_ms.count() > last_heartbeat_ms) {
VLOG(1) << "This heartbeat took " << now_ms - last_heartbeat_ms << "ms";
}
Heartbeat();
last_heartbeat_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;

if (runs_global_periodic) {
++global_count;
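
The RunPeriodic hunk above also starts flagging slow heartbeats. Below is a
self-contained sketch of that detector, substituting std::chrono for
fb2::ProactorBase::GetMonotonicTimeNs (an editorial assumption, not the actual
proactor API), with the same INT64_MAX sentinel and 5x-period threshold:

    #include <chrono>
    #include <cstdint>
    #include <cstdio>

    int main() {
      using namespace std::chrono;
      const milliseconds period{40};  // arbitrary heartbeat period for the sketch
      auto now_ms = [] {
        return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
      };
      int64_t last_heartbeat_ms = INT64_MAX;  // sentinel: never warn on the first pass
      for (int i = 0; i < 3; ++i) {
        int64_t start_ms = now_ms();
        // Warn if more than 5 periods elapsed since the previous heartbeat finished.
        if (start_ms - 5 * period.count() > last_heartbeat_ms)
          std::printf("This heartbeat took %lldms\n",
                      static_cast<long long>(start_ms - last_heartbeat_ms));
        // Heartbeat() would run here; a stall surfaces on the next iteration.
        last_heartbeat_ms = now_ms();
      }
    }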
(diffs for the remaining two of the seven changed files did not load)
