Skip to content

Commit

Permalink
v3.2.7 (#46)
Browse files Browse the repository at this point in the history
Unified storage
  • Loading branch information
wxingda authored Apr 1, 2021
1 parent 71f83d4 commit 98a121e
Show file tree
Hide file tree
Showing 59 changed files with 4,607 additions and 1,970 deletions.
13 changes: 13 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
release
debug
build
gtest
flatbuffers
flatbuffers-1.11.0
fbs-gen
.vscode
.idea
GPATH
GRTAGS
GTAGS
third_party/faiss
18 changes: 11 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
# GAMMA ENGINE
#--------------------------------------------
project(gamma_engine C CXX)
cmake_minimum_required(VERSION 3.0)
cmake_minimum_required(VERSION 3.17)

list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake/Modules)

option(BUILD_TEST "Build tests" off)
option(BUILD_WITH_GPU "Build gamma with gpu index support" off)
option(BUILD_TOOLS "Build tools" off)
option(TABLE_STR_INT64 "table string int64 address" off)

exec_program(
"sh"
Expand All @@ -28,7 +27,10 @@ set(GAMMA_SEARCH ${CMAKE_CURRENT_SOURCE_DIR}/search)
set(GAMMA_INDEX ${CMAKE_CURRENT_SOURCE_DIR}/index)
set(GAMMA_INDEX_IMPL ${CMAKE_CURRENT_SOURCE_DIR}/index/impl)
set(GAMMA_INDEX_IMPL_HNSWLIB ${CMAKE_CURRENT_SOURCE_DIR}/index/impl/hnswlib)
set(GAMMA_INDEX_IMPL_SSG ${CMAKE_CURRENT_SOURCE_DIR}/index/impl/ssg)
set(GAMMA_VECTOR ${CMAKE_CURRENT_SOURCE_DIR}/vector)
set(STORAGE ${CMAKE_CURRENT_SOURCE_DIR}/storage)
set(STORAGE_COMPRESS ${CMAKE_CURRENT_SOURCE_DIR}/storage/compress)
set(GAMMA_REALTIME ${CMAKE_CURRENT_SOURCE_DIR}/realtime)
set(GAMMA_TABLE ${CMAKE_CURRENT_SOURCE_DIR}/table)
set(GAMMA_C_API ${CMAKE_CURRENT_SOURCE_DIR}/c_api)
Expand Down Expand Up @@ -67,11 +69,6 @@ ELSE (DEFINED ENV{ZFP_HOME})
MESSAGE(STATUS "ZFP home isn't set, so COMPRESS is not supported! ")
ENDIF (DEFINED ENV{ZFP_HOME})

if(TABLE_STR_INT64)
MESSAGE(STATUS "TABLE_STR_INT64 is on")
ADD_DEFINITIONS(-DTABLE_STR_INT64)
endif(TABLE_STR_INT64)

#INCLUDE DIR
include_directories(
${THIRDPARTY}
Expand All @@ -86,7 +83,10 @@ include_directories(
${GAMMA_INDEX}
${GAMMA_INDEX_IMPL}
${GAMMA_INDEX_IMPL_HNSWLIB}
${GAMMA_INDEX_IMPL_SSG}
${GAMMA_VECTOR}
${STORAGE}
${STORAGE_COMPRESS}
${GAMMA_REALTIME}
${GAMMA_C_API}
${GAMMA_IO}
Expand All @@ -108,6 +108,7 @@ set(LIBRARIES
-lcrypto
-lzstd
-lopenblas
-ltbb
)

#ADDITIONAL SOURCE DIR
Expand All @@ -119,8 +120,11 @@ aux_source_directory(${UTIL} DIR_SRCS)
aux_source_directory(${COMMON} DIR_SRCS)
aux_source_directory(${GAMMA_SEARCH} DIR_SRCS)
aux_source_directory(${GAMMA_INDEX} DIR_SRCS)
aux_source_directory(${STORAGE} DIR_SRCS)
aux_source_directory(${STORAGE_COMPRESS} DIR_SRCS)
aux_source_directory(${GAMMA_INDEX_IMPL} DIR_SRCS)
aux_source_directory(${GAMMA_INDEX_IMPL_HNSWLIB} DIR_SRCS)
aux_source_directory(${GAMMA_INDEX_IMPL_SSG} DIR_SRCS)
aux_source_directory(${GAMMA_VECTOR} DIR_SRCS)
aux_source_directory(${GAMMA_REALTIME} DIR_SRCS)
aux_source_directory(${GAMMA_TABLE} DIR_SRCS)
Expand Down
40 changes: 39 additions & 1 deletion c_api/api_data/gamma_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,22 @@ namespace tig_gamma {

int Config::Serialize(char **out, int *out_len) {
flatbuffers::FlatBufferBuilder builder;

std::vector<flatbuffers::Offset<gamma_api::CacheInfo>>
cache_vector(cache_infos_.size());
int i = 0;
for (auto &c : cache_infos_) {
auto cache = gamma_api::CreateCacheInfo(builder,
builder.CreateString(c.field_name),
c.cache_size);
cache_vector[i++] = cache;
}
auto cache_vec = builder.CreateVector(cache_vector);
auto config =
gamma_api::CreateConfig(builder, builder.CreateString(path_),
builder.CreateString(log_dir_));
builder.CreateString(log_dir_),
cache_vec);

builder.Finish(config);
*out_len = builder.GetSize();
*out = (char *)malloc(*out_len * sizeof(char));
Expand All @@ -26,6 +39,16 @@ void Config::Deserialize(const char *data, int len) {

path_ = config_->path()->str();
log_dir_ = config_->log_dir()->str();

size_t cache_num = config_->cache_infos()->size();
cache_infos_.resize(cache_num);
for (size_t i = 0; i < cache_num; ++i) {
auto c = config_->cache_infos()->Get(i);
struct CacheInfo cache_info;
cache_info.field_name = c->field_name()->str();
cache_info.cache_size = c->cache_size();
cache_infos_[i] = cache_info;
}
}

const std::string &Config::Path() {
Expand All @@ -42,4 +65,19 @@ const std::string &Config::LogDir() {

// Stores a copy of log_dir in log_dir_; the argument itself is not modified.
void Config::SetLogDir(std::string &log_dir) { log_dir_ = log_dir; }

// Appends a copy of `cache` to the end of the stored cache descriptors.
void Config::AddCacheInfo(const struct CacheInfo &cache) {
  cache_infos_.emplace_back(cache);
}

// Appends `cache` to the stored cache descriptors, moving from it.
// `cache` is a plain rvalue reference (not a forwarding reference), so
// std::move is the correct cast here rather than std::forward.
void Config::AddCacheInfo(struct CacheInfo &&cache) {
  cache_infos_.emplace_back(std::move(cache));
}

// Builds a CacheInfo from `name` and `cache_size` and appends it to the
// stored cache descriptors.
// `name` is taken by value, so it is moved into the new entry, and the entry
// is moved into the vector; this avoids two needless string copies.
void Config::AddCacheInfo(std::string name, int cache_size) {
  struct CacheInfo c;
  c.field_name = std::move(name);
  c.cache_size = cache_size;
  cache_infos_.push_back(std::move(c));
}

} // namespace tig_gamma
34 changes: 34 additions & 0 deletions c_api/api_data/gamma_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,31 @@
#include "config_generated.h"
#include "gamma_raw_data.h"


namespace tig_gamma {

// Describes the cache configured for one field: the field's name and the
// cache size associated with it.
//
// Follows the Rule of Zero: both members manage themselves, so the
// compiler-generated copy/move operations are used. The generated move
// operations are noexcept, which lets std::vector<CacheInfo> move elements
// instead of copying them when it reallocates.
struct CacheInfo {
  std::string field_name;
  // Zero-initialized so that a default-constructed CacheInfo never carries an
  // indeterminate size.
  int cache_size = 0;
};

class Config : public RawData {
public:
Config() { config_ = nullptr; }
Expand All @@ -30,11 +53,22 @@ class Config : public RawData {

void SetLogDir(std::string &log_dir);

void AddCacheInfo(const struct CacheInfo &cache);

void AddCacheInfo(struct CacheInfo &&cache);

void AddCacheInfo(std::string name, int cache_size);

// Returns a mutable reference to the stored cache descriptors.
std::vector<CacheInfo> &CacheInfos() { return cache_infos_; }

// Removes every stored cache descriptor.
void ClearCacheInfos() { cache_infos_.clear(); }

private:
gamma_api::Config *config_;

std::string path_;
std::string log_dir_;
std::vector<CacheInfo> cache_infos_;
};

} // namespace tig_gamma
6 changes: 3 additions & 3 deletions c_api/api_data/gamma_doc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ void Doc::Deserialize(const char *data, int len) {
size_t fields_num = doc_->fields()->size();

if (fields_num != table_field_num + vector_field_num) {
LOG(ERROR) << "Add Doc fields num [" << fields_num
<< "], not equal to table_field_num [" << table_field_num
<< "] + vector_field_num [" << vector_field_num << "]";
LOG(WARNING) << "Add Doc fields num [" << fields_num
<< "], not equal to table_field_num [" << table_field_num
<< "] + vector_field_num [" << vector_field_num << "]";
return;
}

Expand Down
16 changes: 16 additions & 0 deletions c_api/gamma_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,19 @@ int DelDocByQuery(void *engine, const char *request_str, int len) {
static_cast<tig_gamma::GammaEngine *>(engine)->DelDocByQuery(request);
return ret;
}

int SetConfig(void *engine, const char *config_str, int len) {
tig_gamma::Config config;
config.Deserialize(config_str, len);
int ret =
static_cast<tig_gamma::GammaEngine *>(engine)->SetConfig(config);
return ret;
}

int GetConfig(void *engine, char **config_str, int *len) {
tig_gamma::Config config;
int res =
static_cast<tig_gamma::GammaEngine *>(engine)->GetConfig(config);
if (res == 0) { res = config.Serialize(config_str, len); }
return res;
}
16 changes: 16 additions & 0 deletions c_api/gamma_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,22 @@ int Search(void *engine, const char *request_str, int req_len,
*/
int DelDocByQuery(void *engine, const char *request_str, int len);

/** Apply a serialized configuration (including cache sizes) to the engine.
 *
 * @param engine search engine pointer
 * @param config_str serialized Config buffer
 * @param len length of config_str
 * @return 0 on success, 1 on failure
 */
int SetConfig(void *engine, const char *config_str, int len);

/** Get the engine's current configuration (including cache sizes) in
 * serialized form.
 *
 * @param engine search engine pointer
 * @param config_str output; receives a pointer to the serialized Config
 *        buffer. Config::Serialize allocates it with malloc, so the caller
 *        presumably has to free it -- TODO confirm ownership.
 * @param len output; receives the size of the serialized buffer
 * @return 0 on success, 1 on failure
 */
int GetConfig(void *engine, char **config_str, int *len);

#ifdef __cplusplus
}
#endif
Expand Down
6 changes: 6 additions & 0 deletions idl/fbs/config.fbs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
namespace gamma_api;

// Cache setting for a single field.
table CacheInfo {
// Name of the field this cache entry refers to.
field_name:string;
// Cache size for that field (unit is not specified in this schema -- TODO confirm).
cache_size:int;
}

// Root configuration message exchanged through the C API.
table Config {
path:string;
log_dir:string;
// One CacheInfo entry per configured field cache.
cache_infos:[CacheInfo];
}

root_type Config;
3 changes: 2 additions & 1 deletion index/impl/gamma_index_ivfpq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1357,7 +1357,8 @@ int GammaIVFPQIndex::Load(const std::string &index_dir) {
READ1(indexed_vec_count_);
if (indexed_vec_count_ < 0 ||
indexed_vec_count_ > vector_->MetaInfo()->size_) {
LOG(ERROR) << "invalid indexed count=" << indexed_vec_count_;
LOG(ERROR) << "invalid indexed count [" << indexed_vec_count_
<< "] vector size [" << vector_->MetaInfo()->size_ << "]";
return INTERNAL_ERR;
}
// precomputed table not stored. It is cheaper to recompute it
Expand Down
13 changes: 8 additions & 5 deletions index/impl/gpu/gamma_index_ivfpq_gpu.cc
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,11 @@ int ParseFilters(GammaSearchCondition *condition,
template <class T>
bool IsInRange(Table *table, RangeFilter &range, long docid) {
T value = 0;
table->GetField<T>(docid, range.field, value);
std::string field_value;
int field_id = table->GetAttrIdx(range.field);
table->GetFieldRawValue(docid, field_id, field_value);
memcpy(&value, field_value.c_str(), sizeof(value));

T lower_value, upper_value;
memcpy(&lower_value, range.lower_value.c_str(), range.lower_value.size());
memcpy(&upper_value, range.upper_value.c_str(), range.upper_value.size());
Expand Down Expand Up @@ -718,11 +722,10 @@ bool FilteredByTermFilter(GammaSearchCondition *condition,
auto term = condition->term_filters[i];

std::string field_value;
table::DecompressStr decompress_str;
int len = condition->table->GetFieldString(docid, term.field, field_value,
decompress_str);
int field_id = condition->table->GetAttrIdx(term.field);
condition->table->GetFieldRawValue(docid, field_id, field_value);
vector<string> field_items;
if (len >= 0) field_items = utils::split(field_value, kDelim);
if (field_value.size() >= 0) field_items = utils::split(field_value, kDelim);

bool all_in_field_items;
if (term.is_union == static_cast<int>(FilterOperator::Or))
Expand Down
7 changes: 4 additions & 3 deletions index/retrieval_model.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
#pragma once

#include <vector>
#include <tbb/concurrent_queue.h>

#include "concurrentqueue/concurrentqueue.h"

// #include "concurrentqueue/concurrentqueue.h"
#include "reflector.h"
#include "utils.h"

Expand Down Expand Up @@ -178,7 +180,6 @@ class ScopeVectors {

size_t Size() { return ptr_.size(); }

private:
std::vector<const uint8_t *> ptr_;
std::vector<bool> deletable_;
};
Expand Down Expand Up @@ -292,7 +293,7 @@ class RetrievalModel {
virtual int Load(const std::string &dir) = 0;

VectorReader *vector_;
moodycamel::ConcurrentQueue<int> updated_vids_;
tbb::concurrent_bounded_queue<int> updated_vids_;
// warning: indexed_count_ is only used by the framework; sub-classes must not use it
int indexed_count_;
};
5 changes: 0 additions & 5 deletions io/memory_raw_vector_io.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
#ifdef WITH_ROCKSDB

#ifndef MEMORY_RAW_VECTOR_IO_H_
#define MEMORY_RAW_VECTOR_IO_H_

#pragma once

#include <string>
Expand Down Expand Up @@ -32,6 +29,4 @@ struct MemoryRawVectorIO : public RawVectorIO, public AsyncFlusher {

} // namespace tig_gamma

#endif

#endif // WITH_ROCKSDB
18 changes: 3 additions & 15 deletions io/mmap_raw_vector_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,14 @@ namespace tig_gamma {
// Performs no initialization work; always returns 0.
int MmapRawVectorIO::Init() { return 0; }

int MmapRawVectorIO::Dump(int start_vid, int end_vid) {
for (int i = start_vid / raw_vector->segment_size_;
i < end_vid / raw_vector->segment_size_; i++) {
int ret = raw_vector->file_mappers_[i]->Sync();
if (ret) return ret;
}
return 0;
}

int MmapRawVectorIO::Load(int vec_num) {
int seg_num = vec_num / raw_vector->segment_size_ + 1;
int offset = vec_num % raw_vector->segment_size_;
for (int i = 1; i < seg_num; ++i) {
int ret = raw_vector->Extend();
if (ret) {
LOG(ERROR) << "load extend error, i=" << i << ", ret=" << ret;
return ret;
}
if (raw_vector->storage_mgr_->Truncate(vec_num)) {
LOG(ERROR) << "truncate gamma db error, vec_num=" << vec_num;
return INTERNAL_ERR;
}
assert(raw_vector->nsegment_ == seg_num);
raw_vector->file_mappers_[seg_num - 1]->SetCurrIdx(offset);
raw_vector->MetaInfo()->size_ = vec_num;
LOG(INFO) << "mmap load success! vec num=" << vec_num;
return 0;
Expand Down
4 changes: 0 additions & 4 deletions io/mmap_raw_vector_io.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#ifndef MMAP_RAW_VECTOR_IO_H_
#define MMAP_RAW_VECTOR_IO_H_

#pragma once

#include <string>
Expand All @@ -24,4 +21,3 @@ struct MmapRawVectorIO : public RawVectorIO {

} // namespace tig_gamma

#endif
Loading

0 comments on commit 98a121e

Please sign in to comment.