diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6321d493b315..ea3413e5ae61 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -35,5 +35,3 @@ repos: rev: v8.16.3 hooks: - id: gitleaks - - diff --git a/src/server/common.cc b/src/server/common.cc index 56520ff05186..603455c82710 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -481,4 +481,17 @@ BorrowedInterpreter::~BorrowedInterpreter() { ServerState::tlocal()->ReturnInterpreter(interpreter_); } +void LocalBlockingCounter::unlock() { + DCHECK(mutating_ > 0); + --mutating_; + if (mutating_ == 0) { + cond_var_.notify_all(); + } +} + +void LocalBlockingCounter::Wait() { + util::fb2::NoOpLock noop_lk_; + cond_var_.wait(noop_lk_, [this]() { return mutating_ == 0; }); +} + } // namespace dfly diff --git a/src/server/common.h b/src/server/common.h index 6b63aafa37bb..28c41da4e12f 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -387,4 +387,23 @@ struct BorrowedInterpreter { extern size_t serialization_max_chunk_size; +class LocalBlockingCounter { + public: + void lock() { + ++mutating_; + } + + void unlock(); + + void Wait(); + + bool IsBlocked() const { + return mutating_ > 0; + } + + private: + util::fb2::CondVarAny cond_var_; + size_t mutating_ = 0; +}; + } // namespace dfly diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 21a85f48d5fa..ad7aed04eb5a 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -140,6 +140,11 @@ bool PrimeEvictionPolicy::CanGrow(const PrimeTable& tbl) const { unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) { unsigned res = 0; + + if (db_slice_->WillBlockOnJournalWrite()) { + return res; + } + // bool should_print = (eb.key_hash % 128) == 0; // based on tests - it's more efficient to pass regular buckets to gc. @@ -165,7 +170,7 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e } unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) { - if (!can_evict_) + if (!can_evict_ || db_slice_->WillBlockOnJournalWrite()) return 0; constexpr size_t kNumStashBuckets = ABSL_ARRAYSIZE(eb.probes.by_type.stash_buckets); @@ -192,8 +197,6 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT if (auto journal = db_slice_->shard_owner()->journal(); journal) { RecordExpiry(cntx_.db_index, key, false); } - // Safe we already acquired util::fb2::LockGuard lk(db_slice_->GetSerializationMutex()); - // on the flows that call this function db_slice_->PerformDeletion(DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)), table); ++evicted_; @@ -459,7 +462,6 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: if (caching_mode_ && IsValid(res.it)) { if (!change_cb_.empty()) { FetchedItemsRestorer fetched_restorer(&fetched_items_); - util::fb2::LockGuard lk(local_mu_); auto bump_cb = [&](PrimeTable::bucket_iterator bit) { CallChangeCallbacks(cntx.db_index, key, bit); }; @@ -552,7 +554,6 @@ OpResult DbSlice::AddOrFindInternal(const Context& cnt CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status; FetchedItemsRestorer fetched_restorer(&fetched_items_); - util::fb2::LockGuard lk(local_mu_); // It's a new entry. CallChangeCallbacks(cntx.db_index, key, {key}); @@ -668,8 +669,6 @@ void DbSlice::ActivateDb(DbIndex db_ind) { } bool DbSlice::Del(Context cntx, Iterator it) { - util::fb2::LockGuard lk(local_mu_); - if (!IsValid(it)) { return false; } @@ -735,7 +734,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) { PrimeTable::Cursor cursor; uint64_t i = 0; do { - PrimeTable::Cursor next = Traverse(pt, cursor, del_entry_cb); + PrimeTable::Cursor next = pt->Traverse(cursor, del_entry_cb); ++i; cursor = next; if (i % 100 == 0) { @@ -792,10 +791,6 @@ void DbSlice::FlushDbIndexes(const std::vector& indexes) { } void DbSlice::FlushDb(DbIndex db_ind) { - // We should not flush if serialization of a big value is in progress because this - // could lead to UB or assertion failures (while DashTable::Traverse is iterating over - // a logical bucket). - util::fb2::LockGuard lk(local_mu_); // clear client tracking map. client_tracking_map_.clear(); @@ -817,7 +812,6 @@ void DbSlice::FlushDb(DbIndex db_ind) { } void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) { - util::fb2::LockGuard lk(local_mu_); uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates. auto& db = *db_arr_[db_ind]; size_t table_before = db.expire.mem_usage(); @@ -827,7 +821,6 @@ void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) { } bool DbSlice::RemoveExpire(DbIndex db_ind, Iterator main_it) { - util::fb2::LockGuard lk(local_mu_); if (main_it->second.HasExpire()) { auto& db = *db_arr_[db_ind]; size_t table_before = db.expire.mem_usage(); @@ -1056,7 +1049,6 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) { FetchedItemsRestorer fetched_restorer(&fetched_items_); - util::fb2::LockGuard lk(local_mu_); CallChangeCallbacks(db_ind, key, ChangeReq{it.GetInnerIt()}); it.GetInnerIt().SetVersion(NextVersion()); } @@ -1137,12 +1129,17 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato const_cast(this)->PerformDeletion(Iterator(it, StringOrView::FromView(key)), ExpIterator(expire_it, StringOrView::FromView(key)), db.get()); + ++events_.expired_keys; return {PrimeIterator{}, ExpireIterator{}}; } void DbSlice::ExpireAllIfNeeded() { + // We hold no locks to any of the keys so we should Wait() here such that + // we don't preempt in ExpireIfNeeded + block_counter_.Wait(); + for (DbIndex db_index = 0; db_index < db_arr_.size(); db_index++) { if (!db_arr_[db_index]) continue; @@ -1159,7 +1156,7 @@ void DbSlice::ExpireAllIfNeeded() { ExpireTable::Cursor cursor; do { - cursor = Traverse(&db.expire, cursor, cb); + cursor = db.expire.Traverse(cursor, cb); } while (cursor); } } @@ -1170,6 +1167,7 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) { void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) { FetchedItemsRestorer fetched_restorer(&fetched_items_); + std::unique_lock lk(block_counter_); uint64_t bucket_version = it.GetVersion(); // change_cb_ is ordered by version. @@ -1193,7 +1191,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_ //! Unregisters the callback. void DbSlice::UnregisterOnChange(uint64_t id) { - util::fb2::LockGuard lk(local_mu_); + block_counter_.Wait(); auto it = find_if(change_cb_.begin(), change_cb_.end(), [id](const auto& cb) { return cb.first == id; }); CHECK(it != change_cb_.end()); @@ -1354,13 +1352,10 @@ void DbSlice::CreateDb(DbIndex db_ind) { void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key, ConnectionState::ExecInfo* exec_info) { // Because we might insert while another fiber is preempted - util::fb2::LockGuard lk(local_mu_); db_arr_[db_indx]->watched_keys[key].push_back(exec_info); } void DbSlice::UnregisterConnectionWatches(const ConnectionState::ExecInfo* exec_info) { - // Because we might remove while another fiber is preempted and miss a notification - util::fb2::LockGuard lk(local_mu_); for (const auto& [db_indx, key] : exec_info->watched_keys) { auto& watched_keys = db_arr_[db_indx]->watched_keys; if (auto it = watched_keys.find(key); it != watched_keys.end()) { @@ -1536,6 +1531,7 @@ void DbSlice::OnCbFinish() { } void DbSlice::CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const { + std::unique_lock lk(block_counter_); if (change_cb_.empty()) return; diff --git a/src/server/db_slice.h b/src/server/db_slice.h index a9c72823d60d..a5e97f04474b 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -305,34 +305,33 @@ class DbSlice { AddOrFindResult& operator=(ItAndUpdater&& o); }; - OpResult AddOrFind(const Context& cntx, std::string_view key) - ABSL_LOCKS_EXCLUDED(local_mu_); + OpResult AddOrFind(const Context& cntx, std::string_view key); // Same as AddOrSkip, but overwrites in case entry exists. OpResult AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj, - uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_); + uint64_t expire_at_ms); // Adds a new entry. Requires: key does not exist in this slice. // Returns the iterator to the newly added entry. // Returns OpStatus::OUT_OF_MEMORY if bad_alloc is thrown OpResult AddNew(const Context& cntx, std::string_view key, PrimeValue obj, - uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_); + uint64_t expire_at_ms); // Update entry expiration. Return epxiration timepoint in abs milliseconds, or -1 if the entry // already expired and was deleted; facade::OpResult UpdateExpire(const Context& cntx, Iterator prime_it, ExpIterator exp_it, - const ExpireParams& params) ABSL_LOCKS_EXCLUDED(local_mu_); + const ExpireParams& params); // Adds expiry information. - void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_); + void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at); // Removes the corresponing expiry information if exists. // Returns true if expiry existed (and removed). - bool RemoveExpire(DbIndex db_ind, Iterator main_it) ABSL_LOCKS_EXCLUDED(local_mu_); + bool RemoveExpire(DbIndex db_ind, Iterator main_it); // Either adds or removes (if at == 0) expiry. Returns true if a change was made. // Does not change expiry if at != 0 and expiry already exists. - bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_); + bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at); void SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag); uint32_t GetMCFlag(DbIndex db_ind, const PrimeKey& key) const; @@ -343,12 +342,12 @@ class DbSlice { // Delete a key referred by its iterator. void PerformDeletion(Iterator del_it, DbTable* table); - bool Del(Context cntx, Iterator it) ABSL_LOCKS_EXCLUDED(local_mu_); + bool Del(Context cntx, Iterator it); constexpr static DbIndex kDbAll = 0xFFFF; // Flushes db_ind or all databases if kDbAll is passed - void FlushDb(DbIndex db_ind) ABSL_LOCKS_EXCLUDED(local_mu_); + void FlushDb(DbIndex db_ind); // Flushes the data of given slot ranges. void FlushSlots(cluster::SlotRanges slot_ranges); @@ -439,7 +438,7 @@ class DbSlice { void FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound); //! Unregisters the callback. - void UnregisterOnChange(uint64_t id) ABSL_LOCKS_EXCLUDED(local_mu_); + void UnregisterOnChange(uint64_t id); struct DeleteExpiredStats { uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed). @@ -496,25 +495,18 @@ class DbSlice { client_tracking_map_[key].insert(conn_ref); } - // Provides access to the internal lock of db_slice for flows that serialize - // entries with preemption and need to synchronize with Traverse below which - // acquires the same lock. - ThreadLocalMutex& GetSerializationMutex() { - return local_mu_; - } - - // Wrapper around DashTable::Traverse that allows preemptions - template - PrimeTable::Cursor Traverse(DashTable* pt, PrimeTable::Cursor cursor, Cb&& cb) - ABSL_LOCKS_EXCLUDED(local_mu_) { - util::fb2::LockGuard lk(local_mu_); - return pt->Traverse(cursor, std::forward(cb)); - } - // Does not check for non supported events. Callers must parse the string and reject it // if it's not empty and not EX. void SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events); + bool WillBlockOnJournalWrite() const { + return block_counter_.IsBlocked(); + } + + LocalBlockingCounter* BlockingCounter() { + return &block_counter_; + } + private: void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key); void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size); @@ -571,8 +563,11 @@ class DbSlice { void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const; - // Used to provide exclusive access while Traversing segments - mutable ThreadLocalMutex local_mu_; + // We need this because registered callbacks might yield and when they do so we want + // to avoid Heartbeat or Flushing the db. + // This counter protects us against this case. + mutable LocalBlockingCounter block_counter_; + ShardId shard_id_; uint8_t caching_mode_ : 1; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 2b0743649463..326fd569ee76 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -301,7 +301,7 @@ void DoBuildObjHist(EngineShard* shard, ConnectionContext* cntx, ObjHistMap* obj continue; PrimeTable::Cursor cursor; do { - cursor = db_slice.Traverse(&dbt->prime, cursor, [&](PrimeIterator it) { + cursor = dbt->prime.Traverse(cursor, [&](PrimeIterator it) { unsigned obj_type = it->second.ObjType(); auto& hist_ptr = (*obj_hist_map)[obj_type]; if (!hist_ptr) { diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index b2b806c06cd2..c30f2ab51237 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -595,6 +595,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { DCHECK(shard); + error_code ec = flow->saver->StopFullSyncInShard(shard); if (ec) { cntx->ReportError(ec); @@ -697,6 +698,7 @@ void DflyCmd::BreakStalledFlowsInShard() { return; ShardId sid = EngineShard::tlocal()->shard_id(); + vector deleted; for (auto [sync_id, replica_ptr] : replica_infos_) { diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index a90e37838182..467cd32ac189 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -329,7 +329,7 @@ bool EngineShard::DoDefrag() { uint64_t attempts = 0; do { - cur = slice.Traverse(prime_table, cur, [&](PrimeIterator it) { + cur = prime_table->Traverse(cur, [&](PrimeIterator it) { // for each value check whether we should move it because it // seats on underutilized page of memory, and if so, do it. bool did = it->second.DefragIfNeeded(threshold); @@ -660,13 +660,24 @@ void EngineShard::Heartbeat() { CacheStats(); + // TODO: iterate over all namespaces + DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); + // Skip heartbeat if we are serializing a big value + static auto start = std::chrono::system_clock::now(); + if (db_slice.WillBlockOnJournalWrite()) { + const auto elapsed = std::chrono::system_clock::now() - start; + if (elapsed > std::chrono::seconds(1)) { + LOG_EVERY_T(WARNING, 5) << "Stalled heartbeat() fiber for " << elapsed.count() + << " seconds because of big value serialization"; + } + return; + } + start = std::chrono::system_clock::now(); + if (!IsReplica()) { // Never run expiry/evictions on replica. RetireExpiredAndEvict(); } - // TODO: iterate over all namespaces - DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); - // Offset CoolMemoryUsage when consider background offloading. // TODO: Another approach could be is to align the approach similarly to how we do with // FreeMemWithEvictionStep, i.e. if memory_budget is below the limit. @@ -690,55 +701,49 @@ void EngineShard::Heartbeat() { } void EngineShard::RetireExpiredAndEvict() { - // TODO: iterate over all namespaces - DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); - // Some of the functions below might acquire the same lock again so we need to unlock it - // asap. We won't yield before we relock the mutex again, so the code below is atomic - // in respect to preemptions of big values. An example of that is the call to - // DeleteExpiredStep() below, which eventually calls ExpireIfNeeded() - // and within that the call to RecordExpiry() will trigger the registered - // callback OnJournalEntry which locks the exact same mutex. - // We need to lock below and immediately release because there should be no other fiber - // that is serializing a big value. - { std::unique_lock lk(db_slice.GetSerializationMutex()); } - constexpr double kTtlDeleteLimit = 200; - constexpr double kRedLimitFactor = 0.1; - - uint32_t traversed = GetMovingSum6(TTL_TRAVERSE); - uint32_t deleted = GetMovingSum6(TTL_DELETE); - unsigned ttl_delete_target = 5; - - if (deleted > 10) { - // deleted should be <= traversed. - // hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit). - // The higher ttl_delete_target the more likely we have lots of expired items that need - // to be deleted. - ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10); - } - - ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size(); - - DbContext db_cntx; - db_cntx.time_now_ms = GetCurrentTimeMs(); - - for (unsigned i = 0; i < db_slice.db_array_size(); ++i) { - if (!db_slice.IsDbValid(i)) - continue; + { + FiberAtomicGuard guard; + // TODO: iterate over all namespaces + DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); + constexpr double kTtlDeleteLimit = 200; + constexpr double kRedLimitFactor = 0.1; + + uint32_t traversed = GetMovingSum6(TTL_TRAVERSE); + uint32_t deleted = GetMovingSum6(TTL_DELETE); + unsigned ttl_delete_target = 5; + + if (deleted > 10) { + // deleted should be <= traversed. + // hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit). + // The higher ttl_delete_target the more likely we have lots of expired items that need + // to be deleted. + ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10); + } - db_cntx.db_index = i; - auto [pt, expt] = db_slice.GetTables(i); - if (expt->size() > pt->size() / 4) { - DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target); + ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size(); - counter_[TTL_TRAVERSE].IncBy(stats.traversed); - counter_[TTL_DELETE].IncBy(stats.deleted); - } + DbContext db_cntx; + db_cntx.time_now_ms = GetCurrentTimeMs(); + + for (unsigned i = 0; i < db_slice.db_array_size(); ++i) { + if (!db_slice.IsDbValid(i)) + continue; + + db_cntx.db_index = i; + auto [pt, expt] = db_slice.GetTables(i); + if (expt->size() > pt->size() / 4) { + DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target); - // if our budget is below the limit - if (db_slice.memory_budget() < eviction_redline && GetFlag(FLAGS_enable_heartbeat_eviction)) { - uint32_t starting_segment_id = rand() % pt->GetSegmentCount(); - db_slice.FreeMemWithEvictionStep(i, starting_segment_id, - eviction_redline - db_slice.memory_budget()); + counter_[TTL_TRAVERSE].IncBy(stats.traversed); + counter_[TTL_DELETE].IncBy(stats.deleted); + } + + // if our budget is below the limit + if (db_slice.memory_budget() < eviction_redline && GetFlag(FLAGS_enable_heartbeat_eviction)) { + uint32_t starting_segment_id = rand() % pt->GetSegmentCount(); + db_slice.FreeMemWithEvictionStep(i, starting_segment_id, + eviction_redline - db_slice.memory_budget()); + } } } diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index fd1f2743d276..140c94b3d992 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -597,6 +597,11 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, auto& db_slice = op_args.GetDbSlice(); DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index)); + // ScanCb can preempt due to journaling expired entries and we need to make sure that + // we enter the callback in a timing when journaling will not cause preemptions. Otherwise, + // the bucket might change as we Traverse and yield. + db_slice.BlockingCounter()->Wait(); + util::FiberAtomicGuard guard; unsigned cnt = 0; @@ -607,11 +612,9 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_cntx.db_index); string scratch; do { - cur = db_slice.Traverse(prime_table, cur, [&](PrimeIterator it) { - cnt += ScanCb(op_args, it, scan_opts, &scratch, vec); - }); + cur = prime_table->Traverse( + cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, &scratch, vec); }); } while (cur && cnt < scan_opts.limit); - VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value(); *cursor = cur.value(); } diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 8f9fe8750198..5677c5487758 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -208,8 +208,7 @@ void RestoreStreamer::Run() { do { if (fiber_cancelled_) return; - - cursor = db_slice_->Traverse(pt, cursor, [&](PrimeTable::bucket_iterator it) { + cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) { if (fiber_cancelled_) // Could be cancelled any time as Traverse may preempt return; diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 1f4612a2d78f..7327ad49b627 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -159,7 +159,15 @@ std::string AbslUnparseFlag(dfly::CompressionMode flag) { } dfly::CompressionMode GetDefaultCompressionMode() { - return absl::GetFlag(FLAGS_compression_mode); + const auto flag = absl::GetFlag(FLAGS_compression_mode); + if (serialization_max_chunk_size == 0) { + return flag; + } + + static bool once = flag != dfly::CompressionMode::NONE; + LOG_IF(WARNING, once) << "Setting CompressionMode to NONE because big value serialization is on"; + once = false; + return dfly::CompressionMode::NONE; } uint8_t RdbObjectType(const PrimeValue& pv) { diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index ae3b9272bde2..913598fec7e8 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -122,6 +122,7 @@ class RdbSaver { struct SnapshotStats { size_t current_keys = 0; size_t total_keys = 0; + size_t big_value_preemptions = 0; }; SnapshotStats GetCurrentSnapshotProgress() const; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 573fa0ee7477..b3ff9c1676ec 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2346,6 +2346,7 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) { append("total_net_output_bytes", reply_stats.io_write_bytes); append("rdb_save_usec", m.coordinator_stats.rdb_save_usec); append("rdb_save_count", m.coordinator_stats.rdb_save_count); + append("big_value_preemptions", m.coordinator_stats.big_value_preemptions); append("instantaneous_input_kbps", -1); append("instantaneous_output_kbps", -1); append("rejected_connections", -1); diff --git a/src/server/server_state.cc b/src/server/server_state.cc index a906c2de2ffd..3cf7dc653189 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -27,7 +27,7 @@ ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) { } ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) { - static_assert(sizeof(Stats) == 17 * 8, "Stats size mismatch"); + static_assert(sizeof(Stats) == 18 * 8, "Stats size mismatch"); #define ADD(x) this->x += (other.x) @@ -49,6 +49,9 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) { ADD(blocked_on_interpreter); ADD(rdb_save_usec); ADD(rdb_save_count); + + ADD(big_value_preemptions); + ADD(oom_error_cmd_cnt); if (this->tx_width_freq_arr.size() > 0) { diff --git a/src/server/server_state.h b/src/server/server_state.h index f8be766a5816..0cfc48be1634 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -122,6 +122,8 @@ class ServerState { // public struct - to allow initialization. uint64_t rdb_save_usec = 0; uint64_t rdb_save_count = 0; + uint64_t big_value_preemptions = 0; + // Number of times we rejected command dispatch due to OOM condition. uint64_t oom_error_cmd_cnt = 0; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 5526240068f5..528843ef1e39 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -18,6 +18,7 @@ #include "server/journal/journal.h" #include "server/rdb_extensions.h" #include "server/rdb_save.h" +#include "server/server_state.h" #include "server/tiered_storage.h" #include "util/fibers/synchronization.h" @@ -85,6 +86,8 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot if (bytes_serialized > flush_threshold) { size_t serialized = FlushSerialized(flush_state); VLOG(2) << "FlushSerialized " << serialized << " bytes"; + auto& stats = ServerState::tlocal()->stats; + ++stats.big_value_preemptions; } }; } @@ -165,8 +168,9 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx; do { - if (cll->IsCancelled()) + if (cll->IsCancelled()) { return; + } PrimeTable::Cursor next = pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this)); @@ -244,6 +248,8 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) { } bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { + std::lock_guard guard(big_value_mu_); + ++stats_.savecb_calls; auto check = [&](auto v) { @@ -256,14 +262,19 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { return true; }; - uint64_t v = it.GetVersion(); - if (!check(v)) { + if (!check(it.GetVersion())) { return false; } db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it), snapshot_version_); + auto* blocking_counter = db_slice_->BlockingCounter(); + // Locking this never preempts. We merely just increment the underline counter such that + // if SerializeBucket preempts, Heartbeat() won't run because the blocking counter is not + // zero. + std::lock_guard blocking_counter_guard(*blocking_counter); + stats_.loop_serialized += SerializeBucket(current_db_, it); return false; @@ -372,6 +383,8 @@ bool SliceSnapshot::PushSerialized(bool force) { } void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { + std::lock_guard guard(big_value_mu_); + PrimeTable* table = db_slice_->GetTables(db_index).first; const PrimeTable::bucket_iterator* bit = req.update(); @@ -396,7 +409,7 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) // To enable journal flushing to sync after non auto journal command is executed we call // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no // additional journal change to serialize, it simply invokes PushSerialized. - std::unique_lock lk(db_slice_->GetSerializationMutex()); + std::lock_guard guard(big_value_mu_); if (item.opcode != journal::Op::NOOP) { serializer_->WriteJournalEntry(item.data); } diff --git a/src/server/snapshot.h b/src/server/snapshot.h index e3839fb9bd88..98ef11c78166 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -169,6 +169,8 @@ class SliceSnapshot { size_t keys_total = 0; } stats_; + ThreadLocalMutex big_value_mu_; + std::function on_push_; std::function on_snapshot_finish_; }; diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 7d4e7941db7c..3173e304ddf9 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1424,7 +1424,7 @@ async def test_migration_with_key_ttl(df_factory): assert await nodes[1].client.execute_command("stick k_sticky") == 0 -@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "serialization_max_chunk_size": 0}) async def test_network_disconnect_during_migration(df_factory): instances = [ df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) @@ -1961,7 +1961,7 @@ async def node1size0(): assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}") -@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"}) +@dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "serialization_max_chunk_size": 0}) @pytest.mark.asyncio async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory): instances = [ @@ -2424,7 +2424,7 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFact @pytest.mark.asyncio -@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "serialization_max_chunk_size": 0}) async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_seeder_factory): # Timeout set to 3 seconds because we must first saturate the socket before we get the timeout instances = [ diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py index 18a0bc8324c5..3814d970c9ef 100644 --- a/tests/dragonfly/instance.py +++ b/tests/dragonfly/instance.py @@ -425,9 +425,9 @@ def create(self, existing_port=None, path=None, version=100, **kwargs) -> DflyIn args.setdefault("list_experimental_v2") args.setdefault("log_dir", self.params.log_dir) - if version >= 1.21: + if version >= 1.21 and "serialization_max_chunk_size" not in args: # Add 1 byte limit for big values - args.setdefault("serialization_max_chunk_size", 0) + args.setdefault("serialization_max_chunk_size", 1) for k, v in args.items(): args[k] = v.format(**self.params.env) if isinstance(v, str) else v diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 8d160f4e8715..6ff68fc390e9 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2372,9 +2372,15 @@ async def test_replicate_old_master( dfly_version = "v1.19.2" released_dfly_path = download_dragonfly_release(dfly_version) - master = df_factory.create(version=1.19, path=released_dfly_path, cluster_mode=cluster_mode) + master = df_factory.create( + version=1.19, + path=released_dfly_path, + cluster_mode=cluster_mode, + ) replica = df_factory.create( - cluster_mode=cluster_mode, cluster_announce_ip=announce_ip, announce_port=announce_port + cluster_mode=cluster_mode, + cluster_announce_ip=announce_ip, + announce_port=announce_port, ) df_factory.start_all([master, replica])