Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: support qlist compression when accounting for memory #4233

Merged
merged 1 commit into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 35 additions & 30 deletions src/core/qlist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,6 @@ inline ssize_t NodeSetEntry(quicklistNode* node, uint8_t* entry) {
bool CompressNode(quicklistNode* node) {
DCHECK(node->encoding == QUICKLIST_NODE_ENCODING_RAW);
DCHECK(!node->dont_compress);
#ifdef SERVER_TEST
node->attempted_compress = 1;
#endif

/* validate that the node is neither
* tail nor head (it has prev and next)*/
Expand Down Expand Up @@ -219,12 +216,13 @@ bool CompressNode(quicklistNode* node) {
return true;
}

bool CompressNodeIfNeeded(quicklistNode* node) {
ssize_t CompressNodeIfNeeded(quicklistNode* node) {
DCHECK(node);
if (node->encoding == QUICKLIST_NODE_ENCODING_RAW && !node->dont_compress) {
return CompressNode(node);
if (CompressNode(node))
return ((quicklistLZF*)node->entry)->sz - node->sz;
}
return false;
return 0;
}

/* Uncompress the listpack in 'node' and update encoding details.
Expand All @@ -247,17 +245,24 @@ bool DecompressNode(bool recompress, quicklistNode* node) {

/* Decompress only compressed nodes.
recompress: if true, the node will be marked for recompression after decompression.
returns by how much the size of the node has increased.
*/
void DecompressNodeIfNeeded(bool recompress, quicklistNode* node) {
ssize_t DecompressNodeIfNeeded(bool recompress, quicklistNode* node) {
if ((node) && (node)->encoding == QUICKLIST_NODE_ENCODING_LZF) {
DecompressNode(recompress, node);
size_t compressed_sz = ((quicklistLZF*)node->entry)->sz;
if (DecompressNode(recompress, node)) {
return node->sz - compressed_sz;
}
}
return 0;
}

void RecompressOnly(quicklistNode* node) {
ssize_t RecompressOnly(quicklistNode* node) {
if (node->recompress && !node->dont_compress) {
CompressNode(node);
if (CompressNode(node))
return ((quicklistLZF*)node->entry)->sz - node->sz;
}
return 0;
}

quicklistNode* SplitNode(quicklistNode* node, int offset, bool after, ssize_t* diff) {
Expand Down Expand Up @@ -564,7 +569,7 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
if (QL_NODE_IS_PLAIN(node) || (at_tail && after) || (at_head && !after)) {
InsertPlainNode(node, elem, insert_opt);
} else {
DecompressNodeIfNeeded(true, node);
malloc_size_ += DecompressNodeIfNeeded(true, node);
ssize_t diff_existing = 0;
quicklistNode* new_node = SplitNode(node, it.offset_, after, &diff_existing);
quicklistNode* entry_node = InsertPlainNode(node, elem, insert_opt);
Expand All @@ -576,32 +581,32 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {

/* Now determine where and how to insert the new element */
if (!full) {
DecompressNodeIfNeeded(true, node);
malloc_size_ += DecompressNodeIfNeeded(true, node);
uint8_t* new_entry = LP_Insert(node->entry, elem, it.zi_, after ? LP_AFTER : LP_BEFORE);
malloc_size_ += NodeSetEntry(node, new_entry);
node->count++;
RecompressOnly(node);
malloc_size_ += RecompressOnly(node);
} else {
bool insert_tail = at_tail && after;
bool insert_head = at_head && !after;
if (insert_tail && avail_next) {
/* If we are: at tail, next has free space, and inserting after:
* - insert entry at head of next node. */
auto* new_node = node->next;
DecompressNodeIfNeeded(true, new_node);
malloc_size_ += DecompressNodeIfNeeded(true, new_node);
malloc_size_ += NodeSetEntry(new_node, LP_Prepend(new_node->entry, elem));
new_node->count++;
RecompressOnly(new_node);
RecompressOnly(node);
malloc_size_ += RecompressOnly(new_node);
malloc_size_ += RecompressOnly(node);
} else if (insert_head && avail_prev) {
/* If we are: at head, previous has free space, and inserting before:
* - insert entry at tail of previous node. */
auto* new_node = node->prev;
DecompressNodeIfNeeded(true, new_node);
malloc_size_ += DecompressNodeIfNeeded(true, new_node);
malloc_size_ += NodeSetEntry(new_node, LP_Append(new_node->entry, elem));
new_node->count++;
RecompressOnly(new_node);
RecompressOnly(node);
malloc_size_ += RecompressOnly(new_node);
malloc_size_ += RecompressOnly(node);
} else if (insert_tail || insert_head) {
/* If we are: full, and our prev/next has no available space, then:
* - create new node and attach to qlist */
Expand All @@ -610,7 +615,7 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
} else {
/* else, node is full we need to split it. */
/* covers both after and !after cases */
DecompressNodeIfNeeded(true, node);
malloc_size_ += DecompressNodeIfNeeded(true, node);
ssize_t diff_existing = 0;
auto* new_node = SplitNode(node, it.offset_, after, &diff_existing);
auto func = after ? LP_Prepend : LP_Append;
Expand Down Expand Up @@ -710,8 +715,8 @@ void QList::Compress(quicklistNode* node) {
int depth = 0;
int in_depth = 0;
while (depth++ < compress_) {
DecompressNodeIfNeeded(false, forward);
DecompressNodeIfNeeded(false, reverse);
malloc_size_ += DecompressNodeIfNeeded(false, forward);
malloc_size_ += DecompressNodeIfNeeded(false, reverse);

if (forward == node || reverse == node)
in_depth = 1;
Expand All @@ -726,11 +731,11 @@ void QList::Compress(quicklistNode* node) {
}

if (!in_depth && node)
CompressNodeIfNeeded(node);
malloc_size_ += CompressNodeIfNeeded(node);

/* At this point, forward and reverse are one node beyond depth */
CompressNodeIfNeeded(forward);
CompressNodeIfNeeded(reverse);
malloc_size_ += CompressNodeIfNeeded(forward);
malloc_size_ += CompressNodeIfNeeded(reverse);
}

/* Attempt to merge listpacks within two nodes on either side of 'center'.
Expand Down Expand Up @@ -801,8 +806,8 @@ quicklistNode* QList::MergeNodes(quicklistNode* center) {
* Returns the input node picked to merge against or NULL if
* merging was not possible. */
quicklistNode* QList::ListpackMerge(quicklistNode* a, quicklistNode* b) {
DecompressNodeIfNeeded(false, a);
DecompressNodeIfNeeded(false, b);
malloc_size_ += DecompressNodeIfNeeded(false, a);
malloc_size_ += DecompressNodeIfNeeded(false, b);
if ((lpMerge(&a->entry, &b->entry))) {
/* We merged listpacks! Now remove the unused quicklistNode. */
quicklistNode *keep = NULL, *nokeep = NULL;
Expand Down Expand Up @@ -1047,14 +1052,14 @@ bool QList::Erase(const long start, unsigned count) {
if (delete_entire_node || QL_NODE_IS_PLAIN(node)) {
DelNode(node);
} else {
DecompressNodeIfNeeded(true, node);
malloc_size_ += DecompressNodeIfNeeded(true, node);
malloc_size_ += NodeSetEntry(node, lpDeleteRange(node->entry, offset, del));
node->count -= del;
count_ -= del;
if (node->count == 0) {
DelNode(node);
} else {
RecompressOnly(node);
malloc_size_ += RecompressOnly(node);
}
}

Expand All @@ -1081,7 +1086,7 @@ bool QList::Iterator::Next() {
int plain = QL_NODE_IS_PLAIN(current_);
if (!zi_) {
/* If !zi, use current index. */
DecompressNodeIfNeeded(true, current_);
const_cast<QList*>(owner_)->malloc_size_ += DecompressNodeIfNeeded(true, current_);
if (ABSL_PREDICT_FALSE(plain))
zi_ = current_->entry;
else
Expand Down
58 changes: 54 additions & 4 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ extern "C" {
#include "base/logging.h"
#include "core/compact_object.h"
#include "core/qlist.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"
#include "server/blocking_controller.h"
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
Expand Down Expand Up @@ -65,6 +67,7 @@ struct ObjInfo {

// for lists - how many nodes do they have.
unsigned num_nodes = 0;
unsigned num_compressed = 0;

enum LockStatus { NONE, S, X } lock_status = NONE;

Expand Down Expand Up @@ -212,11 +215,23 @@ unsigned AddObjHist(PrimeIterator it, ObjHist* hist) {

if (pv.ObjType() == OBJ_LIST) {
IterateList(pv, per_entry_cb, 0, -1);
if (pv.Encoding() == kEncodingQL2) {
const QList* ql = static_cast<QList*>(pv.RObjPtr());
val_len = ql->MallocUsed(true);
}
} else if (pv.ObjType() == OBJ_ZSET) {
IterateSortedSet(pv.GetRobjWrapper(),
[&](ContainerEntry entry, double) { return per_entry_cb(entry); });
if (pv.Encoding() == OBJ_ENCODING_SKIPLIST) {
detail::SortedMap* smap = static_cast<detail::SortedMap*>(pv.RObjPtr());
val_len = smap->MallocSize();
}
} else if (pv.ObjType() == OBJ_SET) {
IterateSet(pv, per_entry_cb);
if (pv.Encoding() == kEncodingStrMap2) {
StringSet* ss = static_cast<StringSet*>(pv.RObjPtr());
val_len = ss->ObjMallocUsed() + ss->SetMallocUsed();
}
} else if (pv.ObjType() == OBJ_HASH) {
if (pv.Encoding() == kEncodingListPack) {
uint8_t intbuf[LP_INTBUF_SIZE];
Expand Down Expand Up @@ -322,6 +337,14 @@ ObjInfo InspectOp(ConnectionContext* cntx, string_view key) {
if (pv.ObjType() == OBJ_LIST && pv.Encoding() == kEncodingQL2) {
const QList* qlist = static_cast<const QList*>(pv.RObjPtr());
oinfo.num_nodes = qlist->node_count();
auto* node = qlist->Head();

while (node) {
if (node->encoding == QUICKLIST_NODE_ENCODING_LZF) {
++oinfo.num_compressed;
}
node = node->next;
}
}

if (pv.IsExternal()) {
Expand Down Expand Up @@ -357,13 +380,31 @@ OpResult<ValueCompressInfo> EstimateCompression(ConnectionContext* cntx, string_
}

// Only strings are supported right now.
if (it->second.ObjType() != OBJ_STRING) {
if (it->second.ObjType() != OBJ_STRING && it->second.ObjType() != OBJ_LIST) {
return OpStatus::WRONG_TYPE;
}
ValueCompressInfo info;

if (it->second.ObjType() == OBJ_LIST) {
if (it->second.Encoding() != kEncodingQL2) {
return OpStatus::WRONG_TYPE;
}

const QList* src = static_cast<const QList*>(it->second.RObjPtr());
info.raw_size = src->MallocUsed(true);
QList qlist(-2, 1);
auto copy_cb = [&](QList::Entry entry) {
qlist.Push(entry.view(), QList::HEAD);
return true;
};
src->Iterate(copy_cb, 0, -1);
info.compressed_size = qlist.MallocUsed(true);
return info;
}

string scratch;
string_view value = it->second.GetSlice(&scratch);

ValueCompressInfo info;
info.raw_size = value.size();
info.compressed_size = info.raw_size;

Expand Down Expand Up @@ -849,7 +890,10 @@ void DebugCmd::Inspect(string_view key, CmdArgList args, facade::SinkReplyBuilde
ShardId sid = Shard(key, ess.size());
VLOG(1) << "DebugCmd::Inspect " << key;

bool check_compression = (args.size() == 1) && ArgS(args, 0) == "COMPRESS";
bool check_compression = false;
if (args.size() == 1) {
check_compression = absl::AsciiStrToUpper(ArgS(args, 0)) == "COMPRESS";
}
string resp;

if (check_compression) {
Expand Down Expand Up @@ -886,7 +930,13 @@ void DebugCmd::Inspect(string_view key, CmdArgList args, facade::SinkReplyBuilde
}

if (res.num_nodes) {
StrAppend(&resp, " ns:", res.num_nodes);
// node count
StrAppend(&resp, " nc:", res.num_nodes);
}

if (res.num_compressed) {
// compressed nodes
StrAppend(&resp, " cn:", res.num_compressed);
}

if (res.lock_status != ObjInfo::NONE) {
Expand Down
Loading