Skip to content

Commit

Permalink
test: add test for PendingBuf
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Dec 4, 2024
1 parent c1ce60d commit 7658332
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 50 deletions.
70 changes: 70 additions & 0 deletions src/server/journal/journal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "base/gtest.h"
#include "base/logging.h"
#include "server/journal/pending_buf.h"
#include "server/journal/serializer.h"
#include "server/journal/types.h"
#include "server/serializer_commons.h"
Expand Down Expand Up @@ -125,5 +126,74 @@ TEST(Journal, WriteRead) {
}
}

TEST(Journal, PendingBuf) {
PendingBuf pbuf;

ASSERT_TRUE(pbuf.empty());
ASSERT_EQ(pbuf.size(), 0);

pbuf.push("one");
pbuf.push(" small");
pbuf.push(" test");

ASSERT_FALSE(pbuf.empty());
ASSERT_EQ(pbuf.size(), 14);

{
auto& sending_buf = pbuf.PrepareSendingBuf();
ASSERT_EQ(sending_buf.buf.size(), 3);
ASSERT_EQ(sending_buf.mem_size, 14);

ASSERT_EQ(sending_buf.buf[0], "one");
ASSERT_EQ(sending_buf.buf[1], " small");
ASSERT_EQ(sending_buf.buf[2], " test");
}

const size_t string_num = 2000;
for (size_t i = 0; i < string_num; ++i) {
pbuf.push("big_test");
}

ASSERT_FALSE(pbuf.empty());
ASSERT_EQ(pbuf.size(), 14 + string_num * 8);

pbuf.Pop();

ASSERT_FALSE(pbuf.empty());
ASSERT_EQ(pbuf.size(), string_num * 8);

const auto next_buf_size = std::min(PendingBuf::Buf::max_buf_size, string_num);
{
auto& sending_buf = pbuf.PrepareSendingBuf();
ASSERT_EQ(sending_buf.buf.size(), next_buf_size);
ASSERT_EQ(sending_buf.mem_size, next_buf_size * 8);

for (const auto& s : sending_buf.buf) {
ASSERT_EQ(s, "big_test");
}
}

pbuf.Pop();

if (next_buf_size < string_num) {
const auto last_buf_size = string_num - next_buf_size;
ASSERT_FALSE(pbuf.empty());
ASSERT_EQ(pbuf.size(), last_buf_size * 8);

auto& sending_buf = pbuf.PrepareSendingBuf();
ASSERT_EQ(sending_buf.buf.size(), last_buf_size);
ASSERT_EQ(sending_buf.mem_size, last_buf_size * 8);

for (const auto& s : sending_buf.buf) {
ASSERT_EQ(s, "big_test");
}

pbuf.Pop();
}

ASSERT_TRUE(pbuf.empty());
ASSERT_EQ(pbuf.size(), 0);
}

} // namespace journal
} // namespace dfly
2 changes: 1 addition & 1 deletion src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void JournalStreamer::AsyncWrite() {
return;
}

const auto& cur_buf = pending_buf_.PrepareNext();
const auto& cur_buf = pending_buf_.PrepareSendingBuf();

in_flight_bytes_ = cur_buf.mem_size;
total_sent_ += cur_buf.mem_size;
Expand Down
50 changes: 1 addition & 49 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "server/common.h"
#include "server/db_slice.h"
#include "server/journal/journal.h"
#include "server/journal/pending_buf.h"
#include "server/journal/serializer.h"
#include "server/rdb_save.h"

Expand Down Expand Up @@ -66,55 +67,6 @@ class JournalStreamer {

journal::Journal* journal_;

class PendingBuf {
public:
struct Buf {
size_t mem_size = 0;
absl::InlinedVector<std::string, 8> buf;

static constexpr size_t max_buf_size = 1024; // depends on UIO_MAXIOV
};

PendingBuf() : bufs_(1) {
}

bool empty() const {
return std::all_of(bufs_.begin(), bufs_.end(), [](const auto& b) { return b.buf.empty(); });
}

void push(std::string str) {
DCHECK(!bufs_.empty());
if (bufs_.back().buf.size() == Buf::max_buf_size) {
bufs_.emplace_back();
}
auto& fron_buf = bufs_.back();

fron_buf.mem_size += str.size();
fron_buf.buf.push_back(std::move(str));
}

// should be called to get the next buffer for sending
const Buf& PrepareNext() {
if (bufs_.size() == 1) {
bufs_.emplace_back();
}
return bufs_.front();
}

// should be called when the buf from PrepareNext() method was sent
void Pop() {
DCHECK(bufs_.size() >= 2);
bufs_.pop_front();
}

size_t size() const {
return std::accumulate(bufs_.begin(), bufs_.end(), 0,
[](size_t s, const auto& b) { return s + b.mem_size; });
}

private:
std::deque<Buf> bufs_;
};
PendingBuf pending_buf_;

size_t in_flight_bytes_ = 0, total_sent_ = 0;
Expand Down

0 comments on commit 7658332

Please sign in to comment.