diff --git a/src/server/common.cc b/src/server/common.cc index 00ee8e205b3c..56520ff05186 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -430,7 +430,7 @@ ThreadLocalMutex::ThreadLocalMutex() { } ThreadLocalMutex::~ThreadLocalMutex() { - // DCHECK_EQ(EngineShard::tlocal(), shard_); + DCHECK_EQ(EngineShard::tlocal(), shard_); } void ThreadLocalMutex::lock() { diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index dd78588bbef8..01c4dee120e4 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -206,8 +206,6 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT if (auto journal = db_slice_->shard_owner()->journal(); journal) { RecordExpiry(cntx_.db_index, key); } - // Safe we already acquired util::fb2::LockGuard lk(db_slice_->GetSerializationMutex()); - // on the flows that call this function db_slice_->PerformDeletion(DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)), table); ++evicted_; @@ -1131,6 +1129,11 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato << ", prime table size: " << db->prime.size() << util::fb2::GetStacktrace(); } + // Replicate expiry + if (auto journal = owner_->journal(); journal) { + RecordExpiry(cntx.db_index, key); + } + if (expired_keys_events_recording_) db->expired_keys_events_.emplace_back(key); @@ -1142,10 +1145,6 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato const_cast(this)->PerformDeletion(Iterator(it, StringOrView::FromView(key)), ExpIterator(expire_it, StringOrView::FromView(key)), db.get()); - // Replicate expiry - if (auto journal = owner_->journal(); journal) { - RecordExpiry(cntx.db_index, key); - } ++events_.expired_keys; diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 0d1f4180910a..532378694604 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -589,6 +589,12 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { DCHECK(shard); + absl::Cleanup clean([&]() { + // Reset cleanup and saver + flow->cleanup = []() {}; + flow->saver.reset(); + }); + error_code ec = flow->saver->StopFullSyncInShard(shard); if (ec) { cntx->ReportError(ec); @@ -600,10 +606,6 @@ void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* s cntx->ReportError(ec); return; } - - // Reset cleanup and saver - flow->cleanup = []() {}; - flow->saver.reset(); } void DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {