Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: split RecordExpiry preemptive and non-preemptive flows #4252

Merged
merged 5 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT

// log the evicted keys to journal.
if (auto journal = db_slice_->shard_owner()->journal(); journal) {
RecordExpiry(cntx_.db_index, key);
RecordExpiry(cntx_.db_index, key, false);
}
// Safe we already acquired util::fb2::LockGuard lk(db_slice_->GetSerializationMutex());
// on the flows that call this function
Expand Down Expand Up @@ -450,7 +450,7 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
}

if (res.it->second.HasExpire()) { // check expiry state
res = ExpireIfNeeded(cntx, res.it);
res = ExpireIfNeeded(cntx, res.it, true);
if (!IsValid(res.it)) {
return OpStatus::KEY_NOTFOUND;
}
Expand Down Expand Up @@ -1088,11 +1088,12 @@ void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size
}

DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, Iterator it) const {
auto res = ExpireIfNeeded(cntx, it.GetInnerIt());
auto res = ExpireIfNeeded(cntx, it.GetInnerIt(), false);
return {.it = Iterator::FromPrime(res.it), .exp_it = ExpIterator::FromPrime(res.exp_it)};
}

DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) const {
DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it,
bool preempts) const {
if (!it->second.HasExpire()) {
LOG(ERROR) << "Invalid call to ExpireIfNeeded";
return {it, ExpireIterator{}};
Expand Down Expand Up @@ -1122,7 +1123,7 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato

// Replicate expiry
if (auto journal = owner_->journal(); journal) {
RecordExpiry(cntx.db_index, key);
RecordExpiry(cntx.db_index, key, preempts);
}

if (expired_keys_events_recording_)
Expand Down Expand Up @@ -1153,7 +1154,7 @@ void DbSlice::ExpireAllIfNeeded() {
LOG(ERROR) << "Expire entry " << exp_it->first.ToString() << " not found in prime table";
return;
}
ExpireIfNeeded(Context{nullptr, db_index, GetCurrentTimeMs()}, prime_it);
ExpireIfNeeded(Context{nullptr, db_index, GetCurrentTimeMs()}, prime_it, false);
};

ExpireTable::Cursor cursor;
Expand Down Expand Up @@ -1215,7 +1216,7 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx
if (ttl <= 0) {
auto prime_it = db.prime.Find(it->first);
CHECK(!prime_it.is_done());
ExpireIfNeeded(cntx, prime_it);
ExpireIfNeeded(cntx, prime_it, false);
++result.deleted;
} else {
result.survivor_ttl_sum += ttl;
Expand Down Expand Up @@ -1283,7 +1284,7 @@ pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t s
// fiber preemption could happen in this phase.
for (string_view key : keys_to_journal) {
if (auto journal = owner_->journal(); journal)
RecordExpiry(db_ind, key);
RecordExpiry(db_ind, key, false);

if (expired_keys_events_recording_)
db_table->expired_keys_events_.emplace_back(key);
Expand Down
2 changes: 1 addition & 1 deletion src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ class DbSlice {
ExpireIterator exp_it;
};

PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it) const;
PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it, bool preempts = false) const;

OpResult<AddOrFindResult> AddOrFindInternal(const Context& cntx, std::string_view key);

Expand Down
1 change: 1 addition & 0 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor,
auto& db_slice = op_args.GetDbSlice();
DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index));

util::FiberAtomicGuard guard;
unsigned cnt = 0;

VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_cntx.db_index << " has "
Expand Down
10 changes: 8 additions & 2 deletions src/server/tx_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,17 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, facade::ArgSlice
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, true);
}

void RecordExpiry(DbIndex dbid, string_view key) {
void RecordExpiry(DbIndex dbid, string_view key, bool preempts) {
auto journal = EngineShard::tlocal()->journal();
CHECK(journal);
if (!preempts) {
util::FiberAtomicGuard guard;
journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key),
Payload("DEL", ArgSlice{key}), preempts);
return;
}
journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key),
Payload("DEL", ArgSlice{key}), false);
Payload("DEL", ArgSlice{key}), preempts);
}

void TriggerJournalWriteToSink() {
Expand Down
2 changes: 1 addition & 1 deletion src/server/tx_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,

// Record expiry in journal with independent transaction. Must be called from shard thread holding
// key.
void RecordExpiry(DbIndex dbid, std::string_view key);
void RecordExpiry(DbIndex dbid, std::string_view key, bool preempts = false);

// Trigger journal write to sink, no journal record will be added to journal.
// Must be called from shard thread of journal to sink.
Expand Down
Loading