Skip to content

Commit

Permalink
feat(tiering): TieredStorageV2 (#2849)
Browse files Browse the repository at this point in the history
Implements basic high-level tiered storage

---------

Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg authored Apr 9, 2024
1 parent b8693b4 commit 23106d4
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 8 deletions.
4 changes: 3 additions & 1 deletion src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "server/test_utils.h"

#include "server/acl/acl_commands_def.h"
#include "util/fibers/fibers.h"

extern "C" {
#include "redis/zmalloc.h"
Expand Down Expand Up @@ -661,7 +662,8 @@ void BaseFamilyTest::ExpectConditionWithinTimeout(const std::function<bool()>& c
if (condition()) {
break;
}
absl::SleepFor(absl::Milliseconds(10));
ThisFiber::SleepFor(5ms);
// absl::SleepFor(absl::Milliseconds(10)); ??
}

EXPECT_LE(absl::Now(), deadline)
Expand Down
133 changes: 132 additions & 1 deletion src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,31 @@

#include <mimalloc.h>

#include <memory>
#include <optional>
#include <variant>

#include "absl/cleanup/cleanup.h"
#include "absl/flags/internal/flag.h"
#include "base/flags.h"
#include "base/logging.h"
#include "server/common.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "util/fibers/fibers.h"
#include "server/table.h"
#include "server/tiering/common.h"
#include "server/tiering/op_manager.h"
#include "server/tiering/small_bins.h"

ABSL_FLAG(uint32_t, tiered_storage_max_pending_writes, 32,
"Maximal number of pending writes per thread");
ABSL_FLAG(uint32_t, tiered_storage_throttle_us, 1,
"Slow down tiered storage writes for at most this usec in case of I/O saturation "
"specified by tiered_storage_max_pending_writes. 0 - do not throttle.");

// Read once in the ShardOpManager constructor. When false, ShardOpManager::ReportFetched returns
// early: the fetched value is not written back to the entry and its disk segment is not deleted.
ABSL_FLAG(bool, tiered_storage_v2_cache_fetched, true,
"WIP: Load results of offloaded reads to memory");

namespace dfly {

using namespace std;
Expand Down Expand Up @@ -770,4 +782,123 @@ bool TieredStorage::CanExternalizeEntry(PrimeIterator it) {
!it->second.IsExternal() && EligibleForOffload(it->second.Size());
}

// OpManager bound to one DbSlice. It receives stash/fetch completion reports and applies them
// to the matching entries of that slice (external segment, in-memory value, IO_PENDING flag).
class TieredStorageV2::ShardOpManager : public tiering::OpManager {
 public:
  // Captures the owning storage and the slice; the cache_fetched flag is sampled once here.
  ShardOpManager(TieredStorageV2* ts, DbSlice* db_slice)
      : cache_fetched_{absl::GetFlag(FLAGS_tiered_storage_v2_cache_fetched)},
        ts_{ts},
        db_slice_{db_slice} {
  }

  // Look the key up in db_slice and replace its value with a reference to the external segment.
  // Does nothing if the key is no longer present.
  void SetExternal(std::string_view key, tiering::DiskSegment segment) {
    PrimeValue* entry = Find(key);
    if (entry == nullptr)
      return;

    entry->SetIoPending(false);
    entry->SetExternal(segment.offset, segment.length);  // TODO: Handle memory stats
  }

  // Drop the IO_PENDING flag of the entry stored under key, if it still exists.
  void ClearIoPending(std::string_view key) {
    PrimeValue* entry = Find(key);
    if (entry != nullptr)
      entry->SetIoPending(false);
  }

  // Look the key up and store its up-to-date value in place of the external segment.
  // Does nothing if the key is no longer present.
  void SetInMemory(std::string_view key, std::string_view value) {
    PrimeValue* entry = Find(key);
    if (entry == nullptr)
      return;

    entry->Reset();  // TODO: account for memory
    entry->SetString(value);
  }

  // Stash completion callback. A string_view id is a single entry; any other id is treated as a
  // small bin, whose sub-entries (as reported by SmallBins) are each marked external.
  void ReportStashed(EntryId id, tiering::DiskSegment segment) override {
    if (holds_alternative<string_view>(id)) {
      SetExternal(get<string_view>(id), segment);
      return;
    }

    for (const auto& [sub_key, sub_segment] :
         ts_->bins_->ReportStashed(get<tiering::SmallBins::BinId>(id), segment))
      SetExternal(string_view{sub_key}, sub_segment);
  }

  // Read completion callback. If caching of fetched values is enabled, the value is loaded back
  // into the entry and its on-disk copy is released.
  void ReportFetched(EntryId id, std::string_view value, tiering::DiskSegment segment) override {
    DCHECK(holds_alternative<string_view>(id));  // we never issue reads for bins

    if (!cache_fetched_)
      return;

    SetInMemory(get<string_view>(id), value);

    // Delete value: segments below 2_KB go through SmallBins first, and only the segment it
    // hands back (if any) is deleted via the op manager.
    if (segment.length < 2_KB) {
      auto bin_segment = ts_->bins_->Delete(segment);
      if (bin_segment)
        Delete(*bin_segment);
      return;
    }
    Delete(segment);
  }

 private:
  // Returns a pointer to the value stored under key, or nullptr if the lookup is not valid.
  PrimeValue* Find(std::string_view key) {
    // TODO: Get DbContext for transaction for correct dbid and time
    auto it = db_slice_->FindMutable(DbContext{}, key);
    if (!IsValid(it.it))
      return nullptr;
    return &it.it->second;
  }

  bool cache_fetched_ = false;
  TieredStorageV2* ts_;
  DbSlice* db_slice_;
};

// Builds the ShardOpManager (given this storage and `db_slice`) followed by a default-constructed
// tiering::SmallBins. Nothing is opened here; see Open().
TieredStorageV2::TieredStorageV2(DbSlice* db_slice)
    : op_manager_(std::make_unique<ShardOpManager>(this, db_slice)),
      bins_(std::make_unique<tiering::SmallBins>()) {
}

// Defined out of line so the unique_ptr members are destroyed where their pointee types are
// complete (they are only forward declared in the header).
TieredStorageV2::~TieredStorageV2() = default;

// Forwards `path` to the op manager's Open() and returns its error code unchanged.
std::error_code TieredStorageV2::Open(string_view path) {
return op_manager_->Open(path);
}

// Forwards to the op manager's Close().
void TieredStorageV2::Close() {
op_manager_->Close();
}

// Schedules a read of the external segment referenced by `value` and returns a future for the
// resulting string. `value` must already be external (checked in debug builds only).
util::fb2::Future<std::string> TieredStorageV2::Read(string_view key, const PrimeValue& value) {
  DCHECK(value.IsExternal());

  auto external_slice = value.GetExternalSlice();
  return op_manager_->Read(key, external_slice);
}

// Schedules `value` for offloading and marks it IO_PENDING.
//
// Values of at least 2_KB are stashed directly under `key`. Smaller values are handed to
// SmallBins; a disk write is issued only when SmallBins returns a bin to flush, otherwise the
// value stays IO_PENDING until a later call produces one.
//
// If the op manager rejects a stash immediately, the error is logged and IO_PENDING is cleared
// again: on `value` itself for a direct stash, or on every key SmallBins reports for the
// aborted bin.
void TieredStorageV2::Stash(string_view key, PrimeValue* value) {
  string buf;
  string_view value_sv = value->GetSlice(&buf);
  value->SetIoPending(true);

  if (value->Size() >= 2_KB) {
    if (auto ec = op_manager_->Stash(key, value_sv); ec) {
      LOG(ERROR) << "Stash failed immediately: " << ec.message();
      value->SetIoPending(false);
    }
    return;
  }

  auto bin = bins_->Stash(key, value_sv);
  if (!bin)
    return;  // nothing to flush yet

  if (auto ec = op_manager_->Stash(bin->first, bin->second); ec) {
    LOG(ERROR) << "Stash of small bin failed immediately: " << ec.message();
    // Named sub_key so it does not shadow the `key` parameter.
    for (const string& sub_key : bins_->ReportStashAborted(bin->first))
      op_manager_->ClearIoPending(sub_key);  // clear IO_PENDING flag
  }
}
// Releases whatever tiered state is associated with `value`.
// External values are deleted by disk segment, non-external ones by key. In both cases sizes
// below 2_KB go through SmallBins first, and only what SmallBins hands back (if anything) is
// forwarded to the op manager.
void TieredStorageV2::Delete(string_view key, PrimeValue* value) {
  if (!value->IsExternal()) {
    if (value->Size() >= 2_KB) {
      op_manager_->Delete(key);
      return;
    }
    if (auto bin = bins_->Delete(key); bin)
      op_manager_->Delete(*bin);
    return;
  }

  tiering::DiskSegment segment = value->GetExternalSlice();
  if (segment.length >= 2_KB) {
    op_manager_->Delete(segment);
    return;
  }
  if (auto bin = bins_->Delete(segment); bin)
    op_manager_->Delete(*bin);
}

} // namespace dfly
35 changes: 35 additions & 0 deletions src/server/tiered_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//
#pragma once

#include <memory>

#include "util/fibers/future.h"
#ifdef __linux__

#include <absl/container/flat_hash_map.h>
Expand All @@ -16,6 +19,38 @@ namespace dfly {

class DbSlice;

namespace tiering {
class SmallBins;
};

// Manages offloaded values.
// Owns a ShardOpManager (constructed with the DbSlice given to the constructor) and a
// tiering::SmallBins instance. The implementation sends values of at least 2_KB straight to the
// op manager and routes smaller ones through SmallBins.
class TieredStorageV2 {
class ShardOpManager;

public:
explicit TieredStorageV2(DbSlice* db_slice);
~TieredStorageV2(); // drop forward declared unique_ptrs

// Neither movable nor copyable: ShardOpManager is constructed with a pointer to this object.
TieredStorageV2(TieredStorageV2&& other) = delete;
TieredStorageV2(const TieredStorageV2& other) = delete;

// Forwarded to the op manager; Open returns its error code.
std::error_code Open(std::string_view path);
void Close();

// Read offloaded value. It must be of external type
util::fb2::Future<std::string> Read(std::string_view key, const PrimeValue& value);

// Stash value. Sets IO_PENDING flag and unsets it on error or when finished
void Stash(std::string_view key, PrimeValue* value);

// Delete value. Must either have pending IO or be offloaded (of external type)
void Delete(std::string_view key, PrimeValue* value);

private:
// Declaration order matters: op_manager_ is constructed first and destroyed last.
std::unique_ptr<ShardOpManager> op_manager_;
std::unique_ptr<tiering::SmallBins> bins_;
};

class TieredStorage {
public:
enum : uint16_t { kMinBlobLen = 64 };
Expand Down
87 changes: 87 additions & 0 deletions src/server/tiered_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,27 @@
// See LICENSE for licensing terms.
//

#include "server/tiered_storage.h"

#include <absl/strings/str_cat.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "absl/flags/internal/flag.h"
#include "base/flags.h"
#include "base/logging.h"
#include "facade/facade_test.h"
#include "server/engine_shard_set.h"
#include "server/test_utils.h"
#include "util/fibers/fibers.h"

using namespace std;
using namespace testing;
using absl::SetFlag;
using absl::StrCat;

ABSL_DECLARE_FLAG(string, tiered_prefix);
ABSL_DECLARE_FLAG(bool, tiered_storage_v2_cache_fetched);

namespace dfly {

Expand All @@ -33,6 +40,35 @@ class TieredStorageTest : public BaseFamilyTest {
static void SetUpTestSuite();
};

class TieredStorageV2Test : public BaseFamilyTest {
protected:
TieredStorageV2Test() {
num_threads_ = 1;
}

void SetUp() override {
// TODO: Use FlagSaver if there is need to run V1 tests after V2
absl::SetFlag(&FLAGS_tiered_prefix, "");
absl::SetFlag(&FLAGS_tiered_storage_v2_cache_fetched, true);

BaseFamilyTest::SetUp();
auto* shard = shard_set->Await(0, [] { return EngineShard::tlocal(); });
storage_.emplace(&shard->db_slice());
shard_set->Await(0, [storage = &*storage_] {
auto ec = storage->Open(absl::StrCat("/tmp/tiered_storage_test", 1));
EXPECT_FALSE(ec);
});
}

void TearDown() override {
shard_set->Await(0, [storage = &*storage_] { storage->Close(); });
BaseFamilyTest::TearDown();
}

public:
std::optional<TieredStorageV2> storage_;
};

void TieredStorageTest::SetUpTestSuite() {
BaseFamilyTest::SetUpTestSuite();
SetFlag(&FLAGS_tiered_prefix, "/tmp/spill");
Expand Down Expand Up @@ -318,4 +354,55 @@ TEST_F(TieredStorageTest, GetValueValidation) {
EXPECT_EQ(m.db_stats[0].tiered_entries, 0);
}

// Round trip for values above the small-bin threshold: SET 20 keys, stash them all, wait until
// no entry has IO pending, read every value back through the storage and finally verify that
// the entries hold their original values again (cache_fetched is enabled by the fixture).
TEST_F(TieredStorageV2Test, SimpleStash) {
  // Create simple values
  vector<pair<string, string>> values(20);
  for (unsigned i = 0; i < values.size(); i++) {
    // 3 kb is above small bins size
    values[i] = {absl::StrCat("key", i), string(3_KB, char('A' + i))};
    Run({"set", values[i].first, values[i].second});
  }

  vector<util::fb2::Future<string>> futures;
  shard_set->Await(0, [this, &values, &futures] {
    auto& db_slice = EngineShard::tlocal()->db_slice();

    // Schedule STASH for values
    for (const auto& [key, _] : values) {
      auto it = db_slice.FindMutable(DbContext{}, key);
      storage_->Stash(key, &it.it->second);
    }

    // Wait for all values to be stashed.
    // Bind by const reference: the predicate is polled repeatedly, and binding by value would
    // copy every key together with its 3 KB payload on each poll.
    ExpectConditionWithinTimeout([&values, &db_slice] {
      for (const auto& [key, _] : values)
        if (db_slice.FindMutable(DbContext{}, key).it->second.HasIoPending())
          return false;
      return true;
    });

    // Now read all the values
    for (const auto& [key, _] : values) {
      auto it = db_slice.FindMutable(DbContext{}, key);
      EXPECT_TRUE(it.it->second.IsExternal());
      futures.emplace_back(storage_->Read(key, it.it->second));
    }
  });

  // Wait for futures and assert correct values were read
  for (unsigned i = 0; i < values.size(); i++)
    EXPECT_EQ(futures[i].get(), values[i].second);

  shard_set->Await(0, [&values] {
    auto& db_slice = EngineShard::tlocal()->db_slice();

    // Make sure all values were loaded back to memory
    for (const auto& [key, value] : values) {
      auto it = db_slice.FindMutable(DbContext{}, key);
      std::string buf;
      EXPECT_EQ(it.it->second.GetSlice(&buf), value);
    }
  });
}

} // namespace dfly
12 changes: 6 additions & 6 deletions src/server/tiering/op_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,18 @@ void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment
}

void OpManager::ProcessRead(size_t offset, std::string_view value) {
auto node = pending_reads_.extract(offset);
ReadOp& info = node.mapped();
ReadOp* info = &pending_reads_.at(offset);

for (auto& ko : info.key_ops) {
auto key_value = value.substr(ko.segment.offset - info.segment.offset, ko.segment.length);
for (auto& ko : info->key_ops) {
auto key_value = value.substr(ko.segment.offset - info->segment.offset, ko.segment.length);
for (auto& fut : ko.futures)
fut.set_value(std::string{key_value});
ReportFetched(Borrowed(ko.id), key_value, ko.segment);
}

if (info.delete_requested)
storage_.MarkAsFree(info.segment);
if (info->delete_requested)
storage_.MarkAsFree(info->segment);
pending_reads_.erase(offset);
}

OpManager::EntryOps& OpManager::ReadOp::ForId(EntryId id, DiskSegment key_segment) {
Expand Down

0 comments on commit 23106d4

Please sign in to comment.