Skip to content

Commit

Permalink
chore: Add initial bindings for QList in list_family (#4093)
Browse files Browse the repository at this point in the history
The feature is guarded by list_experimental_v2 flag, which is disabled.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Nov 10, 2024
1 parent 1819e51 commit 75c961e
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 48 deletions.
28 changes: 21 additions & 7 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ extern "C" {
#include "base/pod_array.h"
#include "core/bloom.h"
#include "core/detail/bitpacking.h"
#include "core/qlist.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"

ABSL_RETIRED_FLAG(bool, use_set2, true, "If true use DenseSet for an optimized set data structure");

ABSL_FLAG(bool, experimental_flat_json, false, "If true uses flat json implementation.");

namespace dfly {
Expand Down Expand Up @@ -67,6 +66,19 @@ inline void FreeObjSet(unsigned encoding, void* ptr, MemoryResource* mr) {
}
}

void FreeList(unsigned encoding, void* ptr, MemoryResource* mr) {
switch (encoding) {
case OBJ_ENCODING_QUICKLIST:
quicklistRelease((quicklist*)ptr);
break;
case kEncodingQL2:
CompactObj::DeleteMR<QList>(ptr);
break;
default:
LOG(FATAL) << "Unknown list encoding type";
}
}

size_t MallocUsedSet(unsigned encoding, void* ptr) {
switch (encoding) {
case kEncodingStrMap2: {
Expand Down Expand Up @@ -288,8 +300,9 @@ size_t RobjWrapper::MallocUsed() const {
CHECK_EQ(OBJ_ENCODING_RAW, encoding_);
return InnerObjMallocUsed();
case OBJ_LIST:
DCHECK_EQ(encoding_, OBJ_ENCODING_QUICKLIST);
return QlMAllocSize((quicklist*)inner_obj_);
if (encoding_ == OBJ_ENCODING_QUICKLIST)
return QlMAllocSize((quicklist*)inner_obj_);
return ((QList*)inner_obj_)->MallocUsed();
case OBJ_SET:
return MallocUsedSet(encoding_, inner_obj_);
case OBJ_HASH:
Expand All @@ -312,7 +325,9 @@ size_t RobjWrapper::Size() const {
DCHECK_EQ(OBJ_ENCODING_RAW, encoding_);
return sz_;
case OBJ_LIST:
return quicklistCount((quicklist*)inner_obj_);
if (encoding_ == OBJ_ENCODING_QUICKLIST)
return quicklistCount((quicklist*)inner_obj_);
return ((QList*)inner_obj_)->Size();
case OBJ_ZSET: {
switch (encoding_) {
case OBJ_ENCODING_SKIPLIST: {
Expand Down Expand Up @@ -367,8 +382,7 @@ void RobjWrapper::Free(MemoryResource* mr) {
mr->deallocate(inner_obj_, 0, 8); // we do not keep the allocated size.
break;
case OBJ_LIST:
CHECK_EQ(encoding_, OBJ_ENCODING_QUICKLIST);
quicklistRelease((quicklist*)inner_obj_);
FreeList(encoding_, inner_obj_, mr);
break;
case OBJ_SET:
FreeObjSet(encoding_, inner_obj_, mr);
Expand Down
2 changes: 1 addition & 1 deletion src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
namespace dfly {

constexpr unsigned kEncodingIntSet = 0;
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap2 = 2; // for set/map encodings of strings using DenseSet
constexpr unsigned kEncodingQL2 = 1;
constexpr unsigned kEncodingListPack = 3;
constexpr unsigned kEncodingJsonCons = 0;
constexpr unsigned kEncodingJsonFlat = 1;
Expand Down
113 changes: 76 additions & 37 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ extern "C" {

#include "base/flags.h"
#include "base/logging.h"
#include "core/qlist.h"
#include "server/blocking_controller.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
Expand Down Expand Up @@ -59,6 +60,8 @@ ABSL_FLAG(int32_t, list_max_listpack_size, -2, "Maximum listpack size, default i
*/

ABSL_FLAG(int32_t, list_compress_depth, 0, "Compress depth of the list. Default is no compression");
ABSL_FLAG(bool, list_experimental_v2, false,
"Compress depth of the list. Default is no compression");

namespace dfly {

Expand All @@ -73,6 +76,10 @@ quicklist* GetQL(const PrimeValue& mv) {
return (quicklist*)mv.RObjPtr();
}

QList* GetQLV2(const PrimeValue& mv) {
return (QList*)mv.RObjPtr();
}

void* listPopSaver(unsigned char* data, size_t sz) {
return new string((char*)data, sz);
}
Expand Down Expand Up @@ -265,26 +272,46 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
res = std::move(*op_res);
}

quicklist* ql = nullptr;
size_t len = 0;
DVLOG(1) << "OpPush " << key << " new_key " << res.is_new;
quicklist* ql = nullptr;
QList* ql_v2 = nullptr;

if (res.is_new) {
ql = quicklistCreate();
quicklistSetOptions(ql, GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
res.it->second.InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, ql);
if (absl::GetFlag(FLAGS_list_experimental_v2)) {
ql_v2 = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
res.it->second.InitRobj(OBJ_LIST, kEncodingQL2, ql_v2);
} else {
ql = quicklistCreate();
quicklistSetOptions(ql, GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
res.it->second.InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, ql);
}
} else {
if (res.it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;
ql = GetQL(res.it->second);
if (res.it->second.Encoding() == kEncodingQL2) {
ql_v2 = GetQLV2(res.it->second);
} else {
ql = GetQL(res.it->second);
}
}

// Left push is LIST_HEAD.
int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;

for (string_view v : vals) {
auto vsds = WrapSds(v);
quicklistPush(ql, vsds, sdslen(vsds), pos);
if (ql) {
// Left push is LIST_HEAD.
int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
for (string_view v : vals) {
auto vsds = WrapSds(v);
quicklistPush(ql, vsds, sdslen(vsds), pos);
}
len = quicklistCount(ql);
} else {
QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
for (string_view v : vals) {
ql_v2->Push(v, where);
}
len = ql_v2->Size();
}

if (res.is_new) {
Expand All @@ -305,7 +332,7 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
RecordJournal(op_args, command, mapped, 2);
}

return quicklistCount(ql);
return len;
}

OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, uint32_t count,
Expand All @@ -320,26 +347,26 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, u

auto it = it_res->it;
quicklist* ql = GetQL(it->second);

auto prev_len = quicklistCount(ql);
StringVec res;
if (quicklistCount(ql) < count) {
count = quicklistCount(ql);
if (prev_len < count) {
count = prev_len;
}
res.reserve(count);

if (return_results) {
for (unsigned i = 0; i < count; ++i) {
res.push_back(ListPop(dir, ql));
}
} else {
for (unsigned i = 0; i < count; ++i) {
ListPop(dir, ql);
res.reserve(count);
}

for (unsigned i = 0; i < count; ++i) {
string val = ListPop(dir, ql);
if (return_results) {
res.push_back(std::move(val));
}
}

it_res->post_updater.Run();

if (quicklistCount(ql) == 0) {
if (count == prev_len) {
CHECK(db_slice.Del(op_args.db_cntx, it));
}

Expand Down Expand Up @@ -418,31 +445,43 @@ OpResult<uint32_t> OpLen(const OpArgs& op_args, std::string_view key) {
if (!res)
return res.status();

quicklist* ql = GetQL(res.value()->second);
if (res.value()->second.Encoding() == kEncodingQL2) {
QList* ql = GetQLV2(res.value()->second);
return ql->Size();
}

quicklist* ql = GetQL(res.value()->second);
return quicklistCount(ql);
}

OpResult<string> OpIndex(const OpArgs& op_args, std::string_view key, long index) {
auto res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST);
if (!res)
return res.status();
quicklist* ql = GetQL(res.value()->second);
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter = quicklistGetIteratorAtIdx(ql, AL_START_TAIL, index);
if (!iter)
return OpStatus::KEY_NOTFOUND;

quicklistNext(iter, &entry);
string str;

if (entry.value) {
str.assign(reinterpret_cast<char*>(entry.value), entry.sz);
if (res.value()->second.Encoding() == kEncodingQL2) {
QList* ql = GetQLV2(res.value()->second);
auto it = ql->GetIterator(index);
if (!it.Next())
return OpStatus::KEY_NOTFOUND;
str = it.Get().to_string();
} else {
str = absl::StrCat(entry.longval);
}
quicklistReleaseIterator(iter);
quicklist* ql = GetQL(res.value()->second);
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter = quicklistGetIteratorAtIdx(ql, AL_START_TAIL, index);
if (!iter)
return OpStatus::KEY_NOTFOUND;

quicklistNext(iter, &entry);

if (entry.value) {
str.assign(reinterpret_cast<char*>(entry.value), entry.sz);
} else {
str = absl::StrCat(entry.longval);
}
quicklistReleaseIterator(iter);
}
return str;
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
case OBJ_SET:
if (compact_enc == kEncodingIntSet)
return RDB_TYPE_SET_INTSET;
else if (compact_enc == kEncodingStrMap || compact_enc == kEncodingStrMap2) {
else if (compact_enc == kEncodingStrMap2) {
if (((StringSet*)pv.RObjPtr())->ExpirationUsed())
return RDB_TYPE_SET_WITH_EXPIRY;
else
Expand Down
2 changes: 0 additions & 2 deletions src/server/set_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ extern "C" {
#include "server/journal/journal.h"
#include "server/transaction.h"

ABSL_DECLARE_FLAG(bool, use_set2);

namespace dfly {

using namespace facade;
Expand Down

0 comments on commit 75c961e

Please sign in to comment.