Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connect unreachable nodes [WIP] #509

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5af1a71
resolve conflicts after rebase
alonre24 Jul 18, 2024
ca5bcfb
Move graph data structures to a new file, add new serialization versi…
alonre24 Jul 1, 2024
57ae7f9
tmp adding full benchmark file path for debug
alonre24 Jul 1, 2024
8f94f23
Add the files for the test
alonre24 Jul 2, 2024
71ffd4a
revert file name
alonre24 Jul 2, 2024
393801b
revert file name + format
alonre24 Jul 2, 2024
ef97988
Collect unreachable nodes. WIP - connect them and test (+ fix memory …
alonre24 Jul 2, 2024
913d37b
reinsert elements to graph (wip - deadlock since existing outgoing ed…
alonre24 Jul 10, 2024
dbc6a41
remove neighbors before reinsert, fixes
alonre24 Jul 15, 2024
838e07b
small fixes (WIP)
alonre24 Jul 16, 2024
d273e16
Move to set + fix many bugs + decrease n vectors in long tests
alonre24 Jul 17, 2024
153186b
fix tests and rebase finish
alonre24 Jul 18, 2024
b1e9b8d
Add tests + small fixes and cleanups
alonre24 Jul 19, 2024
bc39e2f
Use alpha (poc)
alonre24 Jul 22, 2024
47247c9
fix alpha
alonre24 Jul 22, 2024
f5272dd
connect unreachable nodes permanent
alonre24 Jul 22, 2024
1db59e0
connect unreachable nodes permanent while resize index
alonre24 Jul 22, 2024
005a958
connect unreachable nodes permanent IN TIERED ONLY before adding / re…
alonre24 Jul 22, 2024
46b7564
bring alpha back
alonre24 Jul 22, 2024
13069dc
fix - clear the right collection
alonre24 Jul 22, 2024
ed8baa6
use different alpha for first reinsertion and later
alonre24 Jul 22, 2024
15b724e
use verbose log
alonre24 Jul 23, 2024
631f771
lock both locks in info to have a coherent picture + fix bug - do not…
alonre24 Jul 27, 2024
a07a876
.
alonre24 Jul 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/VecSim/algorithms/hnsw/graph_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct ElementLevelData {
: incomingUnidirectionalEdges(new (allocator) vecsim_stl::vector<idType>(allocator)),
totalIncomingLinks(0), numLinks(0) {}

/*************************** Getters ********************************/
linkListSize getNumLinks() const { return this->numLinks; }
idType getLinkAtPos(size_t pos) const {
assert(pos < numLinks);
Expand All @@ -38,11 +39,14 @@ struct ElementLevelData {
const vecsim_stl::vector<idType> &getIncomingEdges() const {
return *incomingUnidirectionalEdges;
}
size_t inDegree() const { return totalIncomingLinks; }
std::vector<idType> copyLinks() {
std::vector<idType> links_copy;
links_copy.assign(links, links + numLinks);
return links_copy;
}

/************************************ Setters ****************************/
// Sets the outgoing links of the current element.
// Assumes that the object has the capacity to hold all the links.
void setLinks(vecsim_stl::vector<idType> &links) {
Expand Down
370 changes: 328 additions & 42 deletions src/VecSim/algorithms/hnsw/hnsw.h

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions src/VecSim/algorithms/hnsw/hnsw_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ HNSWIndex<DataType, DistType>::HNSWIndex(std::ifstream &input, const HNSWParams
Serializer::EncodingVersion version)
: VecSimIndexAbstract<DataType, DistType>(abstractInitParams), Serializer(version),
maxElements(RoundUpInitialCapacity(params->initialCapacity, this->blockSize)),
epsilon(params->epsilon), vectorBlocks(this->allocator), graphDataBlocks(this->allocator),
idToMetaData(maxElements, this->allocator),
epsilon(params->epsilon), unreachableNodes(this->allocator), hardUnreachableNodes(this->allocator),
vectorBlocks(this->allocator),
graphDataBlocks(this->allocator), idToMetaData(maxElements, this->allocator),
visitedNodesHandlerPool(1, maxElements, this->allocator) {

this->restoreIndexFields(input);
Expand Down
15 changes: 13 additions & 2 deletions src/VecSim/algorithms/hnsw/hnsw_tiered.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ class TieredHNSWIndex : public VecSimTieredIndex<DataType, DistType> {
TIERED_LOG(VecSimCommonStrings::LOG_VERBOSE_STRING,
"running asynchronous GC for tiered HNSW index");
this->executeReadySwapJobs(this->pendingSwapJobsThreshold);
// Try to reinsert permanent unreachable nodes
this->mainIndexGuard.lock_shared();
this->getHNSWIndex()->template connectUnreachableNodes<true>();
this->mainIndexGuard.unlock_shared();
}
void acquireSharedLocks() override {
this->flatIndexGuard.lock_shared();
Expand Down Expand Up @@ -423,9 +427,13 @@ void TieredHNSWIndex<DataType, DistType>::insertVectorToHNSW(
hnsw_index->lockIndexDataGuard();
// Check if resizing is needed for HNSW index (requires write lock).
if (hnsw_index->indexCapacity() == hnsw_index->indexSize()) {
hnsw_index->unlockIndexDataGuard();
// Try to reinsert permanent unreachable nodes
TIERED_LOG(VecSimCommonStrings::LOG_VERBOSE_STRING,
"Going over permanent unreachable nodes:");
hnsw_index->template connectUnreachableNodes<true>();
// Release the inner HNSW data lock before we re-acquire the global HNSW lock.
this->mainIndexGuard.unlock_shared();
hnsw_index->unlockIndexDataGuard();
this->mainIndexGuard.lock();
hnsw_index->lockIndexDataGuard();

Expand Down Expand Up @@ -472,6 +480,8 @@ void TieredHNSWIndex<DataType, DistType>::insertVectorToHNSW(
if (state.elementMaxLevel > state.currMaxLevel) {
hnsw_index->unlockIndexDataGuard();
}
// Reinsert nodes that became unreachable due to this operation.
hnsw_index->connectUnreachableNodes();
this->mainIndexGuard.unlock_shared();
}
}
Expand Down Expand Up @@ -589,7 +599,8 @@ void TieredHNSWIndex<DataType, DistType>::executeRepairJob(HNSWRepairJob *job) {
this->idToRepairJobsGuard.unlock();

hnsw_index->repairNodeConnections(job->node_id, job->level);

// Reinsert nodes that became unreachable due to this operation.
hnsw_index->connectUnreachableNodes();
this->mainIndexGuard.unlock_shared();
}

Expand Down
12 changes: 5 additions & 7 deletions src/VecSim/utils/vecsim_stl.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,16 @@ class set : public VecsimBaseObject, public std::set<T, std::less<T>, VecsimSTLA
: VecsimBaseObject(alloc), std::set<T, std::less<T>, VecsimSTLAllocator<T>>(alloc) {}
};

template <typename T>
class unordered_set
: public VecsimBaseObject,
public std::unordered_set<T, std::hash<T>, std::equal_to<T>, VecsimSTLAllocator<T>> {
template <typename T, typename hash = std::hash<T>>
class unordered_set : public VecsimBaseObject,
public std::unordered_set<T, hash, std::equal_to<T>, VecsimSTLAllocator<T>> {
public:
explicit unordered_set(const std::shared_ptr<VecSimAllocator> &alloc)
: VecsimBaseObject(alloc),
std::unordered_set<T, std::hash<T>, std::equal_to<T>, VecsimSTLAllocator<T>>(alloc) {}
std::unordered_set<T, hash, std::equal_to<T>, VecsimSTLAllocator<T>>(alloc) {}
explicit unordered_set(size_t n_bucket, const std::shared_ptr<VecSimAllocator> &alloc)
: VecsimBaseObject(alloc),
std::unordered_set<T, std::hash<T>, std::equal_to<T>, VecsimSTLAllocator<T>>(n_bucket,
alloc) {}
std::unordered_set<T, hash, std::equal_to<T>, VecsimSTLAllocator<T>>(n_bucket, alloc) {}
};

} // namespace vecsim_stl
2 changes: 1 addition & 1 deletion src/VecSim/vec_sim_tiered_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,10 @@ template <typename DataType, typename DistType>
VecSimIndexInfo VecSimTieredIndex<DataType, DistType>::info() const {
VecSimIndexInfo info;
this->flatIndexGuard.lock_shared();
this->mainIndexGuard.lock();
VecSimIndexInfo frontendInfo = this->frontendIndex->info();
this->flatIndexGuard.unlock_shared();

this->mainIndexGuard.lock();
VecSimIndexInfo backendInfo = this->backendIndex->info();
this->mainIndexGuard.unlock();

Expand Down
14 changes: 11 additions & 3 deletions tests/unit/test_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const size_t vecsimAllocationOverhead = VecSimAllocator::getAllocationOverheadSi

const size_t hashTableNodeSize = getLabelsLookupNodeSize();

const size_t unreachableNodeHashTableNodeSize = getUnreachableNodeSize();

class AllocatorTest : public ::testing::Test {};
struct SimpleObject : public VecsimBaseObject {
public:
Expand Down Expand Up @@ -323,8 +325,9 @@ TYPED_TEST(IndexAllocatorTest, testIncomingEdgesSet) {

/* Compute the expected allocation delta:
* 1. empty incoming edges set in every level (+ allocator's header).
* 2. A node in the labels_lookup has table (+ allocator's header). If rehashing occurred, we
* account also for the diff in the buckets size (each bucket has sizeof(size_t) overhead).
* 2. A node in the labels_lookup hash table (+ allocator's header). If rehashing occurred, we
* account also for the diff in the buckets size (each bucket has sizeof(size_t) overhead). Same
* applies for the unreachable nodes unoredered set.
* 3. Account for allocating link lists for levels higher than 0, if exists.
* 4. Finally, expect an allocation of the data buffer in the incoming edges vector of vec1 due
* to the insertion, and the fact that vec1 will re-select its neighbours.
Expand All @@ -333,7 +336,10 @@ TYPED_TEST(IndexAllocatorTest, testIncomingEdgesSet) {
(vec_max_level + 1) * (sizeof(vecsim_stl::vector<idType>) + vecsimAllocationOverhead) +
hashTableNodeSize;
size_t buckets_diff = hnswIndex->labelLookup.bucket_count() - buckets_num_before;
expected_allocation_delta += buckets_diff * sizeof(size_t);
size_t unreachable_nodes_overhead =
hnswIndex->unreachableNodes.bucket_count() * sizeof(graphNodeType) +
vecsimAllocationOverhead;
expected_allocation_delta += buckets_diff * sizeof(size_t) + unreachable_nodes_overhead;
if (vec_max_level > 0) {
expected_allocation_delta +=
hnswIndex->levelDataSize * vec_max_level + vecsimAllocationOverhead;
Expand Down Expand Up @@ -448,6 +454,8 @@ TYPED_TEST(IndexAllocatorTest, test_hnsw_reclaim_memory) {
// All data structures' memory returns to as it was, with the exceptional of the labels_lookup
// (STL unordered_map with hash table implementation), that leaves some empty buckets.
size_t hash_table_memory = hnswIndex->labelLookup.bucket_count() * sizeof(size_t);
// This applies for the unordered ser unreachable nodes as well.
hash_table_memory += hnswIndex->unreachableNodes.bucket_count() * sizeof(size_t);
// Data block vectors do not shrink on resize so extra memory is expected.
size_t block_vectors_memory = sizeof(DataBlock) * (hnswIndex->graphDataBlocks.capacity() +
hnswIndex->vectorBlocks.capacity()) +
Expand Down
10 changes: 6 additions & 4 deletions tests/unit/test_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,15 +539,17 @@ TEST(CommonAPITest, testlogTieredIndex) {
GenerateAndAddVector<float>(tiered_index, 4, 1);
mock_thread_pool.thread_iteration();
tiered_index->deleteVector(1);
ASSERT_EQ(log.logBuffer.size(), 4);
ASSERT_EQ(log.logBuffer.size(), 5);
ASSERT_EQ(log.logBuffer[0],
"verbose: " + log.prefix + "Updating HNSW index capacity from 0 to 1024");
ASSERT_EQ(log.logBuffer[1],
ASSERT_EQ(log.logBuffer[1], "debug: " + log.prefix + "New entry point due to deletion is " +
std::to_string(INVALID_ID));
ASSERT_EQ(log.logBuffer[2],
"verbose: " + log.prefix +
"Tiered HNSW index GC: there are 1 ready swap jobs. Start executing 1 swap jobs");
ASSERT_EQ(log.logBuffer[2],
"verbose: " + log.prefix + "Updating HNSW index capacity from 1024 to 0");
ASSERT_EQ(log.logBuffer[3],
"verbose: " + log.prefix + "Updating HNSW index capacity from 1024 to 0");
ASSERT_EQ(log.logBuffer[4],
"verbose: " + log.prefix + "Tiered HNSW index GC: done executing 1 swap jobs");
}

Expand Down
42 changes: 37 additions & 5 deletions tests/unit/test_hnsw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ TYPED_TEST(HNSWTest, hnsw_bad_params) {
}

TYPED_TEST(HNSWTest, hnsw_delete_entry_point) {
size_t n = 10000;
size_t n = 1000;
size_t dim = 4;
size_t M = 2;

Expand All @@ -931,11 +931,13 @@ TYPED_TEST(HNSWTest, hnsw_delete_entry_point) {

ASSERT_TRUE(index != NULL);

int64_t vec[dim];
for (size_t i = 0; i < dim; i++)
vec[i] = i;
for (size_t j = 0; j < n; j++)
for (size_t j = 0; j < n; j++) {
TEST_DATA_T vec[dim];
for (size_t i = 0; i < dim; i++) {
vec[i] = std::rand() / (TEST_DATA_T)RAND_MAX;
}
VecSimIndex_AddVector(index, vec, j);
}

VecSimIndexInfo info = VecSimIndex_Info(index);

Expand Down Expand Up @@ -2267,3 +2269,33 @@ TYPED_TEST(HNSWTest, FitMemoryTest) {

VecSimIndex_Free(index);
}

TYPED_TEST(HNSWTest, NewNodeIsReachable) {
size_t dim = 4;
size_t n = 100;
HNSWParams params = {.dim = dim, .metric = VecSimMetric_L2, .efConstruction = 10};
VecSimIndex *index = this->CreateNewIndex(params);
auto hnsw_index = this->CastToHNSW_Single(index);

// Add 99 vectors
for (size_t i = 0; i < n; i++) {
GenerateAndAddVector<TEST_DATA_T>(index, dim, i, i);
}
// Mark all vectors as deleted, except from first.
for (size_t i = 1; i < n; i++) {
hnsw_index->markDelete(i);
}
// Insert New node. The scan should not go through deleted candidates (otherwise, the new vector
// would have been unreachable.
EXPECT_EQ(hnsw_index->getEntryPointLabel(), 0);
GenerateAndAddVector<TEST_DATA_T>(index, dim, n, n);

TEST_DATA_T query[] = {(TEST_DATA_T)n, (TEST_DATA_T)n, (TEST_DATA_T)n, (TEST_DATA_T)n};
auto verify_n_reachable = [&](size_t id, double score, size_t index) {
ASSERT_EQ(id, n);
ASSERT_EQ(score, 0);
};
runTopKSearchTest(index, query, 1, verify_n_reachable);

VecSimIndex_Free(index);
}
12 changes: 7 additions & 5 deletions tests/unit/test_hnsw_multi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ TYPED_TEST(HNSWMultiTest, test_query_runtime_params_user_build_args) {
}

TYPED_TEST(HNSWMultiTest, hnsw_delete_entry_point) {
size_t n = 10000;
size_t n = 1000;
size_t per_label = 5;
size_t dim = 4;
size_t M = 2;
Expand All @@ -1239,11 +1239,13 @@ TYPED_TEST(HNSWMultiTest, hnsw_delete_entry_point) {

ASSERT_TRUE(index != NULL);

TEST_DATA_T vec[dim];
for (size_t i = 0; i < dim; i++)
vec[i] = i;
for (size_t j = 0; j < n; j++)
for (size_t j = 0; j < n; j++) {
TEST_DATA_T vec[dim];
for (size_t i = 0; i < dim; i++) {
vec[i] = std::rand() / (TEST_DATA_T)RAND_MAX;
}
VecSimIndex_AddVector(index, vec, j / per_label);
}

VecSimIndexInfo info = VecSimIndex_Info(index);

Expand Down
6 changes: 1 addition & 5 deletions tests/unit/test_hnsw_tiered.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ TYPED_TEST(HNSWTieredIndexTestBasic, deleteFromHNSWMultiLevels) {

TYPED_TEST(HNSWTieredIndexTest, deleteFromHNSWWithRepairJobExec) {
// Create TieredHNSW index instance with a mock queue.
size_t n = 1000;
size_t n = 200;
size_t dim = 4;
bool isMulti = TypeParam::isMulti();

Expand Down Expand Up @@ -2953,10 +2953,6 @@ TYPED_TEST(HNSWTieredIndexTest, switchWriteModes) {
// (the label that we just inserted), and the first result should be this vector.
auto ver_res = [&](size_t label, double score, size_t index) {
if (index == 0) {
if (label != i % n_labels + n_labels && !TypeParam::isMulti()) {
// TODO: remove after we have a mechanism for connecting new elements
return; // this is flaky - ignore for now
}
EXPECT_EQ(label, i % n_labels + n_labels);
EXPECT_DOUBLE_EQ(score, 0);
}
Expand Down
9 changes: 9 additions & 0 deletions tests/unit/test_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,3 +376,12 @@ size_t getLabelsLookupNodeSize() {
size_t memory_after = allocator->getAllocationSize();
return memory_after - memory_before;
}

size_t getUnreachableNodeSize() {
std::shared_ptr<VecSimAllocator> allocator = VecSimAllocator::newVecsimAllocator();
auto dummy_lookup = vecsim_stl::unordered_set<graphNodeType, hashForPair>(1, allocator);
size_t memory_before = allocator->getAllocationSize();
dummy_lookup.insert({1, 1}); // Insert a dummy {key, value} element pair.
size_t memory_after = allocator->getAllocationSize();
return memory_after - memory_before;
}
2 changes: 2 additions & 0 deletions tests/unit/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ void runRangeQueryTest(VecSimIndex *index, const void *query, double radius,

size_t getLabelsLookupNodeSize();

size_t getUnreachableNodeSize();

inline double GetInfVal(VecSimType type) {
if (type == VecSimType_FLOAT64) {
return exp(500);
Expand Down
Loading