
chore: remove DbSlice mutex and add ConditionFlag in SliceSnapshot #4073

Merged
kostasrim merged 43 commits into main from kpr2 on Dec 5, 2024

Conversation

kostasrim
Contributor

  • remove DbSlice mutex
  • add ConditionFlag in SliceSnapshot

Partially solves #4072

@kostasrim kostasrim self-assigned this Nov 6, 2024
};

// Helper class used to guarantee atomicity between serialization of buckets
class ConditionGuard {
Contributor Author

This is code we checked in and removed in previous PRs; I just brought it back.

Contributor Author

Same for all the changes in this header

@kostasrim kostasrim requested a review from adiholden November 6, 2024 12:24
@kostasrim
Contributor Author

Comment on lines 358 to 392
struct ConditionFlag {
util::fb2::CondVarAny cond_var;
bool flag = false;
};

// Helper class used to guarantee atomicity between serialization of buckets
class ConditionGuard {
Contributor

How is this not just a thread-local mutex? 🤔 Topological proof 🤓: you can move the constructor/destructor code into lock() and unlock() of ConditionFlag and use it with lock_guard.
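For readers following along, a minimal standalone sketch of that suggestion, with std:: primitives standing in for util::fb2::CondVarAny; the assumed semantics are that locking waits until the flag is clear and then sets it, while unlocking clears it and wakes the next waiter:

#include <condition_variable>
#include <mutex>

// Give ConditionFlag the BasicLockable interface so std::lock_guard (or the
// fiber-aware equivalent) can replace the hand-rolled ConditionGuard.
struct ConditionFlag {
  void lock() {
    std::unique_lock lk(mu);
    cond_var.wait(lk, [this] { return !flag; });  // wait until no one holds it
    flag = true;                                   // take ownership
  }

  void unlock() {
    {
      std::lock_guard lk(mu);
      flag = false;                                // release ownership
    }
    cond_var.notify_one();                         // wake the next waiter
  }

  std::mutex mu;  // only needed to make this standalone std:: version well defined
  std::condition_variable_any cond_var;
  bool flag = false;
};

// Usage: std::lock_guard<ConditionFlag> guard(bucket_ser_);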

Contributor Author

💯

ConditionFlag* enclosing_;
};

class LocalBlockingCounter {
Contributor

We have EmbeddedBlockingCounter in helio, but its atomicity is not needed here.

Contributor Author

Yep that's why I introduced it
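For context, a minimal standalone sketch of what such a local (non-atomic) blocking counter can look like; the real class uses the fiber-aware util::fb2 primitives, and the names here are illustrative rather than the exact code in this PR:

#include <condition_variable>
#include <cstddef>
#include <mutex>

// Counts in-flight preemptive mutations on a single shard thread. Waiters
// (e.g. a flush path) suspend until the count drops back to zero.
class LocalBlockingCounter {
 public:
  void Increment() {
    std::lock_guard lk(mu_);
    ++count_;
  }

  void Decrement() {
    std::lock_guard lk(mu_);
    if (--count_ == 0)
      cv_.notify_all();  // wake everyone blocked in Wait()
  }

  // Suspends the caller until no mutation is in flight.
  void Wait() {
    std::unique_lock lk(mu_);
    cv_.wait(lk, [this] { return count_ == 0; });
  }

 private:
  // The fiber-local original needs no mutex or atomics; the lock is only here
  // to make this standalone std:: version well defined.
  std::mutex mu_;
  std::condition_variable_any cv_;
  std::size_t count_ = 0;
};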

@@ -243,6 +243,7 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
}

bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
ConditionGuard guard(&bucket_ser_);
Collaborator

Why here and in OnDbChange, and not inside SerializeBucket?

Contributor Author

You have asked me the exact same question in June 🤣

The answer is that BucketSaveCb calls FlushChangeToEarlierCallbacks, which might preempt, and when the fiber resumes the version check above might no longer hold. The code within BucketSaveCb should be atomic / treated as a critical section.

Collaborator

Do you mean the version check below the lambda?
You could also check the snapshot version inside BucketSaveCb and, if it is bigger, not serialize the bucket.

@@ -106,6 +106,8 @@ class RestoreStreamer : public JournalStreamer {
cluster::SlotSet my_slots_;
bool fiber_cancelled_ = false;
bool snapshot_finished_ = false;

ConditionFlag bucket_ser_;
Collaborator

Why did you add it to the streamer?

Contributor Author

Because it has similar semantics to the snapshot: it traverses and serializes the buckets (see RestoreStreamer::Run).

Collaborator

We don't have any preemption while serializing buckets in RestoreStreamer, as we do not yet support big value serialization in migration.

// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
// Skip heartbeat if we are serializing a big value
if (db_slice.HasBlockingCounterMutating()) {
Contributor Author

This condition can be violated if we proceed, preempt and then resume

Collaborator

We should add some logging here so we can tell when heartbeat has not been running for a long time, whether because we are running lots of big value serializations or because we have a bug somewhere causing this skip.

Contributor Author
@kostasrim kostasrim Nov 13, 2024

+100 on this one. I thought the same tbh -- I will add it

@@ -1158,6 +1143,11 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
const_cast<DbSlice*>(this)->PerformDeletion(Iterator(it, StringOrView::FromView(key)),
ExpIterator(expire_it, StringOrView::FromView(key)),
db.get());
// Replicate expiry
Collaborator

what is the reason you moved this here?

Contributor Author

Good question. The answer is preemption, because heartbeat is not transactional. If we first write to the journal, preempt, and then delete, we risk another transaction coming in between the preemption and the deletion. This is not good.

Collaborator

RecordExpiry cannot preempt; await is set to false.
Does this assumption break now?

Contributor Author
@kostasrim kostasrim Nov 13, 2024

Nope, you are right about this assumption, because serializer_->WriteJournalEntry(item.data) never calls FlushIfNeeded, so it won't flush. I would still rather keep it this way though: we always perform the operation first and then write to the journal, not the other way around.

Collaborator

My concern here is that this assumption is not right anymore :(
Even if await is false, we can wait on the ConditionGuard because some other entry is in the middle of serialization.
But we don't run heartbeat while serialization of a big value is in progress, so I am not sure we really have a problem; it is just not written well.

  1. We need to add a FiberGuard when we call the journal write with await=false, to make sure the assumption is fulfilled (see the sketch after this list).
  2. We need to rewrite the code, as the flow is not clear: we pass await=false, but under some guard we might still wait.
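As an aside, a standalone sketch of what such a "no preemption in this scope" guard could look like; this is an assumption about its shape, not the FiberGuard that already exists in the codebase:

#include <cassert>

// RAII guard that forbids fiber suspension while it is alive. The fiber
// runtime would call AssertSuspendAllowed() from its hook right before
// suspending the current fiber.
class FiberGuard {
 public:
  FiberGuard() { ++forbid_suspend_depth_; }
  ~FiberGuard() { --forbid_suspend_depth_; }

  static void AssertSuspendAllowed() { assert(forbid_suspend_depth_ == 0); }

 private:
  static inline thread_local int forbid_suspend_depth_ = 0;
};

// Usage around the non-preemptive journal write (names from the discussion above):
//   FiberGuard fg;
//   RecordExpiry(...);  // await=false, must not suspend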

Contributor Author
@kostasrim kostasrim Nov 13, 2024

My concern here is that this assumption is not right anymore :(

I think it is, with a small exception which is easy to address (see below -- in fact your comment/discussion is GOLD).

  1. We can only enter Heartbeat if we are not serializing a big value.
  2. Within heartbeat we do two things: a) we expire, b) we evict. Neither of those flows will preempt (we preempt at the end when we flush).

However, for Heartbeat to proceed it relies on LocalBlockingCounter. This is ok for all but one flow. Both DbSlice::CallChangeCallbacks and DbSlice::FlushChangeToEarlierCallbacks are fine, since if they preempt the counter will be incremented and Heartbeat won't run. The only exception is BucketSaveCb, which will

  1. FlushChangeToEarlierCallbacks
  2. Serialize the bucket

The problem is that if (2) preempts because of a big value, the counter might be zero and heartbeat will run normally.

The fix is simple though: we should increment the counter in BucketSaveCb as well, which will provide mutual exclusion with Heartbeat (see the sketch below).

Last but not least, I agree with adding a FiberGuard, although that will merely assert that Heartbeat did not preempt, not that there is no big value serialization happening (luckily, if that were the case the tests would fail because the journal would contain junk).
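A minimal sketch of that fix, building on the LocalBlockingCounter sketch earlier in this thread; the guard, accessor, and call flow below are illustrative, not the exact code that landed:

// Tiny RAII helper: raise the blocking counter for the lifetime of one bucket
// save, so Heartbeat sees a mutation in flight even if serializing a big value
// preempts this fiber between the flush and the bucket serialization.
class ScopedMutationBlock {
 public:
  explicit ScopedMutationBlock(LocalBlockingCounter* c) : counter_(c) { counter_->Increment(); }
  ~ScopedMutationBlock() { counter_->Decrement(); }

 private:
  LocalBlockingCounter* counter_;
};

// Pseudo-flow inside SliceSnapshot::BucketSaveCb (assumed accessor names):
//   ConditionGuard guard(&bucket_ser_);
//   ScopedMutationBlock block(&db_slice_->block_counter());  // Heartbeat now skips
//   FlushChangeToEarlierCallbacks(...);  // may preempt
//   SerializeBucket(...);                // may preempt on big values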

// could lead to UB or assertion failures (while DashTable::Traverse is iterating over
// a logical bucket).
util::fb2::LockGuard lk(local_mu_);
block_counter_.Wait();
Collaborator

Is this true, that we cannot flush while we are in the middle of a big value serialization?
When snapshotting we can run FLUSHDB, since we use an intrusive_ptr there.

Contributor Author

Well, it did mess with Traverse in the past, as it caused the assertion to trigger. Now that we have bucket (not logical bucket) traversal it's probably ok. I would rather keep this as a safeguard though, and have another branch run without this Wait. We can incrementally remove those parts if you are ok with that.

Collaborator

I don't see how this is related to traversing a logical vs. non-logical bucket; we copy the pointer aside while flushing and snapshotting, so when flush runs we just reset the pointer for the next transactions.
Actually, if we do have a bug here I would like to understand it before we merge this PR.

Contributor Author

Now that I am thinking about it, consider the following:

We call IterateBucketsFb, which calls Traverse, which calls BucketSaveCb on the bucket. Now this serializes an entry. Let's assume this entry is a list, so it calls SaveListObject, which iterates over each node in the list. Now let's say that each node is also a big value, so at the end of each iteration we call FlushIfNeeded, which happily preempts. Now we issue FLUSHALL, which cleans up the DbTable. When SaveListObject resumes, the pointers to the list/nodes are invalidated (freed by the FLUSHALL). Isn't that a problem?

Collaborator

They should not be freed by FLUSHALL. What is the difference between preempting from snapshotting in the middle of serializing an entry and preempting at the end of serializing a bucket?
When FLUSHALL runs, the snapshot should continue to create a point-in-time snapshot of the data even though FLUSHALL has finished and subsequent transactions see an empty db.
This is why we use an intrusive_ptr in snapshotting (see the toy example below).
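For illustration only, a minimal standalone example of that reference-counting behaviour with toy types (not the real DbTable):

#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <cassert>

// Toy stand-in for DbTable: the snapshot copies the intrusive_ptr aside, so a
// FLUSHALL that drops the slice's pointer only releases one reference; the
// table (and the nodes inside it) stays alive until the snapshot lets go.
struct Table : boost::intrusive_ref_counter<Table> {
  int payload = 42;
};

int main() {
  boost::intrusive_ptr<Table> slice_table(new Table);       // owned by the slice
  boost::intrusive_ptr<Table> snapshot_copy = slice_table;  // StartSnapshotInShard-style copy

  slice_table.reset();                    // "FLUSHALL": later transactions see an empty db
  assert(snapshot_copy->payload == 42);   // snapshot still reads valid memory
  return 0;
}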

Contributor Author

// We use reference counting semantics of DbTable when doing snapshotting.
// There we need to preserve the copy of the table in case someone flushes it during
// the snapshot process. We copy the pointers in StartSnapshotInShard function.
using DbTableArray = std::vector<boost::intrusive_ptr<DbTable>>;

and

   db_array_ = slice->databases();

Now it makes sense.

Contributor Author

Something that pops into my mind:

Within FlushDbIndexes:

auto cb = [indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
  flush_db_arr.clear();
  ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap |
                                        ServerState::kGlibcmalloc);
};

fb2::Fiber("flush_dbs", std::move(cb)).Detach();

The fiber will clear flush_db_arr and then call the memory decommit. This won't have any effect if we are snapshotting. Shall we run the memory decommit in the destructor instead?

Collaborator

yes we can

Contributor Author

nice. It's minor but I will patch this at some point.

replica = df_factory.create(
    cluster_mode=cluster_mode,
    cluster_announce_ip=announce_ip,
    announce_port=announce_port,
    disable_serialization_max_chunk_size=0,
)
Collaborator
@adiholden adiholden Nov 13, 2024

why disable in this test?

Contributor Author

dfly_version = "v1.19.2" does not support the flag, so launching the instance will fail

Collaborator

In which version did you introduce this flag? Maybe we can update this test to run that dfly version.

Contributor Author

In which version did you introduce this flag?

I will need to check.

In which version did you introduce this flag? Maybe we can update this test to run that dfly version.

There is no point, because a) the bug was present in that version, and b) we won't run the regression tests with this flag on in the future anyway.

Collaborator

I don't think we should use this flag in the regression tests so that a small percentage of entries in the replication/snapshot tests will hit this flow.
The default for this flag will be much higher though.

Contributor Author

I don't think we should use this flag in the regression tests so that a small percentage of entries in the replication/snapshot tests will hit this flow.

Only for extreme testing. I can disable this, no issue with that.

@@ -124,6 +124,12 @@ def __init__(self, params: DflyParams, args):
        if threads > 1:
            self.args["num_shards"] = threads - 1

        if "disable_serialization_max_chunk_size" not in self.args:
            # Add 1 byte limit for big values
            self.args["serialization_max_chunk_size"] = 1
Collaborator

Does it really make sense to always run the big value serialization flow in regression tests?
It is good for extensive testing at the beginning, but then we don't test the other flow in the code.

Contributor Author

I wanted this to run for a week or so to make sure that we don't see any other new bugs. I will revert this afterwards. It was Roman's idea back in the summer 😄

Contributor Author

The point is, I either keep this PR open and run the regression tests manually every 3 hours, or I introduce this change and wait for a week 😄

@adiholden
Collaborator

It would be best if you add statistics for big value serialization, i.e. expose the number of preemptions during serialization in the stats.
This way we will know this logic is executed in the pytest flows and can track it on running Dragonfly servers.
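For illustration, a rough sketch of such a counter; the names and the place it would be exposed are assumptions, not existing Dragonfly code:

#include <cstdint>

// Hypothetical per-shard counter, bumped whenever big value serialization has
// to preempt the fiber, and later summed into the INFO/metrics output.
struct SerializationStats {
  uint64_t big_value_preemptions = 0;
};

inline SerializationStats& TlSerializationStats() {
  static thread_local SerializationStats stats;
  return stats;
}

// Sketch of the call site inside the serializer's flush path:
//   if (serialized_bytes > serialization_max_chunk_size) {
//     ++TlSerializationStats().big_value_preemptions;  // record before yielding
//     FlushAndPreempt();
//   }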

@kostasrim
Contributor Author

It would be best if you add statistics for big value serialization, i.e. expose the number of preemptions during serialization in the stats. This way we will know this logic is executed in the pytest flows and can track it on running Dragonfly servers.

Generally this PR is about removing the lock from DbSlice, not about adding statistics. I am happy to do this as part of this PR, as it's a straightforward change, but usually I prefer to separate tasks like that.

// Does not check for non supported events. Callers must parse the string and reject it
// if it's not empty and not EX.
void SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events);

bool HasBlockingCounterMutating() const {
Collaborator

What we want to expose with this API is whether we will preempt if we try to do any mutation on the db.
I.e., if the heartbeat function wants to mutate for eviction/expiry and we do not allow preempting there right now, then it should invoke a DbSlice API IsMutatingPreemptive, and if that returns true we will not invoke the heartbeat eviction/expiry step.
Note: we should call this in heartbeat and in PrimeEvictionPolicy::Evict.

I think that, besides renaming the function, the logic will be clearer if we register two callbacks with DbSlice::RegisterOnChange: one is the change callback we currently have, and the other is a new callback that returns true or false depending on whether invoking the change callback would preempt.
This way we only need to iterate through these callbacks to know IsMutatingPreemptive (see the sketch below).
On one hand it is not as simple as just returning block_counter_ as you do now.
But on the other hand I find it more robust and the flow clearer when reading: it is simpler and more obvious from the registration side what this function does, and we will not need to remember to increment the blocking counter from different places in the code.
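A rough, standalone sketch of that two-callback idea; the type and function names are illustrative, not the existing DbSlice API:

#include <functional>
#include <utility>
#include <vector>

struct ChangeReq {};  // placeholder for the real change-request type

// Each subscriber registers the usual change callback plus a predicate that
// answers "would invoking the change callback preempt the fiber right now?".
struct ChangeSubscriber {
  std::function<void(const ChangeReq&)> on_change;
  std::function<bool()> is_preemptive;
};

class ChangeRegistry {
 public:
  void Register(ChangeSubscriber sub) {
    subs_.push_back(std::move(sub));
  }

  // Heartbeat / PrimeEvictionPolicy::Evict would consult this before mutating.
  bool IsMutatingPreemptive() const {
    for (const auto& s : subs_) {
      if (s.is_preemptive && s.is_preemptive())
        return true;
    }
    return false;
  }

 private:
  std::vector<ChangeSubscriber> subs_;
};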

Contributor Author

As discussed internally, we can use this but for now I am aiming for correctness. Change should be simple though, we will discuss this offline.

@kostasrim kostasrim requested a review from adiholden December 4, 2024 15:03
@@ -1098,6 +1090,7 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
LOG(ERROR) << "Invalid call to ExpireIfNeeded";
return {it, ExpireIterator{}};
}
block_counter_.Wait();
Collaborator

I think we discussed that this should be removed

Collaborator

?

    status = StopFullSyncInThread(flow, &replica_ptr->cntx, shard);
    if (*status != OpStatus::OK) {
      return;
    }
    StartStableSyncInThread(flow, &replica_ptr->cntx, shard);
  };
  shard_set->RunBlockingInParallel(std::move(cb));

  if (*status != OpStatus::OK)
Collaborator

Why did you remove these lines?

if (db_slice.WillBlockOnJournalWrite()) {
  const auto elapsed = std::chrono::system_clock::now() - start;
  if (elapsed > std::chrono::seconds(1)) {
    LOG(WARNING) << "Stalled heartbeat() fiber for " << elapsed.count()
Collaborator

You should use LOG_EVERY_T here; if the heartbeat is stalled due to big value writes, we will be flooded with these prints.
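For illustration, the lines above would become roughly the following, assuming glog's LOG_EVERY_T macro is available through the project's logging header (the 10-second interval is an arbitrary choice):

#include <chrono>

if (db_slice.WillBlockOnJournalWrite()) {
  const auto elapsed = std::chrono::system_clock::now() - start;
  if (elapsed > std::chrono::seconds(1)) {
    // Emit at most one warning every 10 seconds instead of one per heartbeat
    // tick, so long stretches of big value serialization do not flood the log.
    LOG_EVERY_T(WARNING, 10) << "Stalled heartbeat() fiber for "
                             << std::chrono::duration_cast<std::chrono::seconds>(elapsed).count()
                             << " seconds";
  }
}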

@@ -597,6 +597,10 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor,
auto& db_slice = op_args.GetDbSlice();
DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index));

// We need to make sure we don't preempt below, because we don't hold any locks to the keys
Collaborator

I don't think it's related to holding locks on the keys.
We do not allow preemption in the traverse callback here, because another fiber could edit the bucket while we yield.
Please write:
Because ScanCb can preempt due to journaling expired entries, we need to make sure that we enter the callback at a time when journaling will not cause a preemption.

Contributor Author
@kostasrim kostasrim Dec 5, 2024

It's both. Let's ignore bucket splits when we preempt. Now let's say the bucket has 5 slots.
We iterate over the first and second, and on the third we call ExpireIfNeeded from ScanCb. The issue is that we preempt when we write to the journal, but we have not yet deleted the entry from the dash table. If we preempt on that first step, we could get a transaction on the same key that simply does a blind update. What's the problem there? That once we resume, we will expire the updated value. That's a bug, and it's a side effect of not holding the locks on the relevant key -- you just broke the isolation level of the datastore.

  • ScanCb preempts on RecordExpiry -> a transaction updates the same slot/key to a new value -> ScanCb resumes and deletes/expires the key.

Why, for example, is it ok to call ExpireIfNeeded and preempt in Find? Because that path is transactional and we hold the locks, so even if we preempt we don't risk the corner case described above.

I will leave a big comment here.

Contributor Author

Hmm, I think I might be wrong: if we preempt while writing the journal change, a transaction that tries to modify the same key will block in PreUpdate(), since it will try to call the registered callback, which locks the same mutex and blocks. In other words, the write to the journal within ScanCb will synchronize with PreUpdate().

@kostasrim kostasrim merged commit 267d5ab into main Dec 5, 2024
14 checks passed
@kostasrim kostasrim deleted the kpr2 branch December 5, 2024 11:24