Skip to content

Commit

Permalink
chore: split RecordExpiry preemptive and non-preemptive flows (#4252)
Browse files Browse the repository at this point in the history
* add FiberGuard to RecordExpiry for non-preemptive flows

---------

Signed-off-by: kostas <[email protected]>
  • Loading branch information
kostasrim authored Dec 4, 2024
1 parent 892a415 commit d8fda40
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 12 deletions.
17 changes: 9 additions & 8 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT

// log the evicted keys to journal.
if (auto journal = db_slice_->shard_owner()->journal(); journal) {
RecordExpiry(cntx_.db_index, key);
RecordExpiry(cntx_.db_index, key, false);
}
// Safe we already acquired util::fb2::LockGuard lk(db_slice_->GetSerializationMutex());
// on the flows that call this function
Expand Down Expand Up @@ -450,7 +450,7 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
}

if (res.it->second.HasExpire()) { // check expiry state
res = ExpireIfNeeded(cntx, res.it);
res = ExpireIfNeeded(cntx, res.it, true);
if (!IsValid(res.it)) {
return OpStatus::KEY_NOTFOUND;
}
Expand Down Expand Up @@ -1088,11 +1088,12 @@ void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size
}

DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, Iterator it) const {
auto res = ExpireIfNeeded(cntx, it.GetInnerIt());
auto res = ExpireIfNeeded(cntx, it.GetInnerIt(), false);
return {.it = Iterator::FromPrime(res.it), .exp_it = ExpIterator::FromPrime(res.exp_it)};
}

DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) const {
DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it,
bool preempts) const {
if (!it->second.HasExpire()) {
LOG(ERROR) << "Invalid call to ExpireIfNeeded";
return {it, ExpireIterator{}};
Expand Down Expand Up @@ -1122,7 +1123,7 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato

// Replicate expiry
if (auto journal = owner_->journal(); journal) {
RecordExpiry(cntx.db_index, key);
RecordExpiry(cntx.db_index, key, preempts);
}

if (expired_keys_events_recording_)
Expand Down Expand Up @@ -1153,7 +1154,7 @@ void DbSlice::ExpireAllIfNeeded() {
LOG(ERROR) << "Expire entry " << exp_it->first.ToString() << " not found in prime table";
return;
}
ExpireIfNeeded(Context{nullptr, db_index, GetCurrentTimeMs()}, prime_it);
ExpireIfNeeded(Context{nullptr, db_index, GetCurrentTimeMs()}, prime_it, false);
};

ExpireTable::Cursor cursor;
Expand Down Expand Up @@ -1215,7 +1216,7 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx
if (ttl <= 0) {
auto prime_it = db.prime.Find(it->first);
CHECK(!prime_it.is_done());
ExpireIfNeeded(cntx, prime_it);
ExpireIfNeeded(cntx, prime_it, false);
++result.deleted;
} else {
result.survivor_ttl_sum += ttl;
Expand Down Expand Up @@ -1283,7 +1284,7 @@ pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t s
// fiber preemption could happen in this phase.
for (string_view key : keys_to_journal) {
if (auto journal = owner_->journal(); journal)
RecordExpiry(db_ind, key);
RecordExpiry(db_ind, key, false);

if (expired_keys_events_recording_)
db_table->expired_keys_events_.emplace_back(key);
Expand Down
2 changes: 1 addition & 1 deletion src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ class DbSlice {
ExpireIterator exp_it;
};

PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it) const;
PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it, bool preempts = false) const;

OpResult<AddOrFindResult> AddOrFindInternal(const Context& cntx, std::string_view key);

Expand Down
1 change: 1 addition & 0 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor,
auto& db_slice = op_args.GetDbSlice();
DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index));

util::FiberAtomicGuard guard;
unsigned cnt = 0;

VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_cntx.db_index << " has "
Expand Down
10 changes: 8 additions & 2 deletions src/server/tx_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,17 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, facade::ArgSlice
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, true);
}

void RecordExpiry(DbIndex dbid, string_view key) {
void RecordExpiry(DbIndex dbid, string_view key, bool preempts) {
auto journal = EngineShard::tlocal()->journal();
CHECK(journal);
if (!preempts) {
util::FiberAtomicGuard guard;
journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key),
Payload("DEL", ArgSlice{key}), preempts);
return;
}
journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key),
Payload("DEL", ArgSlice{key}), false);
Payload("DEL", ArgSlice{key}), preempts);
}

void TriggerJournalWriteToSink() {
Expand Down
2 changes: 1 addition & 1 deletion src/server/tx_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,

// Record expiry in journal with independent transaction. Must be called from shard thread holding
// key.
void RecordExpiry(DbIndex dbid, std::string_view key);
void RecordExpiry(DbIndex dbid, std::string_view key, bool preempts = false);

// Trigger journal write to sink, no journal record will be added to journal.
// Must be called from shard thread of journal to sink.
Expand Down

0 comments on commit d8fda40

Please sign in to comment.