feat(snapshot): move load path to snapshot storage (#1830)
* feat(snapshot): move snapshot storage to new file

* feat(snapshot): move file load path to snapshot storage

* feat(snapshot): move load paths to snapshot storage

* feat(snapshot): move open read file to snapshot storage

* feat(snapshot): remove redundant includes from save stages controller

* feat(snapshot): remove sv string literal
andydunstall authored Sep 11, 2023
1 parent fcebb6a commit 9fac9a7
Showing 6 changed files with 330 additions and 275 deletions.
1 change: 1 addition & 0 deletions src/server/CMakeLists.txt
@@ -35,6 +35,7 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc
protocol_client.cc
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
detail/save_stages_controller.cc
detail/snapshot_storage.cc
set_family.cc stream_family.cc string_family.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
192 changes: 1 addition & 191 deletions src/server/detail/save_stages_controller.cc
@@ -6,20 +6,14 @@
#include "server/detail/save_stages_controller.h"

#include <absl/strings/match.h>
#include <absl/strings/str_replace.h>
#include <absl/strings/strip.h>

#include "base/flags.h"
#include "base/logging.h"
#include "io/file_util.h"
#include "server/main_service.h"
#include "server/script_mgr.h"
#include "server/search/doc_index.h"
#include "server/transaction.h"
#include "strings/human_readable.h"
#include "util/cloud/s3.h"
#include "util/fibers/fiber_file.h"
#include "util/uring/uring_file.h"

using namespace std;

@@ -38,39 +32,14 @@ namespace fs = std::filesystem;

namespace {

const size_t kBucketConnectMs = 2000;

#ifdef __linux__
const int kRdbWriteFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT;
#endif

constexpr string_view kS3Prefix = "s3://"sv;

bool IsCloudPath(string_view path) {
return absl::StartsWith(path, kS3Prefix);
}

// Returns bucket_name, obj_path for an s3 path.
optional<pair<string, string>> GetBucketPath(string_view path) {
string_view clean = absl::StripPrefix(path, kS3Prefix);

size_t pos = clean.find('/');
if (pos == string_view::npos)
return nullopt;

string bucket_name{clean.substr(0, pos)};
string obj_path{clean.substr(pos + 1)};
return make_pair(move(bucket_name), move(obj_path));
}

string FormatTs(absl::Time now) {
return absl::FormatTime("%Y-%m-%dT%H:%M:%S", now, absl::LocalTimeZone());
}

void SubstituteFilenameTsPlaceholder(fs::path* filename, std::string_view replacement) {
*filename = absl::StrReplaceAll(filename->string(), {{"{timestamp}", replacement}});
}

// Create a directory and all its parents if they don't exist.
error_code CreateDirs(fs::path dir_path) {
error_code ec;
@@ -94,32 +63,6 @@ void ExtendDfsFilenameWithShard(int shard, string_view extension, fs::path* file
SetExtension(absl::Dec(shard, absl::kZeroPad4), extension, filename);
}

// takes ownership over the file.
class LinuxWriteWrapper : public io::Sink {
public:
LinuxWriteWrapper(fb2::LinuxFile* lf) : lf_(lf) {
}

io::Result<size_t> WriteSome(const iovec* v, uint32_t len) final;

error_code Close() {
return lf_->Close();
}

private:
unique_ptr<fb2::LinuxFile> lf_;
off_t offset_ = 0;
};

io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
io::Result<size_t> res = lf_->WriteSome(v, len, offset_, 0);
if (res) {
offset_ += *res;
}

return res;
}

} // namespace

GenericError ValidateFilename(const fs::path& filename, bool new_version) {
@@ -151,144 +94,11 @@ GenericError ValidateFilename(const fs::path& filename, bool new_version) {
return {};
}

FileSnapshotStorage::FileSnapshotStorage(FiberQueueThreadPool* fq_threadpool)
: fq_threadpool_{fq_threadpool} {
}

io::Result<std::pair<io::Sink*, uint8_t>, GenericError> FileSnapshotStorage::OpenFile(
const std::string& path) {
if (fq_threadpool_) { // EPOLL
auto res = util::OpenFiberWriteFile(path, fq_threadpool_);
if (!res) {
return nonstd::make_unexpected(GenericError(res.error(), "Couldn't open file for writing"));
}

return std::pair(*res, FileType::FILE);
} else {
#ifdef __linux__
auto res = OpenLinux(path, kRdbWriteFlags, 0666);
if (!res) {
return nonstd::make_unexpected(GenericError(
res.error(),
"Couldn't open file for writing (is direct I/O supported by the file system?)"));
}

uint8_t file_type = FileType::FILE | FileType::IO_URING;
if (kRdbWriteFlags & O_DIRECT) {
file_type |= FileType::DIRECT;
}
return std::pair(new LinuxWriteWrapper(res->release()), file_type);
#else
LOG(FATAL) << "Linux I/O is not supported on this platform";
#endif
}
}

AwsS3SnapshotStorage::AwsS3SnapshotStorage(util::cloud::AWS* aws) : aws_{aws} {
}

io::Result<std::pair<io::Sink*, uint8_t>, GenericError> AwsS3SnapshotStorage::OpenFile(
const std::string& path) {
DCHECK(aws_);

optional<pair<string, string>> bucket_path = GetBucketPath(path);
if (!bucket_path) {
return nonstd::make_unexpected(GenericError("Invalid S3 path"));
}
auto [bucket_name, obj_path] = *bucket_path;

cloud::S3Bucket bucket(*aws_, bucket_name);
error_code ec = bucket.Connect(kBucketConnectMs);
if (ec) {
return nonstd::make_unexpected(GenericError(ec, "Couldn't connect to S3 bucket"));
}
auto res = bucket.OpenWriteFile(obj_path);
if (!res) {
return nonstd::make_unexpected(GenericError(res.error(), "Couldn't open file for writing"));
}

return std::pair<io::Sink*, uint8_t>(*res, FileType::CLOUD);
}

string InferLoadFile(string_view dir, cloud::AWS* aws) {
fs::path data_folder;
string bucket_name, obj_path;

if (dir.empty()) {
data_folder = fs::current_path();
} else {
if (IsCloudPath(dir)) {
CHECK(aws);
auto res = GetBucketPath(dir);
if (!res) {
LOG(ERROR) << "Invalid S3 path: " << dir;
return {};
}
data_folder = dir;
bucket_name = res->first;
obj_path = res->second;
} else {
error_code file_ec;
data_folder = fs::canonical(dir, file_ec);
if (file_ec) {
LOG(ERROR) << "Data directory error: " << file_ec.message() << " for dir " << dir;
return {};
}
}
}

LOG(INFO) << "Data directory is " << data_folder;

const auto& dbname = GetFlag(FLAGS_dbfilename);
if (dbname.empty())
return string{};

if (IsCloudPath(dir)) {
cloud::S3Bucket bucket(*aws, bucket_name);
ProactorBase* proactor = shard_set->pool()->GetNextProactor();
auto ec = proactor->Await([&] { return bucket.Connect(kBucketConnectMs); });
if (ec) {
LOG(ERROR) << "Couldn't connect to S3 bucket: " << ec.message();
return {};
}

fs::path fl_path{obj_path};
fl_path.append(dbname);

LOG(INFO) << "Loading from s3 path s3://" << bucket_name << "/" << fl_path;
// TODO: to load from S3 file.
return {};
}

fs::path fl_path = data_folder.append(dbname);
if (fs::exists(fl_path))
return fl_path.generic_string();

SubstituteFilenameTsPlaceholder(&fl_path, "*");
if (!fl_path.has_extension()) {
fl_path += "*";
}
io::Result<io::StatShortVec> short_vec = io::StatFiles(fl_path.generic_string());

if (short_vec) {
// io::StatFiles returns a list of sorted files. Because our timestamp format has the same
// time order and lexicographic order we iterate from the end to find the latest snapshot.
auto it = std::find_if(short_vec->rbegin(), short_vec->rend(), [](const auto& stat) {
return absl::EndsWith(stat.name, ".rdb") || absl::EndsWith(stat.name, "summary.dfs");
});
if (it != short_vec->rend())
return it->name;
} else {
LOG(WARNING) << "Could not stat " << fl_path << ", error " << short_vec.error().message();
}
return string{};
}

GenericError RdbSnapshot::Start(SaveMode save_mode, const std::string& path,
const RdbSaver::GlobalData& glob_data) {
VLOG(1) << "Saving RDB " << path;

auto res = snapshot_storage_->OpenFile(path);
auto res = snapshot_storage_->OpenWriteFile(path);
if (!res) {
return res.error();
}
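For reference, the InferLoadFile routine deleted above is what this commit relocates into the snapshot storage classes. Below is a minimal sketch of how the local-file part of that lookup might look once hosted by FileSnapshotStorage; the LoadPath name and signature are assumptions for illustration, not taken from this diff, and the timestamp/filename helpers are assumed to move along with it.

// Illustrative sketch only: the local-file load-path lookup from the removed
// InferLoadFile, rehosted on FileSnapshotStorage. LoadPath is an assumed name.
io::Result<std::string, GenericError> FileSnapshotStorage::LoadPath(
    std::string_view dir, std::string_view dbfilename) {
  if (dbfilename.empty())
    return std::string{};

  // Resolve the data directory, defaulting to the current working directory.
  std::error_code ec;
  fs::path data_folder = dir.empty() ? fs::current_path() : fs::canonical(fs::path{dir}, ec);
  if (ec)
    return nonstd::make_unexpected(GenericError(ec, "Data directory error"));

  fs::path fl_path = data_folder.append(dbfilename);
  if (fs::exists(fl_path))
    return fl_path.generic_string();

  // Otherwise look for the newest timestamped snapshot, as the removed code did.
  SubstituteFilenameTsPlaceholder(&fl_path, "*");
  if (!fl_path.has_extension())
    fl_path += "*";

  io::Result<io::StatShortVec> files = io::StatFiles(fl_path.generic_string());
  if (!files)
    return nonstd::make_unexpected(GenericError(files.error(), "Could not stat files"));

  // StatFiles returns sorted names; the timestamp format orders the same way
  // lexicographically, so the last matching entry is the latest snapshot.
  auto it = std::find_if(files->rbegin(), files->rend(), [](const auto& stat) {
    return absl::EndsWith(stat.name, ".rdb") || absl::EndsWith(stat.name, "summary.dfs");
  });
  return it != files->rend() ? it->name : std::string{};
}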
40 changes: 1 addition & 39 deletions src/server/detail/save_stages_controller.h
@@ -7,6 +7,7 @@

#include <filesystem>

#include "server/detail/snapshot_storage.h"
#include "server/rdb_save.h"
#include "server/server_family.h"
#include "util/cloud/aws.h"
@@ -19,45 +20,6 @@ class Service;

namespace detail {

enum FileType : uint8_t {
FILE = (1u << 0),
CLOUD = (1u << 1),
IO_URING = (1u << 2),
DIRECT = (1u << 3),
};

class SnapshotStorage {
public:
virtual ~SnapshotStorage() = default;

// Opens the file at the given path, and returns the open file and file
// type, which is a bitmask of FileType.
virtual io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenFile(
const std::string& path) = 0;
};

class FileSnapshotStorage : public SnapshotStorage {
public:
FileSnapshotStorage(FiberQueueThreadPool* fq_threadpool);

io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenFile(
const std::string& path) override;

private:
util::fb2::FiberQueueThreadPool* fq_threadpool_;
};

class AwsS3SnapshotStorage : public SnapshotStorage {
public:
AwsS3SnapshotStorage(util::cloud::AWS* aws);

io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenFile(
const std::string& path) override;

private:
util::cloud::AWS* aws_;
};

struct SaveStagesInputs {
bool use_dfs_format_;
std::string_view basename_;
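For context, the FileType enum and the storage classes removed above now live in detail/snapshot_storage.h, backed by the detail/snapshot_storage.cc file added to the build in the CMakeLists.txt change. A rough sketch of the relocated declarations is below, assuming they keep the shape shown here plus the OpenFile to OpenWriteFile rename visible in the save_stages_controller.cc hunk; the read and load-path entry points mentioned in the commit message are omitted because their signatures do not appear in this diff.

// Sketch only: how the relocated declarations in src/server/detail/snapshot_storage.h
// plausibly look after this commit; reduced to what this diff shows.
namespace detail {

enum FileType : uint8_t {
  FILE = (1u << 0),
  CLOUD = (1u << 1),
  IO_URING = (1u << 2),
  DIRECT = (1u << 3),
};

class SnapshotStorage {
 public:
  virtual ~SnapshotStorage() = default;

  // Opens the file at the given path for writing and returns the open sink
  // together with a bitmask of FileType flags.
  virtual io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenWriteFile(
      const std::string& path) = 0;
};

class FileSnapshotStorage : public SnapshotStorage {
 public:
  explicit FileSnapshotStorage(util::fb2::FiberQueueThreadPool* fq_threadpool);

  io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenWriteFile(
      const std::string& path) override;

 private:
  util::fb2::FiberQueueThreadPool* fq_threadpool_;
};

// AwsS3SnapshotStorage follows the same pattern, holding a util::cloud::AWS*
// and overriding OpenWriteFile for s3:// paths.

}  // namespace detail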