Skip to content

Commit

Permalink
chore: Implement list Pop/Erase functionality with QList (#4099)
Browse files Browse the repository at this point in the history
Adjusted OpRem/OpBPop/OpPop

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Nov 10, 2024
1 parent c43ba5f commit 9366c67
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 45 deletions.
34 changes: 34 additions & 0 deletions src/core/qlist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,40 @@ void QList::Push(string_view value, Where where) {
}
}

string QList::Pop(Where where) {
DCHECK_GT(count_, 0u);
quicklistNode* node;
if (where == HEAD) {
node = head_;
} else {
DCHECK_EQ(TAIL, where);
node = tail_;
}

/* The head and tail should never be compressed */
DCHECK(node->encoding != QUICKLIST_NODE_ENCODING_LZF);

string res;
if (ABSL_PREDICT_FALSE(QL_NODE_IS_PLAIN(node))) {
// TODO: We could avoid this copy by returning the pointer of the plain node.
// But the higher level APIs should support this.
res.assign(reinterpret_cast<char*>(node->entry), node->sz);
DelNode(node);
} else {
uint8_t* pos = where == HEAD ? lpFirst(node->entry) : lpLast(node->entry);
unsigned int vlen;
long long vlong;
uint8_t* vstr = lpGetValue(pos, &vlen, &vlong);
if (vstr) {
res.assign(reinterpret_cast<char*>(vstr), vlen);
} else {
res = absl::StrCat(vlong);
}
DelPackedIndex(node, &pos);
}
return res;
}

void QList::AppendListpack(unsigned char* zl) {
quicklistNode* node = CreateNode();
node->entry = zl;
Expand Down
13 changes: 11 additions & 2 deletions src/core/qlist.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ extern "C" {
#include "redis/quicklist.h"
}

#include <functional>
#include <absl/functional/function_ref.h>

#include <optional>
#include <string>
#include <variant>
Expand All @@ -35,6 +36,10 @@ class QList {
return std::get<std::string_view>(value_);
}

bool is_int() const {
return std::holds_alternative<int64_t>(value_);
}

int64_t ival() const {
return std::get<int64_t>(value_);
}
Expand Down Expand Up @@ -66,7 +71,7 @@ class QList {
friend class QList;
};

using IterateFunc = std::function<bool(Entry)>;
using IterateFunc = absl::FunctionRef<bool(Entry)>;
enum InsertOpt { BEFORE, AFTER };

QList();
Expand All @@ -85,6 +90,10 @@ class QList {
void Clear();

void Push(std::string_view value, Where where);

// Returns the popped value. Precondition: list is not empty.
std::string Pop(Where where);

void AppendListpack(unsigned char* zl);
void AppendPlain(unsigned char* zl, size_t sz);

Expand Down
151 changes: 108 additions & 43 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,25 @@ std::string OpBPop(Transaction* t, EngineShard* shard, std::string_view key, Lis
CHECK(it_res) << t->DebugId() << " " << key; // must exist and must be ok.

auto it = it_res->it;
quicklist* ql = GetQL(it->second);
std::string value;
size_t len;

if (it->second.Encoding() == OBJ_ENCODING_QUICKLIST) {
quicklist* ql = GetQL(it->second);

value = ListPop(dir, ql);
len = quicklistCount(ql);
} else {
QList* ql = GetQLV2(it->second);
QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
value = ql->Pop(where);
len = ql->Size();
}

std::string value = ListPop(dir, ql);
it_res->post_updater.Run();

OpArgs op_args = t->GetOpArgs(shard);
if (quicklistCount(ql) == 0) {
if (len == 0) {
DVLOG(1) << "deleting key " << key << " " << t->DebugId();
CHECK(op_args.GetDbSlice().Del(op_args.db_cntx, it));
}
Expand Down Expand Up @@ -346,24 +358,46 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, u
return StringVec{};

auto it = it_res->it;
quicklist* ql = GetQL(it->second);
auto prev_len = quicklistCount(ql);
size_t prev_len = 0;
StringVec res;
if (prev_len < count) {
count = prev_len;
}

if (return_results) {
res.reserve(count);
}
if (it->second.Encoding() == kEncodingQL2) {
QList* ql = GetQLV2(it->second);
prev_len = ql->Size();

if (prev_len < count) {
count = prev_len;
}

for (unsigned i = 0; i < count; ++i) {
string val = ListPop(dir, ql);
if (return_results) {
res.push_back(std::move(val));
res.reserve(count);
}
}

QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
for (unsigned i = 0; i < count; ++i) {
if (return_results) {
res.push_back(ql->Pop(where));
}
}
} else {
quicklist* ql = GetQL(it->second);
prev_len = quicklistCount(ql);

if (prev_len < count) {
count = prev_len;
}

if (return_results) {
res.reserve(count);
}

for (unsigned i = 0; i < count; ++i) {
string val = ListPop(dir, ql);
if (return_results) {
res.push_back(std::move(val));
}
}
}
it_res->post_updater.Run();

if (count == prev_len) {
Expand Down Expand Up @@ -572,46 +606,77 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, string_view ele
return it_res.status();

auto it = it_res->it;
quicklist* ql = GetQL(it->second);

int iter_direction = AL_START_HEAD;
long long index = 0;
if (count < 0) {
count = -count;
iter_direction = AL_START_TAIL;
index = -1;
}

quicklistIter qiter;
quicklistInitIterator(&qiter, ql, iter_direction, index);
quicklistEntry entry;
size_t len = 0;
unsigned removed = 0;

int64_t ival;

// try parsing the element into an integer.
int is_int = lpStringToInt64(elem.data(), elem.size(), &ival);

auto is_match = [&](const quicklistEntry& entry) {
if (is_int != (entry.value == nullptr))
return false;
if (it->second.Encoding() == kEncodingQL2) {
QList* ql = GetQLV2(it->second);
QList::Where where = QList::HEAD;

return is_int ? entry.longval == ival : ElemCompare(entry, elem);
};
if (count < 0) {
count = -count;
where = QList::TAIL;
}

auto it = ql->GetIterator(where);
auto is_match = [&](const QList::Entry& entry) {
if (is_int) {
return entry.is_int() && entry.ival() == ival;
}
return entry == elem;
};

while (quicklistNext(&qiter, &entry)) {
if (is_match(entry)) {
quicklistDelEntry(&qiter, &entry);
removed++;
if (count && removed == count)
break;
while (it.Next()) {
QList::Entry entry = it.Get();
if (is_match(entry)) {
it = ql->Erase(it);
removed++;
if (count && removed == count)
break;
}
}
len = ql->Size();
} else {
quicklist* ql = GetQL(it->second);

int iter_direction = AL_START_HEAD;
long long index = 0;
if (count < 0) {
count = -count;
iter_direction = AL_START_TAIL;
index = -1;
}
}

it_res->post_updater.Run();
quicklistIter qiter;
quicklistInitIterator(&qiter, ql, iter_direction, index);
quicklistEntry entry;

quicklistCompressIterator(&qiter);
auto is_match = [&](const quicklistEntry& entry) {
if (is_int != (entry.value == nullptr))
return false;

if (quicklistCount(ql) == 0) {
return is_int ? entry.longval == ival : ElemCompare(entry, elem);
};

while (quicklistNext(&qiter, &entry)) {
if (is_match(entry)) {
quicklistDelEntry(&qiter, &entry);
removed++;
if (count && removed == count)
break;
}
}
quicklistCompressIterator(&qiter);
len = quicklistCount(ql);
}
it_res->post_updater.Run();

if (len == 0) {
CHECK(db_slice.Del(op_args.db_cntx, it));
}

Expand Down

0 comments on commit 9366c67

Please sign in to comment.