Skip to content

Commit

Permalink
chore(tiering): Faster smallbins serialization #2
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Jul 27, 2024
1 parent e2d65a0 commit 4b10617
Show file tree
Hide file tree
Showing 12 changed files with 250 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
}

void CompactObj::Materialize(std::string_view blob, bool is_raw) {
CHECK(IsExternal()) << int(taglen_);
// CHECK(IsExternal()) << int(taglen_);

DCHECK_GT(blob.size(), kInlineLen);

Expand Down
9 changes: 9 additions & 0 deletions src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,15 @@ class CompactObj {

bool HasAllocated() const;

// Returns only the bits of mask_ that are selected by kEncMask.
uint8_t GetEncodingMask() const {
  const uint8_t encoding_bits = mask_ & kEncMask;
  return encoding_bits;
}

// Overwrites the kEncMask bits of mask_ with the corresponding bits of `mask`.
// Bits of mask_ outside kEncMask are preserved; bits of `mask` outside kEncMask are ignored.
void SetEncodingMask(uint8_t mask) {
  mask_ = (mask_ & ~kEncMask) | (mask & kEncMask);
}

private:
void EncodeString(std::string_view str);
size_t DecodedLen(size_t sz) const;
Expand Down
5 changes: 4 additions & 1 deletion src/server/rdb_extensions.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ constexpr uint8_t RDB_TYPE_JSON = 30;
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
constexpr uint8_t RDB_TYPE_SBF = 33;
constexpr uint8_t RDB_TYPE_TIERED_SEGMENT = 34;

constexpr bool rdbIsObjectTypeDF(uint8_t type) {
return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON) ||
(type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY) ||
(type == RDB_TYPE_SBF);
(type == RDB_TYPE_SBF) || (type == RDB_TYPE_TIERED_SEGMENT);
}

// Opcodes: Range 200-240 is used by DF extensions.
Expand All @@ -40,6 +41,8 @@ constexpr uint8_t RDB_OPCODE_JOURNAL_BLOB = 210;
// so it is always sent at the end of the RDB stream.
constexpr uint8_t RDB_OPCODE_JOURNAL_OFFSET = 211;

constexpr uint8_t RDB_OPCODE_TIERED_PAGE = 212;

constexpr uint8_t RDB_OPCODE_DF_MASK = 220; /* Mask for key properties */

// RDB_OPCODE_DF_MASK define 4byte field with next flags
Expand Down
134 changes: 128 additions & 6 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ extern "C" {
#include "base/logging.h"
#include "core/bloom.h"
#include "core/json/json_object.h"
#include "core/overloaded.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"
Expand All @@ -48,6 +49,7 @@ extern "C" {
#include "server/server_state.h"
#include "server/set_family.h"
#include "server/tiering/common.h" // for _KB literal
#include "server/tiering/disk_storage.h"
#include "server/transaction.h"
#include "strings/human_readable.h"

Expand Down Expand Up @@ -387,6 +389,7 @@ class RdbLoaderBase::OpaqueObjLoader {
void operator()(const LzfString& lzfstr);
void operator()(const unique_ptr<LoadTrace>& ptr);
void operator()(const RdbSBF& src);
void operator()(const RdbTieredSegment& segmnet);

std::error_code ec() const {
return ec_;
Expand Down Expand Up @@ -481,6 +484,10 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbSBF& src) {
pv_->SetSBF(sbf);
}

// Visitor overload for tiered segments. It only exists so that the visitor covers every
// alternative of RdbVariant; reaching it is treated as a fatal programming error.
// RdbLoader::LoadKeyValPair diverts RDB_TYPE_TIERED_SEGMENT items to HandleSmallItems, which
// rewrites them into RDB_TYPE_STRING values before they are handed on via Add().
void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbTieredSegment& /*src*/) {
  CHECK(false) << "unreachable";
}

void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
size_t len = ltrace->blob_count();

Expand Down Expand Up @@ -1385,6 +1392,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
case RDB_TYPE_SBF:
iores = ReadSBF();
break;
case RDB_TYPE_TIERED_SEGMENT:
iores = ReadTieredSegment();
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;

Expand Down Expand Up @@ -1878,6 +1888,14 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(res), RDB_TYPE_SBF};
}

// Reads a tiered-segment descriptor from the stream: three consecutive LoadLen() values that
// are stored, in this order, as the segment's offset, its length and its encoding mask.
// Returns the descriptor wrapped in an OpaqueObj tagged RDB_TYPE_TIERED_SEGMENT.
// Error propagation is delegated to SET_OR_UNEXPECT (macro defined elsewhere in this file;
// presumably it returns the error of a failed LoadLen() - confirm against its definition).
//
// NOTE(review): enc_mask is a uint8_t while LoadLen() results are also assigned to size_t
// elsewhere in this file, so a stored value above 255 would be truncated silently here.
// Consider validating the range and rejecting the file as corrupted instead.
auto RdbLoaderBase::ReadTieredSegment() -> io::Result<OpaqueObj> {
  RdbTieredSegment segment;
  SET_OR_UNEXPECT(LoadLen(nullptr), segment.offset);
  SET_OR_UNEXPECT(LoadLen(nullptr), segment.length);
  SET_OR_UNEXPECT(LoadLen(nullptr), segment.enc_mask);
  return OpaqueObj{segment, RDB_TYPE_TIERED_SEGMENT};
}

template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
auto ec = EnsureRead(sizeof(T));
if (ec)
Expand Down Expand Up @@ -1983,7 +2001,7 @@ error_code RdbLoader::Load(io::Source* src) {
/* Read type. */
SET_OR_RETURN(FetchType(), type);

DVLOG(2) << "Opcode type: " << type;
DVLOG(0) << "Opcode type: " << type;

/* Handle special types. */
if (type == RDB_OPCODE_EXPIRETIME) {
Expand Down Expand Up @@ -2117,6 +2135,38 @@ error_code RdbLoader::Load(io::Source* src) {
continue;
}

if (type == RDB_OPCODE_TIERED_PAGE) {
size_t offset;
SET_OR_RETURN(LoadLen(nullptr), offset);
// CHECK(false) << "Offload pages if needed, tiering?";
std::string page;
SET_OR_RETURN(FetchGenericString(), page);

VLOG(0) << "Found tiered page " << offset;

if (EngineShard::tlocal() && EngineShard::tlocal()->tiered_storage()) {
auto& storage = EngineShard::tlocal()->tiered_storage()->BorrowStorage();

util::fb2::Done done;
std::error_code ec;
ec = storage.Stash(io::Buffer(page), {},
[this, offset, &ec, &done](io::Result<tiering::DiskSegment> res) {
if (res.has_value())
small_items_pages_[offset] = res.value();
else
ec = res.error();
done.Notify();
});

done.Wait();
if (!ec)
continue;
}

small_items_pages_[offset] = page;
continue;
}

if (!rdbIsObjectTypeDF(type)) {
return RdbError(errc::invalid_rdb_type);
}
Expand All @@ -2126,6 +2176,11 @@ error_code RdbLoader::Load(io::Source* src) {
settings.Reset();
} // main load loop

// Flush all small items
HandleSmallItems(true);

FlushAllShards();

DVLOG(1) << "RdbLoad loop finished";

if (stop_early_) {
Expand Down Expand Up @@ -2531,20 +2586,38 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {

item->is_sticky = settings->is_sticky;

ShardId sid = Shard(item->key, shard_set->size());
item->expire_ms = settings->expiretime;

auto& out_buf = shard_buf_[sid];
std::move(cleanup).Cancel();

if (item->val.rdb_type == RDB_TYPE_TIERED_SEGMENT) {
auto segment = get<RdbTieredSegment>(item->val.obj);
VLOG(0) << "Found tiered segment " << segment.offset;
{
size_t offset = segment.offset / tiering::kPageSize * tiering::kPageSize;
auto& items = small_items_[offset];
small_items_sizes_.erase({items.size(), offset});
items.push_back(item);
small_items_sizes_.insert({items.size(), offset});
}
HandleSmallItems(false); // don't flush
return kOk;
}

Add(item);
return kOk;
}

void RdbLoader::Add(Item* item) {
ShardId sid = Shard(item->key, shard_set->size());

auto& out_buf = shard_buf_[sid];
out_buf.emplace_back(item);
std::move(cleanup).Cancel();

constexpr size_t kBufSize = 128;
if (out_buf.size() >= kBufSize) {
FlushShardAsync(sid);
}

return kOk;
}

void RdbLoader::LoadScriptFromAux(string&& body) {
Expand All @@ -2559,6 +2632,55 @@ void RdbLoader::LoadScriptFromAux(string&& body) {
}
}

void RdbLoader::HandleSmallItems(bool flush) {
while (!small_items_.empty() && (flush || small_items_.size() > 1000)) {
auto [_, offset] = small_items_sizes_.extract(small_items_sizes_.begin()).value();
VLOG(0) << "Wantin offset " << offset;
auto node = small_items_.extract(offset);

VLOG(0) << "Handling small group with offset " << offset << " entries " << node.mapped().size();

util::fb2::Future<std::string> fut =
visit(Overloaded{[](std::string page) {
util::fb2::Future<std::string> f;
VLOG(0) << "Loading page from emmory";
f.Resolve(page);
return f;
},
[](tiering::DiskSegment segment) {
auto& store = EngineShard::tlocal()->tiered_storage()->BorrowStorage();
util::fb2::Future<std::string> f;
VLOG(0) << "Loading page from Diiisk";
store.Read(segment, [f](io::Result<std::string_view> result) mutable {
f.Resolve(string{result.value()});
});
return f;
}},
small_items_pages_[offset]);
auto page = fut.Get();

for (Item* item : node.mapped()) {
RdbTieredSegment segment = get<RdbTieredSegment>(item->val.obj);
VLOG(0) << "Loading length " << segment.length;

base::PODArray<char> arr(segment.length, nullptr);
memcpy(arr.begin(), page.data() + (segment.offset - offset), segment.length);

CompactObj co;
co.SetEncodingMask(segment.enc_mask);
co.Materialize({arr.data(), arr.size()}, true); // todo: skip double copy

arr.resize(co.Size());
co.GetString(arr.data());

item->val.rdb_type = RDB_TYPE_STRING;
item->val.obj = std::move(arr);

Add(item);
}
}
}

void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
facade::CapturingReplyBuilder crb{};
ConnectionContext cntx{nullptr, nullptr, &crb};
Expand Down
31 changes: 29 additions & 2 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//
#pragma once

#include <absl/container/btree_set.h>
#include <absl/container/flat_hash_map.h>

#include <system_error>

extern "C" {
Expand All @@ -15,6 +18,7 @@ extern "C" {
#include "io/io_buf.h"
#include "server/common.h"
#include "server/journal/serializer.h"
#include "server/tiering/common.h"

namespace dfly {

Expand Down Expand Up @@ -54,8 +58,18 @@ class RdbLoaderBase {
std::vector<Filter> filters;
};

using RdbVariant =
std::variant<long long, base::PODArray<char>, LzfString, std::unique_ptr<LoadTrace>, RdbSBF>;
// Descriptor of a value stored inside a tiered-storage page, as read by ReadTieredSegment().
// All members are zero-initialized so that a default-constructed descriptor never carries
// indeterminate values. Member order is part of the interface (aggregate initialization).
struct RdbTieredSegment {
  size_t offset = 0;     // Offset of the segment; the containing page is offset rounded down.
  size_t length = 0;     // Number of bytes the segment occupies.
  uint8_t enc_mask = 0;  // Encoding bits applied via CompactObj::SetEncodingMask on load.
};

// An offset paired with a blob of bytes.
// NOTE(review): presumably a whole tiered page and its offset in tiered storage - no user of
// this struct is visible here, confirm against callers.
// `offset` is zero-initialized so that a default-constructed value is fully determinate.
struct RdbTieredPage {
  size_t offset = 0;
  std::string blob;
};

using RdbVariant = std::variant<long long, base::PODArray<char>, LzfString,
std::unique_ptr<LoadTrace>, RdbSBF, RdbTieredSegment>;

struct OpaqueObj {
RdbVariant obj;
Expand Down Expand Up @@ -148,6 +162,7 @@ class RdbLoaderBase {
::io::Result<OpaqueObj> ReadRedisJson();
::io::Result<OpaqueObj> ReadJson();
::io::Result<OpaqueObj> ReadSBF();
::io::Result<OpaqueObj> ReadTieredSegment();

std::error_code SkipModuleData();
std::error_code HandleCompressedBlob(int op_type);
Expand All @@ -168,10 +183,13 @@ class RdbLoaderBase {

size_t bytes_read_ = 0;
size_t source_limit_ = SIZE_MAX;

base::PODArray<uint8_t> compr_buf_;
std::unique_ptr<DecompressImpl> decompress_impl_;

JournalReader journal_reader_{nullptr, 0};
std::optional<uint64_t> journal_offset_ = std::nullopt;

RdbVersion rdb_version_ = RDB_VERSION;
};

Expand Down Expand Up @@ -259,10 +277,13 @@ class RdbLoader : protected RdbLoaderBase {
void FlushShardAsync(ShardId sid);
void FlushAllShards();

void Add(Item* item);
void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib);

void LoadScriptFromAux(std::string&& value);

void HandleSmallItems(bool flush);

// Load index definition from RESP string describing it in FT.CREATE format,
// issues an FT.CREATE call, but does not start indexing
void LoadSearchIndexDefFromAux(std::string&& value);
Expand All @@ -285,6 +306,12 @@ class RdbLoader : protected RdbLoaderBase {
std::function<void()> full_sync_cut_cb;

base::MPSCIntrusiveQueue<Item> item_queue_;

absl::flat_hash_map<size_t /* offset */, std::vector<Item*>> small_items_;
absl::btree_set<std::pair<size_t /* num entries*/, size_t /* offset */>, std::greater<>>
small_items_sizes_;
absl::flat_hash_map<size_t /* offset */, std::variant<std::string, tiering::DiskSegment>>
small_items_pages_;
};

} // namespace dfly
Loading

0 comments on commit 4b10617

Please sign in to comment.