Skip to content

Commit

Permalink
chore: Implement LMOVE over QList (#4104)
Browse files Browse the repository at this point in the history
* chore: Implement LMOVE over QList

All tests in list_family_test besides LTrim pass.

---------

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Nov 10, 2024
1 parent 1eef773 commit f745f31
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 36 deletions.
120 changes: 84 additions & 36 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ void* listPopSaver(unsigned char* data, size_t sz) {
return new string((char*)data, sz);
}

QList::Where ToWhere(ListDir dir) {
return dir == ListDir::LEFT ? QList::HEAD : QList::TAIL;
}

enum InsertParam { INSERT_BEFORE, INSERT_AFTER };

string ListPop(ListDir dir, quicklist* ql) {
Expand Down Expand Up @@ -166,7 +170,7 @@ std::string OpBPop(Transaction* t, EngineShard* shard, std::string_view key, Lis
len = quicklistCount(ql);
} else {
QList* ql = GetQLV2(it->second);
QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
QList::Where where = ToWhere(dir);
value = ql->Pop(where);
len = ql->Size();
}
Expand Down Expand Up @@ -195,19 +199,37 @@ OpResult<string> OpMoveSingleShard(const OpArgs& op_args, string_view src, strin
return src_res.status();

auto src_it = src_res->it;
quicklist* src_ql = GetQL(src_it->second);
quicklist* src_ql = nullptr;
QList* srcql_v2 = nullptr;
quicklist* dest_ql = nullptr;
QList* destql_v2 = nullptr;
string val;
size_t prev_len = 0;

if (src == dest) { // simple case.
string val = ListPop(src_dir, src_ql);
if (src_it->second.Encoding() == OBJ_ENCODING_QUICKLIST) {
src_ql = GetQL(src_it->second);
prev_len = quicklistCount(src_ql);
} else {
DCHECK_EQ(src_it->second.Encoding(), kEncodingQL2);
srcql_v2 = GetQLV2(src_it->second);
prev_len = srcql_v2->Size();
}

int pos = (dest_dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
quicklistPush(src_ql, val.data(), val.size(), pos);
if (src == dest) { // simple case.
if (src_ql) {
val = ListPop(src_dir, src_ql);
int pos = (dest_dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
quicklistPush(src_ql, val.data(), val.size(), pos);
} else {
val = srcql_v2->Pop(ToWhere(src_dir));
srcql_v2->Push(val, ToWhere(dest_dir));
}

return val;
}

quicklist* dest_ql = nullptr;
src_res->post_updater.Run();

auto op_res = db_slice.AddOrFind(op_args.db_cntx, dest);
RETURN_ON_BAD_STATUS(op_res);
auto& dest_res = *op_res;
Expand All @@ -217,26 +239,43 @@ OpResult<string> OpMoveSingleShard(const OpArgs& op_args, string_view src, strin
src_it = src_res->it;

if (dest_res.is_new) {
dest_ql = quicklistCreate();
quicklistSetOptions(dest_ql, GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
dest_res.it->second.InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, dest_ql);
DCHECK(IsValid(src_it));
if (absl::GetFlag(FLAGS_list_experimental_v2)) {
destql_v2 = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
dest_res.it->second.InitRobj(OBJ_LIST, kEncodingQL2, destql_v2);
} else {
dest_ql = quicklistCreate();
quicklistSetOptions(dest_ql, GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
dest_res.it->second.InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, dest_ql);
}
} else {
if (dest_res.it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;

dest_ql = GetQL(dest_res.it->second);
if (dest_res.it->second.Encoding() == kEncodingQL2) {
destql_v2 = GetQLV2(dest_res.it->second);
} else {
DCHECK_EQ(dest_res.it->second.Encoding(), OBJ_ENCODING_QUICKLIST);
dest_ql = GetQL(dest_res.it->second);
}
}

string val = ListPop(src_dir, src_ql);
int pos = (dest_dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
quicklistPush(dest_ql, val.data(), val.size(), pos);
if (src_ql) {
DCHECK(dest_ql);
val = ListPop(src_dir, src_ql);
int pos = (dest_dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
quicklistPush(dest_ql, val.data(), val.size(), pos);
} else {
DCHECK(srcql_v2);
DCHECK(destql_v2);
val = srcql_v2->Pop(ToWhere(src_dir));
destql_v2->Push(val, ToWhere(dest_dir));
}

src_res->post_updater.Run();
dest_res.post_updater.Run();

if (quicklistCount(src_ql) == 0) {
if (prev_len == 1) {
CHECK(db_slice.Del(op_args.db_cntx, src_it));
}

Expand All @@ -254,17 +293,28 @@ OpResult<string> Peek(const OpArgs& op_args, string_view key, ListDir dir, bool
if (!fetch)
return OpStatus::OK;

quicklist* ql = GetQL(it_res.value()->second);
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter = (dir == ListDir::LEFT) ? quicklistGetIterator(ql, AL_START_HEAD)
: quicklistGetIterator(ql, AL_START_TAIL);
CHECK(quicklistNext(iter, &entry));
quicklistReleaseIterator(iter);

if (entry.value)
return string(reinterpret_cast<char*>(entry.value), entry.sz);
else
return absl::StrCat(entry.longval);
const PrimeValue& pv = it_res.value()->second;
DCHECK_GT(pv.Size(), 0u); // should be not-empty.

if (pv.Encoding() == OBJ_ENCODING_QUICKLIST) {
quicklist* ql = GetQL(it_res.value()->second);
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter =
quicklistGetIterator(ql, (dir == ListDir::LEFT) ? AL_START_HEAD : AL_START_TAIL);

CHECK(quicklistNext(iter, &entry));
quicklistReleaseIterator(iter);

return (entry.value) ? string(reinterpret_cast<char*>(entry.value), entry.sz)
: absl::StrCat(entry.longval);
}

DCHECK_EQ(pv.Encoding(), kEncodingQL2);
QList* ql = GetQLV2(pv);
auto it = ql->GetIterator(ToWhere(dir));
CHECK(it.Next());

return it.Get().to_string();
}

OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
Expand Down Expand Up @@ -319,7 +369,7 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
}
len = quicklistCount(ql);
} else {
QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
QList::Where where = ToWhere(dir);
for (string_view v : vals) {
ql_v2->Push(v, where);
}
Expand Down Expand Up @@ -373,10 +423,11 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, u
res.reserve(count);
}

QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
QList::Where where = ToWhere(dir);
for (unsigned i = 0; i < count; ++i) {
string val = ql->Pop(where);
if (return_results) {
res.push_back(ql->Pop(where));
res.push_back(std::move(val));
}
}
} else {
Expand Down Expand Up @@ -625,10 +676,7 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, string_view ele

auto it = ql->GetIterator(where);
auto is_match = [&](const QList::Entry& entry) {
if (is_int) {
return entry.is_int() && entry.ival() == ival;
}
return entry == elem;
return is_int ? entry.is_int() && entry.ival() == ival : entry == elem;
};

while (it.Next()) {
Expand Down
1 change: 1 addition & 0 deletions src/server/list_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ TEST_F(ListFamilyTest, LMove) {

resp = Run({"lmove", kKey1, kKey2, "LEFT", "RIGHT"});
ASSERT_THAT(resp, "1");
ASSERT_THAT(Run({"llen", kKey1}), IntArg(4));

resp = Run({"lmove", kKey1, kKey2, "LEFT", "LEFT"});
ASSERT_THAT(resp, "2");
Expand Down

0 comments on commit f745f31

Please sign in to comment.