Skip to content

Commit

Permalink
chore: implement Erase for a range (#4106)
Browse files Browse the repository at this point in the history
chore: implement Erase with a range

Also migrate more unit tests from valkey repo.
Finally, fix OpTrim

All tests `list_family_test --list_experimental_v2` pass.

Signed-off-by: Roman Gershman <[email protected]>

chore: implement OpTrim with QList
  • Loading branch information
romange authored Nov 10, 2024
1 parent 503bb4e commit 9b7af7d
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 8 deletions.
76 changes: 75 additions & 1 deletion src/core/qlist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ void QList::Compress(quicklistNode* node) {
reverse = reverse->prev;
}

if (!in_depth)
if (!in_depth && node)
CompressNodeIfNeeded(node);

/* At this point, forward and reverse are one node beyond depth */
Expand Down Expand Up @@ -1022,6 +1022,80 @@ auto QList::Erase(Iterator it) -> Iterator {
return it;
}

bool QList::Erase(const long start, unsigned count) {
if (count == 0)
return false;

unsigned extent = count; /* range is inclusive of start position */

if (start >= 0 && extent > (count_ - start)) {
/* if requesting delete more elements than exist, limit to list size. */
extent = count_ - start;
} else if (start < 0 && extent > (unsigned long)(-start)) {
/* else, if at negative offset, limit max size to rest of list. */
extent = -start; /* c.f. LREM -29 29; just delete until end. */
}

Iterator it = GetIterator(start);
quicklistNode* node = it.current_;
long offset = it.offset_;

/* iterate over next nodes until everything is deleted. */
while (extent) {
quicklistNode* next = node->next;

unsigned long del;
int delete_entire_node = 0;
if (offset == 0 && extent >= node->count) {
/* If we are deleting more than the count of this node, we
* can just delete the entire node without listpack math. */
delete_entire_node = 1;
del = node->count;
} else if (offset >= 0 && extent + offset >= node->count) {
/* If deleting more nodes after this one, calculate delete based
* on size of current node. */
del = node->count - offset;
} else if (offset < 0) {
/* If offset is negative, we are in the first run of this loop
* and we are deleting the entire range
* from this start offset to end of list. Since the Negative
* offset is the number of elements until the tail of the list,
* just use it directly as the deletion count. */
del = -offset;

/* If the positive offset is greater than the remaining extent,
* we only delete the remaining extent, not the entire offset.
*/
if (del > extent)
del = extent;
} else {
/* else, we are deleting less than the extent of this node, so
* use extent directly. */
del = extent;
}

if (delete_entire_node || QL_NODE_IS_PLAIN(node)) {
DelNode(node);
} else {
DecompressNodeIfNeeded(true, node);
node->entry = lpDeleteRange(node->entry, offset, del);
NodeUpdateSz(node);
node->count -= del;
count_ -= del;
if (node->count == 0) {
DelNode(node);
} else {
RecompressOnly(node);
}
}

extent -= del;
node = next;
offset = 0;
}
return true;
}

bool QList::Entry::operator==(std::string_view sv) const {
if (std::holds_alternative<int64_t>(value_)) {
char buf[absl::numbers_internal::kFastToBufferSize];
Expand Down
13 changes: 13 additions & 0 deletions src/core/qlist.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,25 @@ class QList {
return len_;
}

unsigned compress_param() const {
return compress_;
}

Iterator Erase(Iterator it);

// Returns true if elements were deleted, false if list has not changed.
// Negative start index is allowed.
bool Erase(const long start, unsigned count);

// Needed by tests and the rdb code.
const quicklistNode* Head() const {
return head_;
}

const quicklistNode* Tail() const {
return tail_;
}

private:
bool AllowCompression() const {
return compress_ != 0;
Expand Down
193 changes: 192 additions & 1 deletion src/core/qlist_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "core/qlist.h"

#include <absl/strings/str_cat.h>
#include <absl/strings/str_format.h>
#include <gmock/gmock.h>

#include "base/gtest.h"
Expand All @@ -21,6 +22,100 @@ namespace dfly {
using namespace std;
using namespace testing;

static int _ql_verify_compress(const QList& ql) {
int errors = 0;
unsigned compress_param = ql.compress_param();
if (compress_param > 0) {
const quicklistNode* node = ql.Head();
unsigned int low_raw = compress_param;
unsigned int high_raw = ql.node_count() - compress_param;

for (unsigned int at = 0; at < ql.node_count(); at++, node = node->next) {
if (node && (at < low_raw || at >= high_raw)) {
if (node->encoding != QUICKLIST_NODE_ENCODING_RAW) {
LOG(ERROR) << "Incorrect compression: node " << at << " is compressed at depth "
<< compress_param << " ((" << low_raw << "," << high_raw
<< " total nodes: " << ql.node_count() << "; size: " << node->sz
<< "; recompress: " << node->recompress;
errors++;
}
} else {
if (node->encoding != QUICKLIST_NODE_ENCODING_LZF && !node->attempted_compress) {
LOG(ERROR) << absl::StrFormat(
"Incorrect non-compression: node %d is NOT "
"compressed at depth %d ((%u, %u); total "
"nodes: %lu; size: %zu; recompress: %d; attempted: %d)",
at, compress_param, low_raw, high_raw, ql.node_count(), node->sz, node->recompress,
node->attempted_compress);
errors++;
}
}
}
}
return errors;
}

/* Verify list metadata matches physical list contents. */
static int ql_verify(const QList& ql, uint32_t nc, uint32_t count, uint32_t head_count,
uint32_t tail_count) {
int errors = 0;

if (nc != ql.node_count()) {
LOG(ERROR) << "quicklist length wrong: expected " << nc << " got " << ql.node_count();
errors++;
}

if (count != ql.Size()) {
LOG(ERROR) << "quicklist count wrong: expected " << count << " got " << ql.Size();
errors++;
}

auto* node = ql.Head();
size_t node_size = 0;
while (node) {
node_size += node->count;
node = node->next;
}

if (node_size != ql.Size()) {
LOG(ERROR) << "quicklist cached count not match actual count: expected " << ql.Size() << " got "
<< node_size;
errors++;
}

node = ql.Tail();
node_size = 0;
while (node) {
node_size += node->count;
node = node->prev;
}
if (node_size != ql.Size()) {
LOG(ERROR) << "has different forward count than reverse count! "
"Forward count is "
<< ql.Size() << ", reverse count is " << node_size;
errors++;
}

if (ql.node_count() == 0 && errors == 0) {
return 0;
}

if (ql.Head() && head_count != ql.Head()->count && head_count != lpLength(ql.Head()->entry)) {
LOG(ERROR) << absl::StrFormat("head count wrong: expected %u got cached %u vs. actual %lu",
head_count, ql.Head()->count, lpLength(ql.Head()->entry));
errors++;
}

if (ql.Tail() && tail_count != ql.Tail()->count && tail_count != lpLength(ql.Tail()->entry)) {
LOG(ERROR) << "tail count wrong: expected " << tail_count << "got cached " << ql.Tail()->count
<< " vs. actual " << lpLength(ql.Tail()->entry);
errors++;
}

errors += _ql_verify_compress(ql);
return errors;
}

class QListTest : public ::testing::Test {
protected:
QListTest() : mr_(mi_heap_get_backing()) {
Expand Down Expand Up @@ -183,4 +278,100 @@ TEST_P(OptionsTest, Numbers) {
EXPECT_EQ("xxxxxxxxxxxxxxxxxxxx", it.Get().view());
}

}; // namespace dfly
TEST_P(OptionsTest, DelRangeA) {
auto [fill, compress] = GetParam();
ql_ = QList(fill, compress);
long long nums[5000];
for (int i = 0; i < 33; i++) {
nums[i] = -5157318210846258176 + i;
ql_.Push(absl::StrCat(nums[i]), QList::TAIL);
}
if (fill == 32)
ql_verify(ql_, 2, 33, 32, 1);

/* ltrim 3 3 (keep [3,3] inclusive = 1 remaining) */
ql_.Erase(0, 3);
ql_.Erase(-29, 4000); /* make sure not loop forever */
if (fill == 32)
ql_verify(ql_, 1, 1, 1, 1);

auto it = ql_.GetIterator(0);
ASSERT_TRUE(it.Next());
EXPECT_EQ(-5157318210846258173, it.Get().ival());
}

TEST_P(OptionsTest, DelRangeB) {
auto [fill, _] = GetParam();
ql_ = QList(fill, QUICKLIST_NOCOMPRESS); // ignore compress parameter

long long nums[5000];
for (int i = 0; i < 33; i++) {
nums[i] = i;
ql_.Push(absl::StrCat(nums[i]), QList::TAIL);
}
if (fill == 32)
ql_verify(ql_, 2, 33, 32, 1);

/* ltrim 5 16 (keep [5,16] inclusive = 12 remaining) */
ql_.Erase(0, 5);
ql_.Erase(-16, 16);
if (fill == 32)
ql_verify(ql_, 1, 12, 12, 12);

auto it = ql_.GetIterator(0);
ASSERT_TRUE(it.Next());
EXPECT_EQ(5, it.Get().ival());

it = ql_.GetIterator(-1);
ASSERT_TRUE(it.Next());
EXPECT_EQ(16, it.Get().ival());

ql_.Push("bobobob", QList::TAIL);
it = ql_.GetIterator(-1);
ASSERT_TRUE(it.Next());
EXPECT_EQ("bobobob", it.Get().view());

for (int i = 0; i < 12; i++) {
it = ql_.GetIterator(i);
ASSERT_TRUE(it.Next());
EXPECT_EQ(i + 5, it.Get().ival());
}
}

TEST_P(OptionsTest, DelRangeC) {
auto [fill, compress] = GetParam();
ql_ = QList(fill, compress);
long long nums[5000];
for (int i = 0; i < 33; i++) {
nums[i] = -5157318210846258176 + i;
ql_.Push(absl::StrCat(nums[i]), QList::TAIL);
}
if (fill == 32)
ql_verify(ql_, 2, 33, 32, 1);

/* ltrim 3 3 (keep [3,3] inclusive = 1 remaining) */
ql_.Erase(0, 3);
ql_.Erase(-29, 4000); /* make sure not loop forever */
if (fill == 32)
ql_verify(ql_, 1, 1, 1, 1);
auto it = ql_.GetIterator(0);
ASSERT_TRUE(it.Next());
ASSERT_EQ(-5157318210846258173, it.Get().ival());
}

TEST_P(OptionsTest, DelRangeD) {
auto [fill, compress] = GetParam();
ql_ = QList(fill, compress);
long long nums[5000];
for (int i = 0; i < 33; i++) {
nums[i] = -5157318210846258176 + i;
ql_.Push(absl::StrCat(nums[i]), QList::TAIL);
}
if (fill == 32)
ql_verify(ql_, 2, 33, 32, 1);
ql_.Erase(-12, 3);

ASSERT_EQ(30, ql_.Size());
}

} // namespace dfly
18 changes: 12 additions & 6 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -755,8 +755,8 @@ OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) {
return it_res.status();

auto it = it_res->it;
quicklist* ql = GetQL(it->second);
long llen = quicklistCount(ql);

long llen = it->second.Size();

/* convert negative indexes */
if (start < 0)
Expand All @@ -781,12 +781,18 @@ OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) {
rtrim = llen - end - 1;
}

quicklistDelRange(ql, 0, ltrim);
quicklistDelRange(ql, -rtrim, rtrim);

if (it->second.Encoding() == kEncodingQL2) {
QList* ql = GetQLV2(it->second);
ql->Erase(0, ltrim);
ql->Erase(-rtrim, rtrim);
} else {
quicklist* ql = GetQL(it->second);
quicklistDelRange(ql, 0, ltrim);
quicklistDelRange(ql, -rtrim, rtrim);
}
it_res->post_updater.Run();

if (quicklistCount(ql) == 0) {
if (it->second.Size() == 0) {
CHECK(db_slice.Del(op_args.db_cntx, it));
}
return OpStatus::OK;
Expand Down

0 comments on commit 9b7af7d

Please sign in to comment.