Skip to content

Commit

Permalink
chore: remove DbSlice mutex and add ConditionFlag in SliceSnapshot (#…
Browse files Browse the repository at this point in the history
…4073)

* remove DbSlice mutex
* add ConditionFlag in SliceSnapshot
* disable compression when big value serialization is on
* add metrics

---------

Signed-off-by: kostas <[email protected]>
  • Loading branch information
kostasrim authored Dec 5, 2024
1 parent 7ccad66 commit 267d5ab
Show file tree
Hide file tree
Showing 20 changed files with 186 additions and 120 deletions.
2 changes: 0 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,3 @@ repos:
rev: v8.16.3
hooks:
- id: gitleaks


13 changes: 13 additions & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -481,4 +481,17 @@ BorrowedInterpreter::~BorrowedInterpreter() {
ServerState::tlocal()->ReturnInterpreter(interpreter_);
}

void LocalBlockingCounter::unlock() {
DCHECK(mutating_ > 0);
--mutating_;
if (mutating_ == 0) {
cond_var_.notify_all();
}
}

void LocalBlockingCounter::Wait() {
util::fb2::NoOpLock noop_lk_;
cond_var_.wait(noop_lk_, [this]() { return mutating_ == 0; });
}

} // namespace dfly
19 changes: 19 additions & 0 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,4 +387,23 @@ struct BorrowedInterpreter {

extern size_t serialization_max_chunk_size;

class LocalBlockingCounter {
public:
void lock() {
++mutating_;
}

void unlock();

void Wait();

bool IsBlocked() const {
return mutating_ > 0;
}

private:
util::fb2::CondVarAny cond_var_;
size_t mutating_ = 0;
};

} // namespace dfly
36 changes: 16 additions & 20 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ bool PrimeEvictionPolicy::CanGrow(const PrimeTable& tbl) const {

unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) {
unsigned res = 0;

if (db_slice_->WillBlockOnJournalWrite()) {
return res;
}

// bool should_print = (eb.key_hash % 128) == 0;

// based on tests - it's more efficient to pass regular buckets to gc.
Expand All @@ -165,7 +170,7 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e
}

unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) {
if (!can_evict_)
if (!can_evict_ || db_slice_->WillBlockOnJournalWrite())
return 0;

constexpr size_t kNumStashBuckets = ABSL_ARRAYSIZE(eb.probes.by_type.stash_buckets);
Expand All @@ -192,8 +197,6 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
if (auto journal = db_slice_->shard_owner()->journal(); journal) {
RecordExpiry(cntx_.db_index, key, false);
}
// Safe we already acquired util::fb2::LockGuard lk(db_slice_->GetSerializationMutex());
// on the flows that call this function
db_slice_->PerformDeletion(DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)), table);

++evicted_;
Expand Down Expand Up @@ -459,7 +462,6 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
if (caching_mode_ && IsValid(res.it)) {
if (!change_cb_.empty()) {
FetchedItemsRestorer fetched_restorer(&fetched_items_);
util::fb2::LockGuard lk(local_mu_);
auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
CallChangeCallbacks(cntx.db_index, key, bit);
};
Expand Down Expand Up @@ -552,7 +554,6 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status;

FetchedItemsRestorer fetched_restorer(&fetched_items_);
util::fb2::LockGuard lk(local_mu_);

// It's a new entry.
CallChangeCallbacks(cntx.db_index, key, {key});
Expand Down Expand Up @@ -668,8 +669,6 @@ void DbSlice::ActivateDb(DbIndex db_ind) {
}

bool DbSlice::Del(Context cntx, Iterator it) {
util::fb2::LockGuard lk(local_mu_);

if (!IsValid(it)) {
return false;
}
Expand Down Expand Up @@ -735,7 +734,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
PrimeTable::Cursor cursor;
uint64_t i = 0;
do {
PrimeTable::Cursor next = Traverse(pt, cursor, del_entry_cb);
PrimeTable::Cursor next = pt->Traverse(cursor, del_entry_cb);
++i;
cursor = next;
if (i % 100 == 0) {
Expand Down Expand Up @@ -792,10 +791,6 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
}

void DbSlice::FlushDb(DbIndex db_ind) {
// We should not flush if serialization of a big value is in progress because this
// could lead to UB or assertion failures (while DashTable::Traverse is iterating over
// a logical bucket).
util::fb2::LockGuard lk(local_mu_);
// clear client tracking map.
client_tracking_map_.clear();

Expand All @@ -817,7 +812,6 @@ void DbSlice::FlushDb(DbIndex db_ind) {
}

void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) {
util::fb2::LockGuard lk(local_mu_);
uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates.
auto& db = *db_arr_[db_ind];
size_t table_before = db.expire.mem_usage();
Expand All @@ -827,7 +821,6 @@ void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) {
}

bool DbSlice::RemoveExpire(DbIndex db_ind, Iterator main_it) {
util::fb2::LockGuard lk(local_mu_);
if (main_it->second.HasExpire()) {
auto& db = *db_arr_[db_ind];
size_t table_before = db.expire.mem_usage();
Expand Down Expand Up @@ -1056,7 +1049,6 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const

void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
FetchedItemsRestorer fetched_restorer(&fetched_items_);
util::fb2::LockGuard lk(local_mu_);
CallChangeCallbacks(db_ind, key, ChangeReq{it.GetInnerIt()});
it.GetInnerIt().SetVersion(NextVersion());
}
Expand Down Expand Up @@ -1137,12 +1129,17 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
const_cast<DbSlice*>(this)->PerformDeletion(Iterator(it, StringOrView::FromView(key)),
ExpIterator(expire_it, StringOrView::FromView(key)),
db.get());

++events_.expired_keys;

return {PrimeIterator{}, ExpireIterator{}};
}

void DbSlice::ExpireAllIfNeeded() {
// We hold no locks to any of the keys so we should Wait() here such that
// we don't preempt in ExpireIfNeeded
block_counter_.Wait();

for (DbIndex db_index = 0; db_index < db_arr_.size(); db_index++) {
if (!db_arr_[db_index])
continue;
Expand All @@ -1159,7 +1156,7 @@ void DbSlice::ExpireAllIfNeeded() {

ExpireTable::Cursor cursor;
do {
cursor = Traverse(&db.expire, cursor, cb);
cursor = db.expire.Traverse(cursor, cb);
} while (cursor);
}
}
Expand All @@ -1170,6 +1167,7 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {

void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) {
FetchedItemsRestorer fetched_restorer(&fetched_items_);
std::unique_lock<LocalBlockingCounter> lk(block_counter_);

uint64_t bucket_version = it.GetVersion();
// change_cb_ is ordered by version.
Expand All @@ -1193,7 +1191,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_

//! Unregisters the callback.
void DbSlice::UnregisterOnChange(uint64_t id) {
util::fb2::LockGuard lk(local_mu_);
block_counter_.Wait();
auto it = find_if(change_cb_.begin(), change_cb_.end(),
[id](const auto& cb) { return cb.first == id; });
CHECK(it != change_cb_.end());
Expand Down Expand Up @@ -1354,13 +1352,10 @@ void DbSlice::CreateDb(DbIndex db_ind) {
void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key,
ConnectionState::ExecInfo* exec_info) {
// Because we might insert while another fiber is preempted
util::fb2::LockGuard lk(local_mu_);
db_arr_[db_indx]->watched_keys[key].push_back(exec_info);
}

void DbSlice::UnregisterConnectionWatches(const ConnectionState::ExecInfo* exec_info) {
// Because we might remove while another fiber is preempted and miss a notification
util::fb2::LockGuard lk(local_mu_);
for (const auto& [db_indx, key] : exec_info->watched_keys) {
auto& watched_keys = db_arr_[db_indx]->watched_keys;
if (auto it = watched_keys.find(key); it != watched_keys.end()) {
Expand Down Expand Up @@ -1536,6 +1531,7 @@ void DbSlice::OnCbFinish() {
}

void DbSlice::CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const {
std::unique_lock<LocalBlockingCounter> lk(block_counter_);
if (change_cb_.empty())
return;

Expand Down
51 changes: 23 additions & 28 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,34 +305,33 @@ class DbSlice {
AddOrFindResult& operator=(ItAndUpdater&& o);
};

OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key)
ABSL_LOCKS_EXCLUDED(local_mu_);
OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key);

// Same as AddOrSkip, but overwrites in case entry exists.
OpResult<AddOrFindResult> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_);
uint64_t expire_at_ms);

// Adds a new entry. Requires: key does not exist in this slice.
// Returns the iterator to the newly added entry.
// Returns OpStatus::OUT_OF_MEMORY if bad_alloc is thrown
OpResult<ItAndUpdater> AddNew(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_);
uint64_t expire_at_ms);

// Update entry expiration. Return epxiration timepoint in abs milliseconds, or -1 if the entry
// already expired and was deleted;
facade::OpResult<int64_t> UpdateExpire(const Context& cntx, Iterator prime_it, ExpIterator exp_it,
const ExpireParams& params) ABSL_LOCKS_EXCLUDED(local_mu_);
const ExpireParams& params);

// Adds expiry information.
void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_);
void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at);

// Removes the corresponing expiry information if exists.
// Returns true if expiry existed (and removed).
bool RemoveExpire(DbIndex db_ind, Iterator main_it) ABSL_LOCKS_EXCLUDED(local_mu_);
bool RemoveExpire(DbIndex db_ind, Iterator main_it);

// Either adds or removes (if at == 0) expiry. Returns true if a change was made.
// Does not change expiry if at != 0 and expiry already exists.
bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_);
bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at);

void SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag);
uint32_t GetMCFlag(DbIndex db_ind, const PrimeKey& key) const;
Expand All @@ -343,12 +342,12 @@ class DbSlice {
// Delete a key referred by its iterator.
void PerformDeletion(Iterator del_it, DbTable* table);

bool Del(Context cntx, Iterator it) ABSL_LOCKS_EXCLUDED(local_mu_);
bool Del(Context cntx, Iterator it);

constexpr static DbIndex kDbAll = 0xFFFF;

// Flushes db_ind or all databases if kDbAll is passed
void FlushDb(DbIndex db_ind) ABSL_LOCKS_EXCLUDED(local_mu_);
void FlushDb(DbIndex db_ind);

// Flushes the data of given slot ranges.
void FlushSlots(cluster::SlotRanges slot_ranges);
Expand Down Expand Up @@ -439,7 +438,7 @@ class DbSlice {
void FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound);

//! Unregisters the callback.
void UnregisterOnChange(uint64_t id) ABSL_LOCKS_EXCLUDED(local_mu_);
void UnregisterOnChange(uint64_t id);

struct DeleteExpiredStats {
uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed).
Expand Down Expand Up @@ -496,25 +495,18 @@ class DbSlice {
client_tracking_map_[key].insert(conn_ref);
}

// Provides access to the internal lock of db_slice for flows that serialize
// entries with preemption and need to synchronize with Traverse below which
// acquires the same lock.
ThreadLocalMutex& GetSerializationMutex() {
return local_mu_;
}

// Wrapper around DashTable::Traverse that allows preemptions
template <typename Cb, typename DashTable>
PrimeTable::Cursor Traverse(DashTable* pt, PrimeTable::Cursor cursor, Cb&& cb)
ABSL_LOCKS_EXCLUDED(local_mu_) {
util::fb2::LockGuard lk(local_mu_);
return pt->Traverse(cursor, std::forward<Cb>(cb));
}

// Does not check for non supported events. Callers must parse the string and reject it
// if it's not empty and not EX.
void SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events);

bool WillBlockOnJournalWrite() const {
return block_counter_.IsBlocked();
}

LocalBlockingCounter* BlockingCounter() {
return &block_counter_;
}

private:
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
Expand Down Expand Up @@ -571,8 +563,11 @@ class DbSlice {

void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const;

// Used to provide exclusive access while Traversing segments
mutable ThreadLocalMutex local_mu_;
// We need this because registered callbacks might yield and when they do so we want
// to avoid Heartbeat or Flushing the db.
// This counter protects us against this case.
mutable LocalBlockingCounter block_counter_;

ShardId shard_id_;
uint8_t caching_mode_ : 1;

Expand Down
2 changes: 1 addition & 1 deletion src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ void DoBuildObjHist(EngineShard* shard, ConnectionContext* cntx, ObjHistMap* obj
continue;
PrimeTable::Cursor cursor;
do {
cursor = db_slice.Traverse(&dbt->prime, cursor, [&](PrimeIterator it) {
cursor = dbt->prime.Traverse(cursor, [&](PrimeIterator it) {
unsigned obj_type = it->second.ObjType();
auto& hist_ptr = (*obj_hist_map)[obj_type];
if (!hist_ptr) {
Expand Down
2 changes: 2 additions & 0 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha

OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
DCHECK(shard);

error_code ec = flow->saver->StopFullSyncInShard(shard);
if (ec) {
cntx->ReportError(ec);
Expand Down Expand Up @@ -697,6 +698,7 @@ void DflyCmd::BreakStalledFlowsInShard() {
return;

ShardId sid = EngineShard::tlocal()->shard_id();

vector<uint32_t> deleted;

for (auto [sync_id, replica_ptr] : replica_infos_) {
Expand Down
Loading

0 comments on commit 267d5ab

Please sign in to comment.