Skip to content

Commit

Permalink
feat: add current_fork_perc in info all command (#2640)
Browse files Browse the repository at this point in the history
* add field current_snapshot_perc (instead of current_fork_perc)
* add field current_save_keys_processed
* add field current_save_keys_total
  • Loading branch information
kostasrim authored Feb 26, 2024
1 parent d54b600 commit d54f220
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 11 deletions.
31 changes: 31 additions & 0 deletions src/server/detail/save_stages_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <absl/strings/match.h>

#include <numeric>

#include "base/flags.h"
#include "base/logging.h"
#include "server/detail/snapshot_storage.h"
Expand Down Expand Up @@ -129,6 +131,11 @@ size_t RdbSnapshot::GetSaveBuffersSize() {
return saver_->GetTotalBuffersSize();
}

RdbSaver::SnapshotStats RdbSnapshot::GetCurrentSnapshotProgress() const {
CHECK(saver_);
return saver_->GetCurrentSnapshotProgress();
}

error_code RdbSnapshot::Close() {
#ifdef __linux__
if (is_linux_file_) {
Expand Down Expand Up @@ -202,6 +209,30 @@ size_t SaveStagesController::GetSaveBuffersSize() {
return total_bytes.load(memory_order_relaxed);
}

RdbSaver::SnapshotStats SaveStagesController::GetCurrentSnapshotProgress() const {
if (snapshots_.size() == 0) {
return {0, 0};
}

std::vector<RdbSaver::SnapshotStats> results(snapshots_.size());
auto fetch = [this, &results](ShardId sid) {
if (auto& snapshot = snapshots_[sid].first; snapshot) {
results[sid] = snapshot->GetCurrentSnapshotProgress();
}
};

if (use_dfs_format_) {
shard_set->RunBriefInParallel([&](EngineShard* es) { fetch(es->shard_id()); });
RdbSaver::SnapshotStats init{0, 0};
return std::accumulate(
results.begin(), results.end(), init, [](auto init, auto pr) -> RdbSaver::SnapshotStats {
return {init.current_keys + pr.current_keys, init.total_keys + pr.total_keys};
});
}
fetch(0);
return results[0];
}

// In the new version (.dfs) we store a file for every shard and one more summary file.
// Summary file is always last in snapshots array.
void SaveStagesController::SaveDfs() {
Expand Down
3 changes: 3 additions & 0 deletions src/server/detail/save_stages_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class RdbSnapshot {
error_code Close();
size_t GetSaveBuffersSize();

RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;

const RdbTypeFreqMap& freq_map() const {
return freq_map_;
}
Expand Down Expand Up @@ -88,6 +90,7 @@ struct SaveStagesController : public SaveStagesInputs {
SaveInfo Finalize();
size_t GetSaveBuffersSize();
uint32_t GetCurrentSaveDuration();
RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;

private:
// In the new version (.dfs) we store a file for every shard and one more summary file.
Expand Down
27 changes: 27 additions & 0 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,8 @@ class RdbSaver::Impl {

size_t GetTotalBuffersSize() const;

RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;

error_code Flush() {
return aligned_buf_ ? aligned_buf_->Flush() : error_code{};
}
Expand Down Expand Up @@ -1265,6 +1267,27 @@ size_t RdbSaver::Impl::GetTotalBuffersSize() const {
return channel_bytes.load(memory_order_relaxed) + serializer_bytes.load(memory_order_relaxed);
}

RdbSaver::SnapshotStats RdbSaver::Impl::GetCurrentSnapshotProgress() const {
std::vector<RdbSaver::SnapshotStats> results(shard_snapshots_.size());

auto cb = [this, &results](ShardId sid) {
auto& snapshot = shard_snapshots_[sid];
results[sid] = snapshot->GetCurrentSnapshotProgress();
};

if (shard_snapshots_.size() == 1) {
cb(0);
return results[0];
}

shard_set->RunBriefInParallel([&](EngineShard* es) { cb(es->shard_id()); });
RdbSaver::SnapshotStats init{0, 0};
return std::accumulate(
results.begin(), results.end(), init, [](auto init, auto pr) -> RdbSaver::SnapshotStats {
return {init.current_keys + pr.current_keys, init.total_keys + pr.total_keys};
});
}

RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {
StringVec script_bodies, search_indices;

Expand Down Expand Up @@ -1463,6 +1486,10 @@ size_t RdbSaver::GetTotalBuffersSize() const {
return impl_->GetTotalBuffersSize();
}

RdbSaver::SnapshotStats RdbSaver::GetCurrentSnapshotProgress() const {
return impl_->GetCurrentSnapshotProgress();
}

void SerializerBase::AllocateCompressorOnce() {
if (compressor_impl_) {
return;
Expand Down
7 changes: 7 additions & 0 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ class RdbSaver {
// Get total size of all rdb serializer buffers and items currently placed in channel
size_t GetTotalBuffersSize() const;

struct SnapshotStats {
size_t current_keys = 0;
size_t total_keys = 0;
};

SnapshotStats GetCurrentSnapshotProgress() const;

// Fetch global data to be serialized in summary part of a snapshot / full sync.
static GlobalData GetGlobalData(const Service* service);

Expand Down
31 changes: 21 additions & 10 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1940,26 +1940,37 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
}

if (should_enter("PERSISTENCE", true)) {
auto save_info = GetLastSaveInfo();

// when last success save
append("last_success_save", save_info.save_time);
append("last_saved_file", save_info.file_name);
append("last_success_save_duration_sec", save_info.success_duration_sec);

size_t is_loading = service_.GetGlobalState() == GlobalState::LOADING;
append("loading", is_loading);

size_t current_snap_keys = 0;
size_t total_snap_keys = 0;
double perc = 0;
bool is_saving = false;
uint32_t curent_durration_sec = 0;
{
lock_guard lk{save_mu_};
if (save_controller_) {
is_saving = true;
curent_durration_sec = save_controller_->GetCurrentSaveDuration();
auto res = save_controller_->GetCurrentSnapshotProgress();
if (res.total_keys != 0) {
current_snap_keys = res.current_keys;
total_snap_keys = res.total_keys;
perc = (static_cast<double>(current_snap_keys) / total_snap_keys) * 100;
}
}
}

append("current_snapshot_perc", perc);
append("current_save_keys_processed", current_snap_keys);
append("current_save_keys_total", total_snap_keys);

auto save_info = GetLastSaveInfo();
// when last success save
append("last_success_save", save_info.save_time);
append("last_saved_file", save_info.file_name);
append("last_success_save_duration_sec", save_info.success_duration_sec);

size_t is_loading = service_.GetGlobalState() == GlobalState::LOADING;
append("loading", is_loading);
append("saving", is_saving);
append("current_save_duration_sec", curent_durration_sec);

Expand Down
8 changes: 8 additions & 0 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
}

PrimeTable::Cursor cursor;
for (DbIndex db_indx = 0; db_indx < db_array_.size(); ++db_indx) {
stats_.keys_total += db_slice_->DbSize(db_indx);
}

for (DbIndex db_indx = 0; db_indx < db_array_.size(); ++db_indx) {
if (cll->IsCancelled())
return;
Expand Down Expand Up @@ -360,4 +364,8 @@ size_t SliceSnapshot::GetTotalChannelCapacity() const {
return dest_->GetSize();
}

RdbSaver::SnapshotStats SliceSnapshot::GetCurrentSnapshotProgress() const {
return {stats_.loop_serialized + stats_.side_saved, stats_.keys_total};
}

} // namespace dfly
7 changes: 6 additions & 1 deletion src/server/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ class SliceSnapshot {
size_t GetTotalBufferCapacity() const; // In bytes
size_t GetTotalChannelCapacity() const; // In bytes

RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;

private:
DbSlice* db_slice_;
DbTableArray db_array_;
Expand All @@ -151,8 +153,11 @@ class SliceSnapshot {
uint64_t rec_id_ = 0;

struct Stats {
size_t loop_serialized = 0, skipped = 0, side_saved = 0;
size_t loop_serialized = 0;
size_t skipped = 0;
size_t side_saved = 0;
size_t savecb_calls = 0;
size_t keys_total = 0;
} stats_;
};

Expand Down

0 comments on commit d54f220

Please sign in to comment.