feat: Huge values breakdown in cluster migration (#4144)
* feat: Huge values breakdown in cluster migration

Before this PR we used `RESTORE` commands to transfer data between
source and target nodes in cluster slot migration.

While this _works_, it has the side effect of consuming 2-3x memory for
huge values (e.g. if a single key's value takes 10 GB, serializing it
will take 20 GB or even 30 GB: the serialized dump is one extra copy,
and buffering the resulting command for the socket can add another).

With this PR we break huge keys down into multiple commands (`RPUSH`,
`HSET`, etc.), respecting the existing `--serialization_max_chunk_size`
flag.

Part of #4100
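
The idea in miniature (a hedged sketch, not Dragonfly's actual code; the real logic is `CommandAggregator` in `cmd_serializer.cc` below): rather than one `RESTORE` carrying the whole serialized value, elements are batched into plain write commands whose payload size is capped:

```cpp
#include <functional>
#include <string>
#include <vector>

// Toy stand-in for the real CommandAggregator: batch list elements into
// RPUSH commands, flushing whenever the accumulated payload reaches the cap.
void ChunkedRpush(const std::string& key, const std::vector<std::string>& elems,
                  size_t max_chunk_bytes,
                  const std::function<void(const std::string&)>& send) {
  std::string cmd;
  size_t batch_bytes = 0;
  for (const std::string& e : elems) {
    if (cmd.empty()) cmd = "RPUSH " + key;
    cmd += " " + e;
    batch_bytes += e.size();
    if (batch_bytes >= max_chunk_bytes) {  // cap reached: flush this batch
      send(cmd);
      cmd.clear();
      batch_bytes = 0;
    }
  }
  if (!cmd.empty()) send(cmd);  // flush the remainder
}
```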
chakaz authored Nov 25, 2024
1 parent 2b3c182 commit 3c65651
Showing 8 changed files with 299 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
@@ -28,7 +28,7 @@ endif()

add_library(dfly_transaction db_slice.cc blocking_controller.cc
command_registry.cc cluster/cluster_utility.cc
-           journal/tx_executor.cc namespaces.cc
+           journal/cmd_serializer.cc journal/tx_executor.cc namespaces.cc
common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc
server_state.cc table.cc top_keys.cc transaction.cc tx_base.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
30 changes: 30 additions & 0 deletions src/server/container_utils.cc
@@ -270,6 +270,36 @@ bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSort
return false;
}

bool IterateMap(const PrimeValue& pv, const IterateKVFunc& func) {
bool finished = true;

if (pv.Encoding() == kEncodingListPack) {
uint8_t intbuf[2][LP_INTBUF_SIZE];  // two buffers: key and value may both need integer decoding
uint8_t* lp = (uint8_t*)pv.RObjPtr();
uint8_t* fptr = lpFirst(lp);
while (fptr) {
string_view key = LpGetView(fptr, intbuf[0]);
fptr = lpNext(lp, fptr);
string_view val = LpGetView(fptr, intbuf[1]);
fptr = lpNext(lp, fptr);
if (!func(ContainerEntry{key.data(), key.size()}, ContainerEntry{val.data(), val.size()})) {
finished = false;
break;
}
}
} else {
StringMap* sm = static_cast<StringMap*>(pv.RObjPtr());
for (const auto& k_v : *sm) {
if (!func(ContainerEntry{k_v.first, sdslen(k_v.first)},
ContainerEntry{k_v.second, sdslen(k_v.second)})) {
finished = false;
break;
}
}
}
return finished;
}

StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context) {
DCHECK_EQ(pv.Encoding(), kEncodingStrMap2);
StringMap* res = static_cast<StringMap*>(pv.RObjPtr());
3 changes: 3 additions & 0 deletions src/server/container_utils.h
@@ -54,6 +54,7 @@ struct ContainerEntry {

using IterateFunc = std::function<bool(ContainerEntry)>;
using IterateSortedFunc = std::function<bool(ContainerEntry, double)>;
using IterateKVFunc = std::function<bool(ContainerEntry, ContainerEntry)>;

// Iterate over all values and call func(val). Iteration stops as soon
// as func returns false. Returns true if it successfully processed all elements
@@ -72,6 +73,8 @@ bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSort
int32_t start = 0, int32_t end = -1, bool reverse = false,
bool use_score = false);

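// Iterate over all key/value pairs and call func(key, value). Iteration stops as soon
// as func returns false. Returns true if it successfully processed all pairs.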
bool IterateMap(const PrimeValue& pv, const IterateKVFunc& func);

// Get StringMap pointer from primetable value. Sets expire time from db_context
StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context);

206 changes: 206 additions & 0 deletions src/server/journal/cmd_serializer.cc
@@ -0,0 +1,206 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "server/journal/cmd_serializer.h"

#include "server/container_utils.h"
#include "server/journal/serializer.h"
#include "server/rdb_save.h"

namespace dfly {

namespace {
using namespace std;

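// Aggregates arguments for a single command, flushing them via the callback as one
// command whenever the accumulated size reaches serialization_max_chunk_size, and
// once more on destruction for any remainder.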
class CommandAggregator {
public:
using WriteCmdCallback = std::function<void(absl::Span<const string_view>)>;

CommandAggregator(string_view key, WriteCmdCallback cb) : key_(key), cb_(cb) {
}

~CommandAggregator() {
CommitPending();
}

enum class CommitMode { kAuto, kNoCommit };
void AddArg(string arg, CommitMode commit_mode = CommitMode::kAuto) {
agg_bytes_ += arg.size();
members_.push_back(std::move(arg));

if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= serialization_max_chunk_size) {
CommitPending();
}
}

private:
void CommitPending() {
if (members_.empty()) {
return;
}

args_.clear();
args_.reserve(members_.size() + 1);
args_.push_back(key_);
for (string_view member : members_) {
args_.push_back(member);
}
cb_(args_);
members_.clear();
}

string_view key_;
WriteCmdCallback cb_;
vector<string> members_;
absl::InlinedVector<string_view, 5> args_;
size_t agg_bytes_ = 0;
};

} // namespace

CmdSerializer::CmdSerializer(FlushSerialized cb) : cb_(std::move(cb)) {
}

void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
// We send RESTORE commands for small objects, or for objects we don't support breaking down.
bool use_restore_serialization = true;
if (serialization_max_chunk_size > 0 && pv.MallocUsed() > serialization_max_chunk_size) {
switch (pv.ObjType()) {
case OBJ_SET:
SerializeSet(key, pv);
use_restore_serialization = false;
break;
case OBJ_ZSET:
SerializeZSet(key, pv);
use_restore_serialization = false;
break;
case OBJ_HASH:
SerializeHash(key, pv);
use_restore_serialization = false;
break;
case OBJ_LIST:
SerializeList(key, pv);
use_restore_serialization = false;
break;
case OBJ_STRING:
case OBJ_STREAM:
case OBJ_JSON:
case OBJ_SBF:
default:
// Splitting huge values into multiple commands is not supported for these types, so we
// send them as a single RESTORE command.
break;
}
}

if (use_restore_serialization) {
// RESTORE sets STICK and EXPIRE as part of the command.
SerializeRestore(key, pk, pv, expire_ms);
} else {
SerializeStickIfNeeded(key, pk);
SerializeExpireIfNeeded(key, expire_ms);
}
}

void CmdSerializer::SerializeCommand(string_view cmd, absl::Span<const string_view> args) {
journal::Entry entry(0, // txid
journal::Op::COMMAND, // single command
0, // db index
1, // shard count
0, // slot-id, but it is ignored at this level
journal::Entry::Payload(cmd, ArgSlice(args)));

// Serialize into a string
io::StringSink cmd_sink;
JournalWriter writer{&cmd_sink};
writer.Write(entry);

cb_(std::move(cmd_sink).str());
}

void CmdSerializer::SerializeStickIfNeeded(string_view key, const PrimeValue& pk) {
if (!pk.IsSticky()) {
return;
}

SerializeCommand("STICK", {key});
}

void CmdSerializer::SerializeExpireIfNeeded(string_view key, uint64_t expire_ms) {
if (expire_ms == 0) {
return;
}

SerializeCommand("PEXIRE", {key, absl::StrCat(expire_ms)});
}

void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("SADD", args); });

container_utils::IterateSet(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString());
return true;
});
}

void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("ZADD", args); });

container_utils::IterateSortedSet(
pv.GetRobjWrapper(),
[&](container_utils::ContainerEntry ce, double score) {
aggregator.AddArg(absl::StrCat(score), CommandAggregator::CommitMode::kNoCommit);
aggregator.AddArg(ce.ToString());
return true;
},
/*start=*/0, /*end=*/-1, /*reverse=*/false, /*use_score=*/true);
}

void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("HSET", args); });

container_utils::IterateMap(
pv, [&](container_utils::ContainerEntry k, container_utils::ContainerEntry v) {
aggregator.AddArg(k.ToString(), CommandAggregator::CommitMode::kNoCommit);
aggregator.AddArg(v.ToString());
return true;
});
}

void CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("RPUSH", args); });

container_utils::IterateList(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString());
return true;
});
}

void CmdSerializer::SerializeRestore(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
absl::InlinedVector<string_view, 5> args;
args.push_back(key);

string expire_str = absl::StrCat(expire_ms);
args.push_back(expire_str);

io::StringSink value_dump_sink;
SerializerBase::DumpObject(pv, &value_dump_sink);
args.push_back(value_dump_sink.str());

args.push_back("ABSTTL"); // Means expire string is since epoch

if (pk.IsSticky()) {
args.push_back("STICK");
}

SerializeCommand("RESTORE", args);
}

} // namespace dfly
44 changes: 44 additions & 0 deletions src/server/journal/cmd_serializer.h
@@ -0,0 +1,44 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <absl/types/span.h>

#include <string>
#include <string_view>

#include "server/table.h"

namespace dfly {

// CmdSerializer serializes DB entries (key + value) into one or more RESP-format command
// strings. Small entries are serialized as RESTORE commands, while bigger ones (see
// serialization_max_chunk_size) are split into multiple commands (like RPUSH, HSET, etc.).
// Expiration and stickiness are also serialized into commands.
class CmdSerializer {
public:
using FlushSerialized = std::function<void(std::string)>;

explicit CmdSerializer(FlushSerialized cb);

void SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);

private:
void SerializeCommand(std::string_view cmd, absl::Span<const std::string_view> args);
void SerializeStickIfNeeded(std::string_view key, const PrimeValue& pk);
void SerializeExpireIfNeeded(std::string_view key, uint64_t expire_ms);

void SerializeSet(std::string_view key, const PrimeValue& pv);
void SerializeZSet(std::string_view key, const PrimeValue& pv);
void SerializeHash(std::string_view key, const PrimeValue& pv);
void SerializeList(std::string_view key, const PrimeValue& pv);
void SerializeRestore(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);

FlushSerialized cb_;
};

} // namespace dfly
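
For orientation, a minimal sketch of driving CmdSerializer; the collecting vector is a hypothetical stand-in for a real sink (the actual caller is RestoreStreamer::WriteEntry in streamer.cc below):

```cpp
// Hypothetical usage; assumes key, pk, pv and expire_ms are already in scope.
std::vector<std::string> blobs;  // each element is one RESP-serialized command
dfly::CmdSerializer serializer(
    [&](std::string s) { blobs.push_back(std::move(s)); });
serializer.SerializeEntry(key, pk, pv, expire_ms);
// blobs now holds either a single RESTORE, or a series of chunked
// SADD/ZADD/HSET/RPUSH commands, optionally followed by STICK and PEXPIRE.
```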
34 changes: 3 additions & 31 deletions src/server/journal/streamer.cc
@@ -9,6 +9,7 @@
#include "base/flags.h"
#include "base/logging.h"
#include "server/cluster/cluster_defs.h"
#include "server/journal/cmd_serializer.h"
#include "util/fibers/synchronization.h"

using namespace facade;
@@ -317,37 +318,8 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req

void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
-  absl::InlinedVector<string_view, 5> args;
-  args.push_back(key);
-
-  string expire_str = absl::StrCat(expire_ms);
-  args.push_back(expire_str);
-
-  io::StringSink restore_cmd_sink;
-  {  // to destroy extra copy
-    io::StringSink value_dump_sink;
-    SerializerBase::DumpObject(pv, &value_dump_sink);
-    args.push_back(value_dump_sink.str());
-
-    args.push_back("ABSTTL");  // Means expire string is since epoch
-
-    if (pk.IsSticky()) {
-      args.push_back("STICK");
-    }
-
-    journal::Entry entry(0,                     // txid
-                         journal::Op::COMMAND,  // single command
-                         0,                     // db index
-                         1,                     // shard count
-                         0,                     // slot-id, but it is ignored at this level
-                         journal::Entry::Payload("RESTORE", ArgSlice(args)));
-
-    JournalWriter writer{&restore_cmd_sink};
-    writer.Write(entry);
-  }
-  // TODO: From DumpObject to till Write we tripple copy the PrimeValue. It's very inefficient and
-  // will burn CPU for large values.
-  Write(restore_cmd_sink.str());
+  CmdSerializer serializer([&](std::string s) { Write(s); });
+  serializer.SerializeEntry(key, pk, pv, expire_ms);
}

} // namespace dfly
3 changes: 2 additions & 1 deletion src/server/journal/streamer.h
@@ -98,7 +98,8 @@ class RestoreStreamer : public JournalStreamer {

// Returns whether anything was written
void WriteBucket(PrimeTable::bucket_iterator it);
-  void WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms);
+  void WriteEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
+                  uint64_t expire_ms);

DbSlice* db_slice_;
DbTableArray db_array_;
