diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index 4f12fea17c9d..f29e3db2ed15 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -985,7 +985,7 @@ std::pair CompactObj::GetExternalSlice() const { } void CompactObj::Materialize(std::string_view blob, bool is_raw) { - CHECK(IsExternal()) << int(taglen_); + CHECK(IsExternal() || Size() == 0) << int(taglen_); DCHECK_GT(blob.size(), kInlineLen); diff --git a/src/core/compact_object.h b/src/core/compact_object.h index e48c8f8699ea..f19dacf63b82 100644 --- a/src/core/compact_object.h +++ b/src/core/compact_object.h @@ -405,6 +405,15 @@ class CompactObj { bool HasAllocated() const; + uint8_t GetEncodingMask() const { + return mask_ & kEncMask; + } + + void SetEncodingMask(uint8_t mask) { + mask_ &= ~kEncMask; + mask_ |= (mask & kEncMask); + } + private: void EncodeString(std::string_view str); size_t DecodedLen(size_t sz) const; diff --git a/src/server/rdb_extensions.h b/src/server/rdb_extensions.h index e50d4a5e2a45..9bc84a09d61f 100644 --- a/src/server/rdb_extensions.h +++ b/src/server/rdb_extensions.h @@ -14,11 +14,12 @@ constexpr uint8_t RDB_TYPE_JSON = 30; constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31; constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32; constexpr uint8_t RDB_TYPE_SBF = 33; +constexpr uint8_t RDB_TYPE_TIERED_SEGMENT = 34; constexpr bool rdbIsObjectTypeDF(uint8_t type) { return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON) || (type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY) || - (type == RDB_TYPE_SBF); + (type == RDB_TYPE_SBF) || (type == RDB_TYPE_TIERED_SEGMENT); } // Opcodes: Range 200-240 is used by DF extensions. @@ -40,6 +41,8 @@ constexpr uint8_t RDB_OPCODE_JOURNAL_BLOB = 210; // so it is always sent at the end of the RDB stream. 
constexpr uint8_t RDB_OPCODE_JOURNAL_OFFSET = 211; +constexpr uint8_t RDB_OPCODE_TIERED_PAGE = 212; + constexpr uint8_t RDB_OPCODE_DF_MASK = 220; /* Mask for key properties */ // RDB_OPCODE_DF_MASK define 4byte field with next flags diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 61c44fdb9471..8fc7e49eceb6 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -32,6 +32,7 @@ extern "C" { #include "base/logging.h" #include "core/bloom.h" #include "core/json/json_object.h" +#include "core/overloaded.h" #include "core/sorted_map.h" #include "core/string_map.h" #include "core/string_set.h" @@ -48,6 +49,7 @@ extern "C" { #include "server/server_state.h" #include "server/set_family.h" #include "server/tiering/common.h" // for _KB literal +#include "server/tiering/disk_storage.h" #include "server/transaction.h" #include "strings/human_readable.h" @@ -387,6 +389,7 @@ class RdbLoaderBase::OpaqueObjLoader { void operator()(const LzfString& lzfstr); void operator()(const unique_ptr& ptr); void operator()(const RdbSBF& src); + void operator()(const RdbTieredSegment& segment); std::error_code ec() const { return ec_; @@ -481,6 +484,10 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbSBF& src) { pv_->SetSBF(sbf); } +void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbTieredSegment& src) { + LOG(FATAL) << "unreachable"; +} + void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) { size_t len = ltrace->blob_count(); @@ -1385,6 +1392,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) { case RDB_TYPE_SBF: iores = ReadSBF(); break; + case RDB_TYPE_TIERED_SEGMENT: + iores = ReadTieredSegment(); + break; default: LOG(ERROR) << "Unsupported rdb type " << rdbtype; @@ -1878,6 +1888,14 @@ auto RdbLoaderBase::ReadSBF() -> io::Result { return OpaqueObj{std::move(res), RDB_TYPE_SBF}; } +auto RdbLoaderBase::ReadTieredSegment() -> io::Result { + RdbTieredSegment segment; + SET_OR_UNEXPECT(LoadLen(nullptr), 
segment.offset); + SET_OR_UNEXPECT(LoadLen(nullptr), segment.length); + SET_OR_UNEXPECT(LoadLen(nullptr), segment.enc_mask); + return OpaqueObj{segment, RDB_TYPE_TIERED_SEGMENT}; +}; + template io::Result RdbLoaderBase::FetchInt() { auto ec = EnsureRead(sizeof(T)); if (ec) @@ -1924,6 +1942,18 @@ RdbLoader::RdbLoader(Service* service) } RdbLoader::~RdbLoader() { + for (auto& [_, page] : small_items_pages_) { + if (!holds_alternative(page)) + continue; + auto segment = get(page); + EngineShard::tlocal()->tiered_storage()->BorrowStorage().MarkAsFree(segment); + } + + for (auto& [_, items] : small_items_) { + for (Item* item : items) + delete item; + } + while (true) { Item* item = item_queue_.Pop(); if (item == nullptr) @@ -2117,6 +2147,11 @@ error_code RdbLoader::Load(io::Source* src) { continue; } + if (type == RDB_OPCODE_TIERED_PAGE) { + RETURN_ON_ERR(LoadTieredPage()); + continue; + } + if (!rdbIsObjectTypeDF(type)) { return RdbError(errc::invalid_rdb_type); } @@ -2126,6 +2161,12 @@ error_code RdbLoader::Load(io::Source* src) { settings.Reset(); } // main load loop + // Flush all small items + HandleSmallItems(true); + DeleteTieredPages(); + + FlushAllShards(); + DVLOG(1) << "RdbLoad loop finished"; if (stop_early_) { @@ -2348,6 +2389,38 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service) { return std::error_code{}; } +error_code RdbLoader::LoadTieredPage() { + size_t offset; + SET_OR_RETURN(LoadLen(nullptr), offset); + + std::string page; + SET_OR_RETURN(FetchGenericString(), page); + + // If tiering is enabled, try saving the received page on disk + // Fall back to memory in case of errors + if (EngineShard::tlocal() && EngineShard::tlocal()->tiered_storage()) { + auto& storage = EngineShard::tlocal()->tiered_storage()->BorrowStorage(); + + util::fb2::Done done; + std::error_code ec; + auto cb = [this, offset, &ec, &done](io::Result res) { + if (res.has_value()) + small_items_pages_[offset] = res.value(); + else + ec = res.error(); + done.Notify(); 
+ }; + ec = storage.Stash(io::Buffer(page), {}, cb); + + done.Wait(); + if (!ec) + return {}; + } + + small_items_pages_[offset] = page; + return {}; +} + error_code RdbLoader::HandleAux() { /* AUX: generic string-string fields. Use to add state to RDB * which is backward compatible. Implementations of RDB loading @@ -2531,20 +2604,37 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) { item->is_sticky = settings->is_sticky; - ShardId sid = Shard(item->key, shard_set->size()); item->expire_ms = settings->expiretime; - auto& out_buf = shard_buf_[sid]; + std::move(cleanup).Cancel(); + if (item->val.rdb_type == RDB_TYPE_TIERED_SEGMENT) { + auto segment = get(item->val.obj); + { + size_t offset = segment.offset / tiering::kPageSize * tiering::kPageSize; + auto& items = small_items_[offset]; + small_items_sizes_.erase({items.size(), offset}); + items.push_back(item); + small_items_sizes_.insert({items.size(), offset}); + } + HandleSmallItems(false); // don't force flush + return kOk; + } + + Add(item); + return kOk; +} + +void RdbLoader::Add(Item* item) { + ShardId sid = Shard(item->key, shard_set->size()); + + auto& out_buf = shard_buf_[sid]; out_buf.emplace_back(item); - std::move(cleanup).Cancel(); constexpr size_t kBufSize = 128; if (out_buf.size() >= kBufSize) { FlushShardAsync(sid); } - - return kOk; } void RdbLoader::LoadScriptFromAux(string&& body) { @@ -2559,6 +2649,50 @@ void RdbLoader::LoadScriptFromAux(string&& body) { } } +void RdbLoader::HandleSmallItems(bool flush) { + while (!small_items_.empty() && (flush || small_items_.size() > 1000)) { + auto [_, offset] = small_items_sizes_.extract(small_items_sizes_.begin()).value(); + auto node = small_items_.extract(offset); + + auto page_reader = [](tiering::DiskSegment segment) { + auto& store = EngineShard::tlocal()->tiered_storage()->BorrowStorage(); + util::fb2::Future f; + store.Read(segment, [f](io::Result result) mutable { + CHECK(result.has_value()); // TODO + 
f.Resolve(string{result.value()}); + }); + return f.Get(); + }; + string page = visit(Overloaded{[](const string& s) { return s; }, page_reader}, + small_items_pages_[offset]); + + for (Item* item : node.mapped()) { + RdbTieredSegment segment = get(item->val.obj); + + CompactObj co; + co.SetEncodingMask(segment.enc_mask); + co.Materialize({page.data() + (segment.offset - offset), segment.length}, true); + + base::PODArray arr(co.Size(), nullptr); + co.GetString(arr.data()); + + item->val.rdb_type = RDB_TYPE_STRING; + item->val.obj = std::move(arr); + Add(item); + } + } +} + +void RdbLoader::DeleteTieredPages() { + auto& store = EngineShard::tlocal()->tiered_storage()->BorrowStorage(); + for (auto& [offset, page] : small_items_pages_) { + if (!holds_alternative(page)) + continue; + auto segment = get(page); + store.MarkAsFree(segment); + } +} + void RdbLoader::LoadSearchIndexDefFromAux(string&& def) { facade::CapturingReplyBuilder crb{}; ConnectionContext cntx{nullptr, nullptr, &crb}; diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 838944c0a2f0..28f102743093 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -3,6 +3,9 @@ // #pragma once +#include +#include + #include extern "C" { @@ -15,6 +18,7 @@ extern "C" { #include "io/io_buf.h" #include "server/common.h" #include "server/journal/serializer.h" +#include "server/tiering/common.h" namespace dfly { @@ -54,8 +58,13 @@ class RdbLoaderBase { std::vector filters; }; - using RdbVariant = - std::variant, LzfString, std::unique_ptr, RdbSBF>; + struct RdbTieredSegment { + size_t offset, length; + uint8_t enc_mask; + }; + + using RdbVariant = std::variant, LzfString, + std::unique_ptr, RdbSBF, RdbTieredSegment>; struct OpaqueObj { RdbVariant obj; @@ -148,6 +157,7 @@ class RdbLoaderBase { ::io::Result ReadRedisJson(); ::io::Result ReadJson(); ::io::Result ReadSBF(); + ::io::Result ReadTieredSegment(); std::error_code SkipModuleData(); std::error_code HandleCompressedBlob(int op_type); @@ 
-168,10 +178,13 @@ class RdbLoaderBase { size_t bytes_read_ = 0; size_t source_limit_ = SIZE_MAX; + base::PODArray compr_buf_; std::unique_ptr decompress_impl_; + JournalReader journal_reader_{nullptr, 0}; std::optional journal_offset_ = std::nullopt; + RdbVersion rdb_version_ = RDB_VERSION; }; @@ -259,10 +272,17 @@ class RdbLoader : protected RdbLoaderBase { void FlushShardAsync(ShardId sid); void FlushAllShards(); + void Add(Item* item); void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib); void LoadScriptFromAux(std::string&& value); + // Materialize small items pages with most hits until a limited amount of entries is left. + // When flush is set, all pages are materialized. + void HandleSmallItems(bool flush); + void DeleteTieredPages(); // Delete all small items pages + std::error_code LoadTieredPage(); + // Load index definition from RESP string describing it in FT.CREATE format, + // issues an FT.CREATE call, but does not start indexing void LoadSearchIndexDefFromAux(std::string&& value); @@ -285,6 +305,12 @@ class RdbLoader : protected RdbLoaderBase { std::function full_sync_cut_cb; base::MPSCIntrusiveQueue item_queue_; + + absl::flat_hash_map> small_items_; + absl::btree_set, std::greater<>> + small_items_sizes_; + absl::flat_hash_map> + small_items_pages_; }; } // namespace dfly diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index c695359f20e7..d4ff81f647ea 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -161,6 +161,8 @@ uint8_t RdbObjectType(const PrimeValue& pv) { unsigned compact_enc = pv.Encoding(); switch (type) { case OBJ_STRING: + if (pv.IsExternal()) + return RDB_TYPE_TIERED_SEGMENT; return RDB_TYPE_STRING; case OBJ_LIST: if (compact_enc == OBJ_ENCODING_QUICKLIST) @@ -311,8 +313,9 @@ RdbSerializer::~RdbSerializer() { std::error_code RdbSerializer::SaveValue(const PrimeValue& pv) { std::error_code ec; if (pv.ObjType() == OBJ_STRING) { - auto opt_int = pv.TryGetInt(); - if (opt_int) { + if (pv.IsExternal()) {
+ ec = SaveExternalSegment(pv); + } else if (auto opt_int = pv.TryGetInt(); opt_int) { ec = SaveLongLongAsString(*opt_int); } else { ec = SaveString(pv.GetSlice(&tmp_str_)); @@ -681,6 +684,13 @@ std::error_code RdbSerializer::SaveSBFObject(const PrimeValue& pv) { return {}; } +std::error_code RdbSerializer::SaveExternalSegment(const PrimeValue& pv) { + auto [offset, length] = pv.GetExternalSlice(); + RETURN_ON_ERR(SaveLen(offset)); + RETURN_ON_ERR(SaveLen(length)); + return SaveLen(pv.GetEncodingMask()); +} + /* Save a long long value as either an encoded string or a string. */ error_code RdbSerializer::SaveLongLongAsString(int64_t value) { uint8_t buf[32]; @@ -1635,6 +1645,12 @@ size_t RdbSerializer::GetTempBufferSize() const { return SerializerBase::GetTempBufferSize() + tmp_str_.size(); } +error_code RdbSerializer::SaveTieringPage(size_t offset, std::string_view page) { + RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_TIERED_PAGE)); + RETURN_ON_ERR(SaveLen(offset)); + return SaveString(page); +} + void RdbSerializer::FlushIfNeeded(SerializerBase::FlushState flush_state) { if (flush_fun_) { flush_fun_(SerializedLen(), flush_state); diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index df5547c6dd1e..8a3d6e679667 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -227,6 +227,8 @@ class RdbSerializer : public SerializerBase { size_t GetTempBufferSize() const override; + std::error_code SaveTieringPage(size_t offset, std::string_view page); + private: // Might preempt if flush_fun_ is used std::error_code SaveObject(const PrimeValue& pv); @@ -237,6 +239,7 @@ class RdbSerializer : public SerializerBase { std::error_code SaveStreamObject(const PrimeValue& obj); std::error_code SaveJsonObject(const PrimeValue& pv); std::error_code SaveSBFObject(const PrimeValue& pv); + std::error_code SaveExternalSegment(const PrimeValue& pv); std::error_code SaveLongLongAsString(int64_t value); std::error_code SaveBinaryDouble(double val); diff --git 
a/src/server/snapshot.cc b/src/server/snapshot.cc index 62782e79c451..7f354973d9a6 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -307,19 +307,28 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr expire_time = db_slice_->ExpireTime(eit); } - if (pv.IsExternal()) { + auto* ts = EngineShard::tlocal()->tiered_storage(); + if (pv.IsExternal() && pv.Size() > 2048) { // We can't block, so we just schedule a tiered read and append it to the delayed entries util::fb2::Future future; - EngineShard::tlocal()->tiered_storage()->Read( - db_indx, pk.ToString(), pv, - [future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); }); + ts->Read(db_indx, pk.ToString(), pv, + [future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); }); delayed_entries_.push_back({db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time}); ++type_freq_map_[RDB_TYPE_STRING]; - } else { - io::Result res = serializer->SaveEntry(pk, pv, expire_time, db_indx); - CHECK(res); - ++type_freq_map_[*res]; + return; } + + // Serialize page if needed + if (pv.IsExternal()) { + tiering::DiskSegment segment{pv.GetExternalSlice()}; + segment = segment.ContainingPages(); + if (serialized_pages_.emplace(segment.offset).second) + serializer_->SaveTieringPage(segment.offset, ts->ReadPage(segment.offset).Get()); + } + + io::Result res = serializer->SaveEntry(pk, pv, expire_time, db_indx); + CHECK(res); + ++type_freq_map_[*res]; } size_t SliceSnapshot::Serialize(SerializerBase::FlushState flush_state) { @@ -345,9 +354,6 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) { if (!force && serializer_->SerializedLen() < 4096) return false; - // Flush any of the leftovers to avoid interleavings - const auto serialized = Serialize(); - // Bucket serialization might have accumulated some delayed values. 
// Because we can finally block in this function, we'll await and serialize them while (!delayed_entries_.empty()) { @@ -356,8 +362,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) { delayed_entries_.pop_back(); } - const auto total_serialized = Serialize() + serialized; - return total_serialized > 0; + return Serialize() > 0; } void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { diff --git a/src/server/snapshot.h b/src/server/snapshot.h index d83fb9737489..d3919b7fdfa2 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -159,6 +159,7 @@ class SliceSnapshot { std::unique_ptr serializer_; std::vector delayed_entries_; // collected during atomic bucket traversal + absl::flat_hash_set serialized_pages_; // Used for sanity checks. bool serialize_bucket_running_ = false; diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index e754b8b346d9..32d9f7812135 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -56,6 +56,7 @@ bool OccupiesWholePages(size_t size) { // Stashed bins no longer have bin ids, so this sentinel is used to differentiate from regular reads constexpr auto kFragmentedBin = tiering::SmallBins::kInvalidBin - 1; +constexpr auto kRandomBin = tiering::SmallBins::kInvalidBin - 2; // Called after setting new value in place of previous segment void RecordDeleted(const PrimeValue& pv, size_t tiered_len, DbTableStats* stats) { @@ -243,6 +244,9 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value, return true; // delete } + if (holds_alternative(id)) + return false; // god knows why we read + // 1. When modified is true we MUST upload the value back to memory. // 2. On the other hand, if read is caused by snapshotting we do not want to fetch it. 
// Currently, our heuristic is not very smart, because we stop uploading any reads during @@ -596,6 +600,20 @@ void TieredStorage::RunOffloading(DbIndex dbid) { } while (offloading_cursor_ != start_cursor && iterations++ < kMaxIterations); } +tiering::DiskStorage& TieredStorage::BorrowStorage() { + return op_manager_->storage_; +} + +util::fb2::Future TieredStorage::ReadPage(size_t offset) { + util::fb2::Future fut; + op_manager_->Enqueue(kRandomBin, {offset, tiering::kPageSize}, + [fut](bool is_raw, const string* raw_val) mutable { + fut.Resolve(*raw_val); + return false; + }); + return fut; +} + bool TieredStorage::ShouldStash(const PrimeValue& pv) const { auto disk_stats = op_manager_->GetStats().disk_stats; return !pv.IsExternal() && !pv.HasStashPending() && pv.ObjType() == OBJ_STRING && diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 9a453b14406b..75ae23b37a34 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -23,7 +23,8 @@ class DbSlice; namespace tiering { class SmallBins; -}; +class DiskStorage; +}; // namespace tiering // Manages offloaded values class TieredStorage { @@ -78,6 +79,10 @@ class TieredStorage { // Run offloading loop until i/o device is loaded or all entries were traversed void RunOffloading(DbIndex dbid); + tiering::DiskStorage& BorrowStorage(); + + util::fb2::Future ReadPage(size_t offset); + private: // Returns if a value should be stashed bool ShouldStash(const PrimeValue& pv) const; diff --git a/src/server/tiering/small_bins.cc b/src/server/tiering/small_bins.cc index 18009cbf1544..81ca532ec427 100644 --- a/src/server/tiering/small_bins.cc +++ b/src/server/tiering/small_bins.cc @@ -175,6 +175,14 @@ SmallBins::Stats SmallBins::GetStats() const { .current_bin_bytes = current_bin_bytes_}; } +vector SmallBins::GetStashedOffsets() const { + vector out; + out.reserve(stashed_bins_.size()); + for (const auto& [offset, _] : stashed_bins_) + out.push_back(offset); + return out; +} + 
SmallBins::KeyHashDbList SmallBins::DeleteBin(DiskSegment segment, std::string_view value) { DCHECK_EQ(value.size(), kPageSize); diff --git a/src/server/tiering/small_bins.h b/src/server/tiering/small_bins.h index 1cdfb84b8471..ab61057adc12 100644 --- a/src/server/tiering/small_bins.h +++ b/src/server/tiering/small_bins.h @@ -8,10 +8,11 @@ #include #include -#include #include -#include "server/tiering/disk_storage.h" +#include "base/iterator.h" +#include "io/io.h" +#include "server/tiering/common.h" #include "server/tx_base.h" namespace dfly::tiering { @@ -71,6 +72,8 @@ class SmallBins { Stats GetStats() const; + std::vector GetStashedOffsets() const; + private: // Flush current bin FilledBin FlushBin();