Skip to content

Commit

Permalink
feat(server) : snapshot traverse physical buckets (#4084)
Browse files Browse the repository at this point in the history
Signed-off-by: adi_holden <[email protected]>
  • Loading branch information
adiholden authored Nov 11, 2024
1 parent 79aa5d4 commit 3ab2446
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 41 deletions.
75 changes: 59 additions & 16 deletions src/core/dash.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,20 +222,25 @@ class DashTable : public detail::DashTableBase {
// Returns: cursor with a random position
Cursor GetRandomCursor(absl::BitGen* bitgen);

// Traverses over a single bucket in table and calls cb(iterator) 0 or more
// Traverses over a single logical bucket in table and calls cb(iterator) 0 or more
// times. if cursor=0 starts traversing from the beginning, otherwise continues from where it
// stopped. returns 0 if the supplied cursor reached end of traversal. Traverse iterates at bucket
// granularity, which means for each non-empty bucket it calls cb per each entry in the bucket
// before returning. Unlike begin/end interface, traverse is stable during table mutations.
// It guarantees that if key exists (1)at the beginning of traversal, (2) stays in the table
// during the traversal, then Traverse() will eventually reach it even when the
// table shrinks or grows.
// Returns: cursor that is guaranteed to be less than 2^40.
// logical granularity, which means for each non-empty bucket it calls cb per each entry in the
// logical bucket before returning. Unlike begin/end interface, traverse is stable during table
// mutations. It guarantees that if key exists (1)at the beginning of traversal, (2) stays in the
// table during the traversal, then Traverse() will eventually reach it even when the table
// shrinks or grows. Returns: cursor that is guaranteed to be less than 2^40.
template <typename Cb> Cursor Traverse(Cursor curs, Cb&& cb);

// Takes an iterator pointing to an entry in a dash bucket and traverses all bucket's entries by
// calling cb(iterator) for every non-empty slot. The iteration goes over a physical bucket.
template <typename Cb> void TraverseBucket(const_iterator it, Cb&& cb);
// Traverses over physical buckets. It calls cb once for each bucket by passing a bucket iterator.
// if cursor=0 starts traversing from the beginning, otherwise continues from where
// it stopped. returns 0 if the supplied cursor reached end of traversal.
// Unlike Traverse, TraverseBuckets calls cb once on bucket iterator and not on each entry in
// bucket. TraverseBuckets is stable during table mutations. It guarantees traversing all buckets
// that existed at the beginning of traversal.
template <typename Cb> Cursor TraverseBuckets(Cursor curs, Cb&& cb);

Cursor AdvanceCursorBucketOrder(Cursor cursor);

// Traverses over a single bucket in table and calls cb(iterator). The traverse order will be
// segment by segment over physical backets.
Expand All @@ -253,6 +258,10 @@ class DashTable : public detail::DashTableBase {
return const_bucket_iterator{this, segment_id, uint8_t(bucket_id)};
}

bucket_iterator BucketIt(unsigned segment_id, unsigned bucket_id) {
return bucket_iterator{this, segment_id, uint8_t(bucket_id)};
}

iterator GetIterator(unsigned segment_id, unsigned bucket_id, unsigned slot_id) {
return iterator{this, segment_id, uint8_t(bucket_id), uint8_t(slot_id)};
}
Expand Down Expand Up @@ -958,14 +967,48 @@ auto DashTable<_Key, _Value, Policy>::Traverse(Cursor curs, Cb&& cb) -> Cursor {
return Cursor{global_depth_, sid, bid};
}

template <typename _Key, typename _Value, typename Policy>
auto DashTable<_Key, _Value, Policy>::AdvanceCursorBucketOrder(Cursor cursor) -> Cursor {
// We fix bid and go over all segments. Once we reach the end we increase bid and repeat.
uint32_t sid = cursor.segment_id(global_depth_);
uint8_t bid = cursor.bucket_id();
sid = NextSeg(sid);
if (sid >= segment_.size()) {
sid = 0;
++bid;

if (bid >= SegmentType::kTotalBuckets)
return 0; // "End of traversal" cursor.
}
return Cursor{global_depth_, sid, bid};
}

template <typename _Key, typename _Value, typename Policy>
template <typename Cb>
void DashTable<_Key, _Value, Policy>::TraverseBucket(const_iterator it, Cb&& cb) {
SegmentType& s = *segment_[it.seg_id_];
const auto& b = s.GetBucket(it.bucket_id_);
b.ForEachSlot([it, cb = std::move(cb), table = this](auto* bucket, uint8_t slot, bool probe) {
cb(iterator{table, it.seg_id_, it.bucket_id_, slot});
});
auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor cursor, Cb&& cb) -> Cursor {
if (cursor.bucket_id() >= SegmentType::kTotalBuckets) // sanity.
return 0;

constexpr uint32_t kMaxIterations = 8;
bool invoked = false;

for (uint32_t i = 0; i < kMaxIterations; ++i) {
uint32_t sid = cursor.segment_id(global_depth_);
uint8_t bid = cursor.bucket_id();
SegmentType* s = segment_[sid];
assert(s);

const auto& bucket = s->GetBucket(bid);
if (bucket.GetBusy()) { // Invoke callback only if bucket has elements.
cb(BucketIt(sid, bid));
invoked = true;
}

cursor = AdvanceCursorBucketOrder(cursor);
if (invoked || !cursor) // Break end of traversal or callback invoked.
return cursor;
}
return cursor;
}

} // namespace dfly
55 changes: 35 additions & 20 deletions src/core/dash_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -566,26 +566,6 @@ TEST_F(DashTest, Traverse) {
EXPECT_EQ(kNumItems - 1, nums.back());
}

TEST_F(DashTest, Bucket) {
constexpr auto kNumItems = 250;
for (size_t i = 0; i < kNumItems; ++i) {
dt_.Insert(i, 0);
}
std::vector<uint64_t> s;
auto it = dt_.begin();
auto bucket_it = Dash64::BucketIt(it);

dt_.TraverseBucket(it, [&](auto i) { s.push_back(i->first); });

unsigned num_items = 0;
while (!bucket_it.is_done()) {
ASSERT_TRUE(find(s.begin(), s.end(), bucket_it->first) != s.end());
++bucket_it;
++num_items;
}
EXPECT_EQ(s.size(), num_items);
}

TEST_F(DashTest, TraverseSegmentOrder) {
constexpr auto kNumItems = 50;
for (size_t i = 0; i < kNumItems; ++i) {
Expand All @@ -610,6 +590,41 @@ TEST_F(DashTest, TraverseSegmentOrder) {
EXPECT_EQ(kNumItems - 1, nums.back());
}

TEST_F(DashTest, TraverseBucketOrder) {
constexpr auto kNumItems = 18000;
for (size_t i = 0; i < kNumItems; ++i) {
dt_.Insert(i, i);
}
for (size_t i = 0; i < kNumItems; ++i) {
dt_.Erase(i);
}
constexpr auto kSparseItems = kNumItems / 50;
for (size_t i = 0; i < kSparseItems; ++i) { // create sparse table
dt_.Insert(i, i);
}

vector<unsigned> nums;
auto tr_cb = [&](Dash64::bucket_iterator it) {
VLOG(1) << "call cb";
while (!it.is_done()) {
nums.push_back(it->first);
VLOG(1) << it.bucket_id() << " " << it.slot_id() << " " << it->first;
++it;
}
};

Dash64::Cursor cursor;
do {
cursor = dt_.TraverseBuckets(cursor, tr_cb);
} while (cursor);

sort(nums.begin(), nums.end());
nums.resize(unique(nums.begin(), nums.end()) - nums.begin());
ASSERT_EQ(kSparseItems, nums.size());
EXPECT_EQ(0, nums[0]);
EXPECT_EQ(kSparseItems - 1, nums.back());
}

struct TestEvictionPolicy {
static constexpr bool can_evict = true;
static constexpr bool can_gc = false;
Expand Down
7 changes: 3 additions & 4 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
return;

PrimeTable::Cursor next =
db_slice_->Traverse(pt, cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
cursor = next;
PushSerialized(false);

Expand Down Expand Up @@ -242,14 +242,13 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
}
}

bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
++stats_.savecb_calls;

auto check = [&](auto v) {
if (v >= snapshot_version_) {
// either has been already serialized or added after snapshotting started.
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
<< " at " << v;
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << " at " << v;
++stats_.skipped;
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class SliceSnapshot {
void SwitchIncrementalFb(Context* cntx, LSN lsn);

// Called on traversing cursor by IterateBucketsFb.
bool BucketSaveCb(PrimeIterator it);
bool BucketSaveCb(PrimeTable::bucket_iterator it);

// Serialize single bucket.
// Returns number of serialized entries, updates bucket version to snapshot version.
Expand Down

0 comments on commit 3ab2446

Please sign in to comment.