From 7658332b55002e292b1aa8b72a575cc173cd5553 Mon Sep 17 00:00:00 2001 From: Borys Date: Wed, 4 Dec 2024 13:52:36 +0200 Subject: [PATCH] test: add test for PendingBuf --- src/server/journal/journal_test.cc | 70 ++++++++++++++++++++++++++++++ src/server/journal/streamer.cc | 2 +- src/server/journal/streamer.h | 50 +-------------------- 3 files changed, 72 insertions(+), 50 deletions(-) diff --git a/src/server/journal/journal_test.cc b/src/server/journal/journal_test.cc index b15f3f46a107..649f2f051121 100644 --- a/src/server/journal/journal_test.cc +++ b/src/server/journal/journal_test.cc @@ -2,6 +2,7 @@ #include "base/gtest.h" #include "base/logging.h" +#include "server/journal/pending_buf.h" #include "server/journal/serializer.h" #include "server/journal/types.h" #include "server/serializer_commons.h" @@ -125,5 +126,74 @@ TEST(Journal, WriteRead) { } } +TEST(Journal, PendingBuf) { + PendingBuf pbuf; + + ASSERT_TRUE(pbuf.empty()); + ASSERT_EQ(pbuf.size(), 0); + + pbuf.push("one"); + pbuf.push(" small"); + pbuf.push(" test"); + + ASSERT_FALSE(pbuf.empty()); + ASSERT_EQ(pbuf.size(), 14); + + { + auto& sending_buf = pbuf.PrepareSendingBuf(); + ASSERT_EQ(sending_buf.buf.size(), 3); + ASSERT_EQ(sending_buf.mem_size, 14); + + ASSERT_EQ(sending_buf.buf[0], "one"); + ASSERT_EQ(sending_buf.buf[1], " small"); + ASSERT_EQ(sending_buf.buf[2], " test"); + } + + const size_t string_num = 2000; + for (size_t i = 0; i < string_num; ++i) { + pbuf.push("big_test"); + } + + ASSERT_FALSE(pbuf.empty()); + ASSERT_EQ(pbuf.size(), 14 + string_num * 8); + + pbuf.Pop(); + + ASSERT_FALSE(pbuf.empty()); + ASSERT_EQ(pbuf.size(), string_num * 8); + + const auto next_buf_size = std::min(PendingBuf::Buf::max_buf_size, string_num); + { + auto& sending_buf = pbuf.PrepareSendingBuf(); + ASSERT_EQ(sending_buf.buf.size(), next_buf_size); + ASSERT_EQ(sending_buf.mem_size, next_buf_size * 8); + + for (const auto& s : sending_buf.buf) { + ASSERT_EQ(s, "big_test"); + } + } + + pbuf.Pop(); + + if (next_buf_size < string_num) { + const auto last_buf_size = string_num - next_buf_size; + ASSERT_FALSE(pbuf.empty()); + ASSERT_EQ(pbuf.size(), last_buf_size * 8); + + auto& sending_buf = pbuf.PrepareSendingBuf(); + ASSERT_EQ(sending_buf.buf.size(), last_buf_size); + ASSERT_EQ(sending_buf.mem_size, last_buf_size * 8); + + for (const auto& s : sending_buf.buf) { + ASSERT_EQ(s, "big_test"); + } + + pbuf.Pop(); + } + + ASSERT_TRUE(pbuf.empty()); + ASSERT_EQ(pbuf.size(), 0); +} + } // namespace journal } // namespace dfly diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 5f1fe83ea91b..02d7f1c916dc 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -99,7 +99,7 @@ void JournalStreamer::AsyncWrite() { return; } - const auto& cur_buf = pending_buf_.PrepareNext(); + const auto& cur_buf = pending_buf_.PrepareSendingBuf(); in_flight_bytes_ = cur_buf.mem_size; total_sent_ += cur_buf.mem_size; diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 5055bd018c2f..a18615a053f1 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -9,6 +9,7 @@ #include "server/common.h" #include "server/db_slice.h" #include "server/journal/journal.h" +#include "server/journal/pending_buf.h" #include "server/journal/serializer.h" #include "server/rdb_save.h" @@ -66,55 +67,6 @@ class JournalStreamer { journal::Journal* journal_; - class PendingBuf { - public: - struct Buf { - size_t mem_size = 0; - absl::InlinedVector buf; - - static constexpr size_t max_buf_size = 1024; // depends on UIO_MAXIOV - }; - - PendingBuf() : bufs_(1) { - } - - bool empty() const { - return std::all_of(bufs_.begin(), bufs_.end(), [](const auto& b) { return b.buf.empty(); }); - } - - void push(std::string str) { - DCHECK(!bufs_.empty()); - if (bufs_.back().buf.size() == Buf::max_buf_size) { - bufs_.emplace_back(); - } - auto& fron_buf = bufs_.back(); - - fron_buf.mem_size += str.size(); - fron_buf.buf.push_back(std::move(str)); - } - - // should be called to get the next buffer for sending - const Buf& PrepareNext() { - if (bufs_.size() == 1) { - bufs_.emplace_back(); - } - return bufs_.front(); - } - - // should be called when the buf from PrepareNext() method was sent - void Pop() { - DCHECK(bufs_.size() >= 2); - bufs_.pop_front(); - } - - size_t size() const { - return std::accumulate(bufs_.begin(), bufs_.end(), 0, - [](size_t s, const auto& b) { return s + b.mem_size; }); - } - - private: - std::deque bufs_; - }; PendingBuf pending_buf_; size_t in_flight_bytes_ = 0, total_sent_ = 0;