Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Huge values breakdown in cluster migration #4144

Merged
merged 9 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ endif()

add_library(dfly_transaction db_slice.cc blocking_controller.cc
command_registry.cc cluster/cluster_utility.cc
journal/tx_executor.cc namespaces.cc
journal/cmd_serializer.cc journal/tx_executor.cc namespaces.cc
common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc
server_state.cc table.cc top_keys.cc transaction.cc tx_base.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
Expand Down
30 changes: 30 additions & 0 deletions src/server/container_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,36 @@ bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSort
return false;
}

bool IterateMap(const PrimeValue& pv, const IterateKVFunc& func) {
bool finished = true;

if (pv.Encoding() == kEncodingListPack) {
uint8_t intbuf[LP_INTBUF_SIZE];
uint8_t* lp = (uint8_t*)pv.RObjPtr();
uint8_t* fptr = lpFirst(lp);
while (fptr) {
string_view key = LpGetView(fptr, intbuf);
fptr = lpNext(lp, fptr);
string_view val = LpGetView(fptr, intbuf);
fptr = lpNext(lp, fptr);
if (!func(ContainerEntry{key.data(), key.size()}, ContainerEntry{val.data(), val.size()})) {
finished = false;
break;
}
}
} else {
StringMap* sm = static_cast<StringMap*>(pv.RObjPtr());
for (const auto& k_v : *sm) {
if (!func(ContainerEntry{k_v.first, sdslen(k_v.first)},
ContainerEntry{k_v.second, sdslen(k_v.second)})) {
finished = false;
break;
}
}
}
return finished;
}

StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context) {
DCHECK_EQ(pv.Encoding(), kEncodingStrMap2);
StringMap* res = static_cast<StringMap*>(pv.RObjPtr());
Expand Down
3 changes: 3 additions & 0 deletions src/server/container_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct ContainerEntry {

using IterateFunc = std::function<bool(ContainerEntry)>;
using IterateSortedFunc = std::function<bool(ContainerEntry, double)>;
using IterateKVFunc = std::function<bool(ContainerEntry, ContainerEntry)>;

// Iterate over all values and call func(val). Iteration stops as soon
// as func return false. Returns true if it successfully processed all elements
Expand All @@ -72,6 +73,8 @@ bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSort
int32_t start = 0, int32_t end = -1, bool reverse = false,
bool use_score = false);

bool IterateMap(const PrimeValue& pv, const IterateKVFunc& func);

// Get StringMap pointer from primetable value. Sets expire time from db_context
StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context);

Expand Down
206 changes: 206 additions & 0 deletions src/server/journal/cmd_serializer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "server/journal/cmd_serializer.h"

#include "server/container_utils.h"
#include "server/journal/serializer.h"
#include "server/rdb_save.h"

namespace dfly {

namespace {
using namespace std;

class CommandAggregator {
public:
using WriteCmdCallback = std::function<void(absl::Span<const string_view>)>;

CommandAggregator(string_view key, WriteCmdCallback cb) : key_(key), cb_(cb) {
}

~CommandAggregator() {
CommitPending();
}

enum class CommitMode { kAuto, kNoCommit };
void AddArg(string arg, CommitMode commit_mode = CommitMode::kAuto) {
agg_bytes_ += arg.size();
members_.push_back(std::move(arg));

if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= serialization_max_chunk_size) {
CommitPending();
}
}

private:
void CommitPending() {
if (members_.empty()) {
return;
}

args_.clear();
args_.reserve(members_.size() + 1);
args_.push_back(key_);
for (string_view member : members_) {
args_.push_back(member);
}
cb_(args_);
members_.clear();
}

string_view key_;
WriteCmdCallback cb_;
vector<string> members_;
absl::InlinedVector<string_view, 5> args_;
size_t agg_bytes_ = 0;
};

} // namespace

CmdSerializer::CmdSerializer(FlushSerialized cb) : cb_(std::move(cb)) {
}

void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
// We send RESTORE commands for small objects, or objects we don't support breaking.
bool use_restore_serialization = true;
if (serialization_max_chunk_size > 0 && pv.MallocUsed() > serialization_max_chunk_size) {
switch (pv.ObjType()) {
case OBJ_SET:
SerializeSet(key, pv);
use_restore_serialization = false;
break;
case OBJ_ZSET:
SerializeZSet(key, pv);
use_restore_serialization = false;
break;
case OBJ_HASH:
SerializeHash(key, pv);
use_restore_serialization = false;
break;
case OBJ_LIST:
SerializeList(key, pv);
use_restore_serialization = false;
break;
case OBJ_STRING:
case OBJ_STREAM:
case OBJ_JSON:
case OBJ_SBF:
default:
// These types are unsupported wrt splitting huge values to multiple commands, so we send
// them as a RESTORE command.
break;
}
}

if (use_restore_serialization) {
// RESTORE sets STICK and EXPIRE as part of the command.
SerializeRestore(key, pk, pv, expire_ms);
} else {
SerializeStickIfNeeded(key, pk);
SerializeExpireIfNeeded(key, expire_ms);
}
}

void CmdSerializer::SerializeCommand(string_view cmd, absl::Span<const string_view> args) {
journal::Entry entry(0, // txid
journal::Op::COMMAND, // single command
0, // db index
1, // shard count
0, // slot-id, but it is ignored at this level
journal::Entry::Payload(cmd, ArgSlice(args)));

// Serialize into a string
io::StringSink cmd_sink;
JournalWriter writer{&cmd_sink};
writer.Write(entry);

cb_(std::move(cmd_sink).str());
}

void CmdSerializer::SerializeStickIfNeeded(string_view key, const PrimeValue& pk) {
if (!pk.IsSticky()) {
return;
}

SerializeCommand("STICK", {key});
}

void CmdSerializer::SerializeExpireIfNeeded(string_view key, uint64_t expire_ms) {
if (expire_ms == 0) {
return;
}

SerializeCommand("PEXIRE", {key, absl::StrCat(expire_ms)});
}

void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("SADD", args); });

container_utils::IterateSet(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString());
return true;
});
}

void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("ZADD", args); });

container_utils::IterateSortedSet(
pv.GetRobjWrapper(),
[&](container_utils::ContainerEntry ce, double score) {
aggregator.AddArg(absl::StrCat(score), CommandAggregator::CommitMode::kNoCommit);
aggregator.AddArg(ce.ToString());
return true;
},
/*start=*/0, /*end=*/-1, /*reverse=*/false, /*use_score=*/true);
}

void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("HSET", args); });

container_utils::IterateMap(
pv, [&](container_utils::ContainerEntry k, container_utils::ContainerEntry v) {
aggregator.AddArg(k.ToString(), CommandAggregator::CommitMode::kNoCommit);
aggregator.AddArg(v.ToString());
return true;
});
}

void CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("RPUSH", args); });

container_utils::IterateList(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString());
return true;
});
}

void CmdSerializer::SerializeRestore(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
absl::InlinedVector<string_view, 5> args;
args.push_back(key);

string expire_str = absl::StrCat(expire_ms);
args.push_back(expire_str);

io::StringSink value_dump_sink;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can make this sink and the other one members of this class and the CmdSerializer a member of RestoreSteamer and by that reduce the number of allocations we have

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The challenge is that JournalStreamer::Write is more complicated than just a simple sink :(

SerializerBase::DumpObject(pv, &value_dump_sink);
args.push_back(value_dump_sink.str());

args.push_back("ABSTTL"); // Means expire string is since epoch

if (pk.IsSticky()) {
args.push_back("STICK");
}

SerializeCommand("RESTORE", args);
}

} // namespace dfly
44 changes: 44 additions & 0 deletions src/server/journal/cmd_serializer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <absl/types/span.h>

#include <string>
#include <string_view>

#include "server/table.h"

namespace dfly {

// CmdSerializer serializes DB entries (key+value) into command(s) in RESP format string.
// Small entries are serialized as RESTORE commands, while bigger ones (see
// serialization_max_chunk_size) are split into multiple commands (like rpush, hset, etc).
// Expiration and stickiness are also serialized into commands.
class CmdSerializer {
public:
using FlushSerialized = std::function<void(std::string)>;

explicit CmdSerializer(FlushSerialized cb);

void SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);

private:
void SerializeCommand(std::string_view cmd, absl::Span<const std::string_view> args);
void SerializeStickIfNeeded(std::string_view key, const PrimeValue& pk);
void SerializeExpireIfNeeded(std::string_view key, uint64_t expire_ms);

void SerializeSet(std::string_view key, const PrimeValue& pv);
void SerializeZSet(std::string_view key, const PrimeValue& pv);
void SerializeHash(std::string_view key, const PrimeValue& pv);
void SerializeList(std::string_view key, const PrimeValue& pv);
void SerializeRestore(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);

FlushSerialized cb_;
};

} // namespace dfly
34 changes: 3 additions & 31 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "base/flags.h"
#include "base/logging.h"
#include "server/cluster/cluster_defs.h"
#include "server/journal/cmd_serializer.h"
#include "util/fibers/synchronization.h"

using namespace facade;
Expand Down Expand Up @@ -317,37 +318,8 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req

void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RestoreStreamer::WriteBucket has fiber gaurd, doesnt this means that the buffer before send to socket just grows and grows? so you break the values when serializing but we dont realy reduce rss usage

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice catch!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can add a check for used_memory_peak_rss in the pytest compare before and after migration

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so I'll split this to a separate test then which actually uses huge values, otherwise RSS will be noisy with small values like now

uint64_t expire_ms) {
absl::InlinedVector<string_view, 5> args;
args.push_back(key);

string expire_str = absl::StrCat(expire_ms);
args.push_back(expire_str);

io::StringSink restore_cmd_sink;
{ // to destroy extra copy
io::StringSink value_dump_sink;
SerializerBase::DumpObject(pv, &value_dump_sink);
args.push_back(value_dump_sink.str());

args.push_back("ABSTTL"); // Means expire string is since epoch

if (pk.IsSticky()) {
args.push_back("STICK");
}

journal::Entry entry(0, // txid
journal::Op::COMMAND, // single command
0, // db index
1, // shard count
0, // slot-id, but it is ignored at this level
journal::Entry::Payload("RESTORE", ArgSlice(args)));

JournalWriter writer{&restore_cmd_sink};
writer.Write(entry);
}
// TODO: From DumpObject to till Write we tripple copy the PrimeValue. It's very inefficient and
// will burn CPU for large values.
Write(restore_cmd_sink.str());
CmdSerializer serializer([&](std::string s) { Write(s); });
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the write function
void Write(std::string_view str);
what is the reason you use std::string passed to this cb? also above I see you do std::move for the string but it has no affect

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somewhere there will be a cast from std::string to std::string_view. I think that CmdSerializer is a good place to have that conversion, given potentially other users of it might find it useful to take in the string

serializer.SerializeEntry(key, pk, pv, expire_ms);
}

} // namespace dfly
3 changes: 2 additions & 1 deletion src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ class RestoreStreamer : public JournalStreamer {

// Returns whether anything was written
void WriteBucket(PrimeTable::bucket_iterator it);
void WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms);
void WriteEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);

DbSlice* db_slice_;
DbTableArray db_array_;
Expand Down
Loading
Loading