feat: Huge values breakdown in cluster migration #4144
Conversation
Before this PR we used `RESTORE` commands for transferring data between source and target nodes in cluster slots migration. While this _works_, it has a side effect of consuming 2x memory for huge values (i.e. if a single key's value takes 10gb, serializing it will take 20gb or even 30gb). With this PR we break down huge keys into multiple commands (`RPUSH`, `HSET`, etc), respecting the existing `--serialization_max_chunk_size` flag. Note that we still use `RESTORE` serialization for small values and for some data types which are not (yet?) supported. Part of #4100
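For illustration, here is a minimal sketch of the chunking idea, assuming a list value; the names (`kMaxChunkSize`, `SerializeList`, `send_cmd`) are hypothetical and not the PR's actual code:

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for --serialization_max_chunk_size.
constexpr size_t kMaxChunkSize = 64 * 1024;

// Break a huge list into multiple RPUSH commands, each carrying roughly
// kMaxChunkSize bytes of payload, instead of one giant RESTORE blob that
// must be fully serialized in memory first.
void SerializeList(const std::string& key, const std::vector<std::string>& elems,
                   const std::function<void(const std::vector<std::string>&)>& send_cmd) {
  std::vector<std::string> args = {"RPUSH", key};
  size_t chunk_bytes = 0;
  for (const auto& e : elems) {
    args.push_back(e);
    chunk_bytes += e.size();
    if (chunk_bytes >= kMaxChunkSize) {  // flush this chunk as its own command
      send_cmd(args);
      args = {"RPUSH", key};
      chunk_bytes = 0;
    }
  }
  if (args.size() > 2) send_cmd(args);  // flush any remaining tail
}
```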
src/server/journal/streamer.h
Outdated
```diff
-void WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms);
+void WriteEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
+                uint64_t expire_ms);
+void WriteCommand(std::string_view cmd, absl::Span<const std::string_view> args);
```
I think this new function should be separated into a new class, in a new file. This class will handle serialization of entries for migration, similar to what we have with the RDB serializer.
When you write "this new function", do you mean only `WriteCommand()`, or something broader than that?
I mean all the new functions that you introduced. They handle how we serialize key values, and I think it would be better to put them in a separate class.
```python
pytest.param(3, 16, 20_000),
pytest.param(5, 20, 30_000, marks=[pytest.mark.slow, pytest.mark.opt_only]),
pytest.param(3, 16, 20_000, 10),
pytest.param(3, 16, 20_000, 1_000_000),
```
Do we have any containers in the test with values of this size (1_000_000)?
No, that's why I used `10` above (to force splitting).
So using 1_000_000 is to check only the old flow of pure RESTORE commands? Let's add a comment here. Also, we do actually want to check both flows in one test; I think that gives better coverage. We should edit the seeder code to create different container sizes. I asked Kostas to create an issue for this. @kostasrim, please add in the issue description that once this change is introduced we can use it in this test.
Done: #4157
> So using 1_000_000 is to check only the old flow of pure RESTORE commands?

Yes, and I added a comment & TODO.
```diff
@@ -317,37 +318,188 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req

 void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
```
`RestoreStreamer::WriteBucket` has a fiber guard; doesn't this mean that the buffer just grows and grows before being sent to the socket? So you break up the values when serializing, but we don't really reduce RSS usage.
Very nice catch!
You can add a check for `used_memory_peak_rss` in the pytest, comparing before and after the migration.
OK, so I'll split this into a separate test that actually uses huge values; otherwise RSS will be noisy with small values, like now.
src/server/journal/streamer.cc
Outdated
```diff
@@ -279,7 +280,6 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {

 void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
   if (it.GetVersion() < snapshot_version_) {
-    FiberAtomicGuard fg;
```
Until today we said that bucket serialization is atomic (this is why we had the fiber guard). Now that you change this assumption, we need to make sure no other fiber tries to change the keys in this bucket while we are in the middle of serialization. When a transaction comes to change the value of a key, it first checks the snapshot version of the bucket to decide whether the pre-update callback needs to be executed. Because the flow here first sets the snapshot version and can then preempt while serializing, you can get to a point where the container is edited before serialization has finished.
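To make the described race concrete, here is a hypothetical sketch of the version-check flow (structure and names are illustrative, not Dragonfly's actual code):

```cpp
#include <cstdint>

// Illustrative only: the bucket is marked *before* serialization completes.
struct Bucket {
  uint64_t version = 0;
};

constexpr uint64_t kSnapshotVersion = 42;

// Serializer fiber: bumps the version, then serializes, possibly yielding.
void SerializeBucket(Bucket& b) {
  b.version = kSnapshotVersion;  // bucket now looks "already serialized"
  // ... without FiberAtomicGuard, serialization may preempt here ...
}

// Writer fiber: uses the version to decide whether to run the pre-update callback.
void OnPreUpdate(Bucket& b) {
  if (b.version < kSnapshotVersion) {
    // Run the pre-update callback: serialize the old value first.
  } else {
    // Callback skipped because the bucket looks serialized, yet the serializer
    // fiber may still be mid-serialization: the container can now be mutated
    // under it. This is the race being described.
  }
}
```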
And I think our cluster migration tests are lacking because we do not run the seeder while migrating, right?
> And I think our cluster migration tests are lacking because we do not run the seeder while migrating, right?

I think this is challenging because of the way our seeder is built(?), in that we need to first capture and then compare. If we run the seeder during migration and something breaks, we can't tell, because the capture will capture the existing state. Or is there a way to compare against what we had sent with the seeder?
I think we can solve this by running the seeder twice with the same seed number; this should generate the same command sequence (this needs to be verified). That way we can run the seeder again on a new, separate instance and compare the captures between that instance and the tested cluster.
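The determinism assumption here is the standard PRNG property; a trivial, generic illustration (not the seeder's actual code):

```cpp
#include <cassert>
#include <random>

int main() {
  std::mt19937 a(1234), b(1234);  // two generators, same seed number
  for (int i = 0; i < 1000; ++i) {
    assert(a() == b());  // identical sequences, hence identical generated commands
  }
}
```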
@adiholden following our offline sync, I modified this PR (including the description) to only do sync breakdown for huge values. I'll follow up on async later.
src/server/journal/streamer.cc
Outdated
```cpp
// We send RESTORE commands for small objects, or objects we don't support breaking.
CmdSerializer serializer([&](std::string s) { Write(s); });

bool use_restore_serialization = true;
```
All this logic below should be in your new class `CmdSerializer`.
The only API to use it should be `SerializeEntry`, which will get the key, value, and expire time.
```cpp
// TODO: From DumpObject until Write we triple-copy the PrimeValue. It's very inefficient and
// will burn CPU for large values.
Write(restore_cmd_sink.str());
CmdSerializer serializer([&](std::string s) { Write(s); });
```
This is the write function: `void Write(std::string_view str);`. What is the reason you pass `std::string` to this callback? Also, above I see you do `std::move` on the string, but it has no effect.
Somewhere there will be a cast from `std::string` to `std::string_view`. I think that `CmdSerializer` is a good place to have that conversion, given that other users of it might find it useful to take in the string.
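As context for the `std::move` remark above, a small self-contained example of why moving into a `std::string_view` parameter is a no-op (purely illustrative):

```cpp
#include <iostream>
#include <string>
#include <string_view>

void Write(std::string_view str) {  // only views the bytes, never takes ownership
  std::cout << str.size() << '\n';
}

int main() {
  std::string s(1024, 'x');
  // std::move just casts s to an rvalue; Write's string_view parameter binds
  // a view to s's buffer, so nothing is actually moved.
  Write(std::move(s));
  std::cout << s.size() << '\n';  // still 1024: the "move" had no effect
}
```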
src/server/journal/cmd_serializer.h
Outdated
```cpp
class CmdSerializer {
 public:
  using Callback = std::function<void(std::string)>;
```
How about renaming `Callback` to `FlushSerialized`? Also, let's add a comment to this class describing what it does.
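Putting the two suggestions together (the rename plus a `SerializeEntry`-only API), one possible shape of the class might be the following sketch; this is hypothetical, not the final code:

```cpp
#include <functional>
#include <string>
#include <utility>

// Serializes a single key/value entry into one or more commands
// (RESTORE for small values, RPUSH/HSET/... chunks for huge ones) and
// hands each fully serialized command to the flush callback.
class CmdSerializer {
 public:
  using FlushSerialized = std::function<void(std::string)>;

  explicit CmdSerializer(FlushSerialized cb) : cb_(std::move(cb)) {
  }

  // The single entry point would take the key, its value, and the expire time:
  // void SerializeEntry(std::string_view key, const PrimeValue& pk,
  //                     const PrimeValue& pv, uint64_t expire_ms);
  // (PrimeValue is Dragonfly's value type; elided here to keep the sketch
  // self-contained.)

 private:
  FlushSerialized cb_;
};
```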
```cpp
string expire_str = absl::StrCat(expire_ms);
args.push_back(expire_str);

io::StringSink value_dump_sink;
```
I wonder if we can make this sink and the other one members of this class, make the `CmdSerializer` a member of `RestoreStreamer`, and thereby reduce the number of allocations we have.
The challenge is that `JournalStreamer::Write` is more complicated than just a simple sink :(