Skip to content

Commit

Permalink
Support query iterator (#309)
Browse files Browse the repository at this point in the history
* query iterator

Signed-off-by: ruiyi.jiang <[email protected]>

* finish query iterator

Signed-off-by: ruiyi.jiang <[email protected]>

* test more

Signed-off-by: ruiyi.jiang <[email protected]>

* move getQueryIterator to utils/function and add tests for it

Signed-off-by: ruiyi.jiang <[email protected]>

* rename pageSize -> batchSize

Signed-off-by: ruiyi.jiang <[email protected]>

* init search iterator

Signed-off-by: ruiyi.jiang <[email protected]>

* search iterator part1

Signed-off-by: ryjiang <[email protected]>

* stash

Signed-off-by: ryjiang <[email protected]>

* refine

Signed-off-by: ruiyi.jiang <[email protected]>

* refine

Signed-off-by: ryjiang <[email protected]>

* refine

Signed-off-by: ryjiang <[email protected]>

* refine id expression

Signed-off-by: ruiyi.jiang <[email protected]>

* refine

Signed-off-by: ruiyi.jiang <[email protected]>

* refine

Signed-off-by: ryjiang <[email protected]>

* refine

Signed-off-by: ryjiang <[email protected]>

* refine

Signed-off-by: ryjiang <[email protected]>

* init cosine

Signed-off-by: ryjiang <[email protected]>

* close debug

Signed-off-by: ryjiang <[email protected]>

---------

Signed-off-by: ruiyi.jiang <[email protected]>
Signed-off-by: ryjiang <[email protected]>
  • Loading branch information
shanghaikid authored Apr 26, 2024
1 parent 329d70e commit fcdff93
Show file tree
Hide file tree
Showing 9 changed files with 1,161 additions and 6 deletions.
5 changes: 5 additions & 0 deletions milvus/const/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,8 @@ export const DEFAULT_HTTP_ENDPOINT_VERSION = 'v2'; // api version, default v1

export const DEFAULT_POOL_MAX = 10; // largest number of clients kept in the pool by default
export const DEFAULT_POOL_MIN = 2; // smallest number of clients kept in the pool by default

// Lower-bound int64 sentinel, kept as a string.
// Note: this is -(2^63 - 1), i.e. one above the true int64 minimum of -9223372036854775808.
export const DEFAULT_MIN_INT64 = '-9223372036854775807';
// Upper bound on the size of a single query/search request.
export const DEFAULT_MAX_SEARCH_SIZE = 16384;
// Largest L2 distance value used as a default bound.
export const DEFAULT_MAX_L2_DISTANCE = 99999999;
// Smallest cosine distance value used as a default bound.
export const DEFAULT_MIN_COSINE_DISTANCE = -2;
17 changes: 17 additions & 0 deletions milvus/const/milvus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,23 @@ export const DataTypeMap: { [key in keyof typeof DataType]: number } = {
SparseFloatVector: 104,
};

/**
 * Data type names as a string enum: every member's value is identical to its name.
 *
 * NOTE(review): the list ends at FloatVector, while DataTypeMap also contains a
 * SparseFloatVector entry — confirm whether leaving it out here is intentional.
 */
export enum DataTypeStringEnum {
None = 'None',
Bool = 'Bool',
Int8 = 'Int8',
Int16 = 'Int16',
Int32 = 'Int32',
Int64 = 'Int64',
Float = 'Float',
Double = 'Double',
VarChar = 'VarChar',
Array = 'Array',
JSON = 'JSON',
BinaryVector = 'BinaryVector',
FloatVector = 'FloatVector',
}

// RBAC: operate user role type
export enum OperateUserRoleType {
AddUserToRole = 0,
Expand Down
24 changes: 24 additions & 0 deletions milvus/grpc/Collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import {
parseToKeyValue,
CreateCollectionWithFieldsReq,
CreateCollectionWithSchemaReq,
FieldSchema,
} from '../';

/**
Expand Down Expand Up @@ -1057,4 +1058,27 @@ export class Collection extends Database {

return pkFieldType;
}

/**
 * Resolve the schema of a collection's primary key field.
 *
 * Describes the collection and returns the first field flagged with
 * `is_primary_key`. When no field carries the flag, the first field of the
 * schema is returned instead.
 *
 * @param data - describe-collection request identifying the collection
 * @returns the primary key field schema, or the schema's first field as a fallback
 */
async getPkField(data: DescribeCollectionReq): Promise<FieldSchema> {
  // fetch the collection description and pull out its field list
  const { fields } = (await this.describeCollection(data)).schema;

  // the first field marked as primary key wins; otherwise fall back to field 0
  return fields.find(field => field.is_primary_key) ?? fields[0];
}
}
270 changes: 270 additions & 0 deletions milvus/grpc/Data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import {
SearchReq,
SearchRes,
SearchSimpleReq,
SearchIteratorReq,
DEFAULT_TOPK,
HybridSearchReq,
promisify,
findKeyValue,
Expand All @@ -53,6 +55,12 @@ import {
CountReq,
CountResult,
DEFAULT_COUNT_QUERY_STRING,
getQueryIteratorExpr,
QueryIteratorReq,
getRangeFromSearchResult,
SearchResultData,
getPKFieldExpr,
DEFAULT_MAX_SEARCH_SIZE,
SparseFloatVector,
sparseRowsToBytes,
getSparseDim,
Expand Down Expand Up @@ -478,6 +486,268 @@ export class Data extends Collection {
};
}

// async searchIterator(data: SearchIteratorReq): Promise<any> {
// // store client
// const client = this;
// // get collection info
// const pkField = await this.getPkField(data);
// // get available count
// const count = await client.count({
// collection_name: data.collection_name,
// expr: data.expr || data.filter || '',
// });
// // make sure the limit does not exceed the total count
// const total = data.limit > count.data ? count.data : data.limit;
// // make sure the batch size does not exceed the total
// let batchSize = data.batchSize > total ? total : data.batchSize;
// // make sure the batch size does not exceed the max search size
// batchSize =
// batchSize > DEFAULT_MAX_SEARCH_SIZE ? DEFAULT_MAX_SEARCH_SIZE : batchSize;

// // init expr
// const initExpr = data.expr || data.filter || '';
// // init search params object
// data.params = data.params || {};
// data.limit = batchSize;

// // user range filter set
// const initRadius = Number(data.params.radius) || 0;
// const initRangeFilter = Number(data.params.range_filter) || 0;
// // range params object
// const rangeFilterParams = {
// radius: initRadius,
// rangeFilter: initRangeFilter,
// expr: initExpr,
// };

// // force quit when true; initially set when total is 0, so the iterator is done immediately
// let done = total === 0;
// // batch result store
// let lastBatchRes: SearchResultData[] = [];

// // build cache
// const cache = await client.search({
// ...data,
// limit: total > DEFAULT_MAX_SEARCH_SIZE ? DEFAULT_MAX_SEARCH_SIZE : total,
// });

// return {
// currentTotal: 0,
// [Symbol.asyncIterator]() {
// return {
// currentTotal: this.currentTotal,
// async next() {
// // check if reach the limit
// if (
// (this.currentTotal >= total && this.currentTotal !== 0) ||
// done
// ) {
// return { done: true, value: lastBatchRes };
// }

// // batch result container
// const batchRes: SearchResultData[] = [];
// const bs =
// this.currentTotal + batchSize > total
// ? total - this.currentTotal
// : batchSize;

// // keep getting search data if not reach the batch size
// while (batchRes.length < bs) {
// // search results container
// let searchResults: SearchResults = {
// status: { error_code: 'SUCCESS', reason: '' },
// results: [],
// };

// // Iterate through the cached data, adding it to the search results container until the batch size is reached.
// if (cache.results.length > 0) {
// while (
// cache.results.length > 0 &&
// searchResults.results.length < bs
// ) {
// searchResults.results.push(cache.results.shift()!);
// }
// } else if (searchResults.results.length < bs) {
// // build search params, overwrite range filter
// if (rangeFilterParams.radius && rangeFilterParams.rangeFilter) {
// data.params = {
// ...data.params,
// radius: rangeFilterParams.radius,
// range_filter:
// rangeFilterParams.rangeFilter,
// };
// }
// // set search expr
// data.expr = rangeFilterParams.expr;

// console.log('search param', data.params, data.expr);

// // iterate search, if no result, double the radius, until we doubled for 5 times
// let newSearchRes = await client.search(data);
// let retry = 0;
// while (newSearchRes.results.length === 0 && retry < 5) {
// newSearchRes = await client.search(data);
// if (searchResults.results.length === 0) {
// const newRadius = rangeFilterParams.radius * 2;

// data.params = {
// ...data.params,
// radius: newRadius,
// };
// }

// retry++;
// }

// // combine search results
// searchResults.results = [
// ...searchResults.results,
// ...newSearchRes.results,
// ];
// }

// console.log('return', searchResults.results);

// // filter result, batchRes should be unique
// const filterResult = searchResults.results.filter(
// r =>
// !lastBatchRes.some(l => l.id === r.id) &&
// !batchRes.some(c => c.id === r.id)
// );

// // fill filter result to batch result, it should not exceed the batch size
// for (let i = 0; i < filterResult.length; i++) {
// if (batchRes.length < bs) {
// batchRes.push(filterResult[i]);
// }
// }

// // get data range about last batch result
// const resultRange = getRangeFromSearchResult(filterResult);

// console.log('result range', resultRange);

// // if there are no more results, force quit
// if (resultRange.lastDistance === 0) {
// done = true;
// return { done: false, value: batchRes };
// }

// // update next range and expr
// rangeFilterParams.rangeFilter = resultRange.lastDistance;
// rangeFilterParams.radius =
// rangeFilterParams.radius + resultRange.radius;
// rangeFilterParams.expr = getPKFieldExpr({
// pkField,
// value: resultRange.id as string,
// expr: initExpr,
// });

// console.log('last', rangeFilterParams);
// }

// // store last result
// lastBatchRes = batchRes;

// // update current total
// this.currentTotal += batchRes.length;

// // return batch result
// return { done: false, value: batchRes };
// },
// };
// },
// };
// }

/**
 * Executes a query and returns an async iterable that yields the results in batches.
 *
 * Pagination is driven by the primary key: each batch is fetched with an
 * expression built by `getQueryIteratorExpr` from the user's expression and the
 * primary key of the last row already returned.
 *
 * Iteration stops once `limit` rows (or the number of rows matching the
 * expression, whichever is smaller) have been yielded, or as soon as a batch
 * comes back empty.
 *
 * The request object passed in is not modified.
 *
 * @param {QueryIteratorReq} data - The query iterator request data.
 *   `limit` bounds the total number of rows; `batchSize` is the number of rows
 *   requested per batch and is capped at DEFAULT_MAX_SEARCH_SIZE.
 * @returns {Promise<any>} - An async iterable that yields batches of query results.
 * @throws {Error} - If an error occurs during the query execution.
 *
 * @example
 * const queryData = {
 *   collection_name: 'my_collection',
 *   expr: 'age > 30',
 *   limit: 100,
 *   batchSize: 10
 * };
 *
 * const iterator = await queryIterator(queryData);
 *
 * for await (const batch of iterator) {
 *   console.log(batch); // Process each batch of query results
 * }
 */
async queryIterator(data: QueryIteratorReq): Promise<any> {
  // get the primary key field, it is the pagination cursor
  const pkField = await this.getPkField(data);
  // store client so the iterator closures below can reach it
  const client = this;
  // the user's expression; every page expression is derived from it
  const userExpr = data.expr || data.filter || '';
  // count the rows matching the user's expression
  const count = await client.count({
    collection_name: data.collection_name,
    expr: userExpr,
  });
  // never try to return more rows than requested or than actually match
  const total = Math.min(data.limit, count.data);
  // a single query must not ask for more than the max search size
  const batchSize = Math.min(data.batchSize, DEFAULT_MAX_SEARCH_SIZE);

  // return iterable
  return {
    currentTotal: 0,
    [Symbol.asyncIterator]() {
      // Cursor state lives here so that every iteration over the iterable
      // starts from the first row instead of resuming a previous cursor.
      let lastBatchRes: Record<string, any>[] = [];
      let lastPKId: string | number = '';
      // the first batch must already respect the overall limit
      let currentBatchSize = Math.min(batchSize, total);

      return {
        currentTotal: this.currentTotal,
        async next() {
          // if reach the limit, return done
          if (this.currentTotal >= total) {
            return { done: true, value: lastBatchRes };
          }

          // expression selecting the rows after the last returned primary key
          const pageExpr = getQueryIteratorExpr({
            expr: userExpr,
            pkField,
            lastPKId,
          });

          // Query with a copy so the caller's request object stays untouched.
          // NOTE(review): `filter` is overwritten too because `query` is not
          // visible here and may give `filter` precedence over `expr` — confirm.
          const res = await client.query({
            ...data,
            expr: pageExpr,
            filter: pageExpr,
            limit: currentBatchSize,
          });

          // An empty batch means no rows are left (e.g. rows were removed after
          // the count); stop instead of re-querying forever.
          if (res.data.length === 0) {
            return { done: true, value: lastBatchRes };
          }

          // remember the primary key of the last row as the next cursor
          const lastItem = res.data[res.data.length - 1];
          lastPKId = lastItem[pkField.name];

          // store last batch result
          lastBatchRes = res.data;
          // update current total
          this.currentTotal += lastBatchRes.length;
          // shrink the next batch so the overall limit is not exceeded
          currentBatchSize = Math.min(batchSize, total - this.currentTotal);
          return { done: false, value: lastBatchRes };
        },
      };
    },
  };
}
// alias: hybridSearch is assigned the same function as search
hybridSearch = this.search;

Expand Down
18 changes: 17 additions & 1 deletion milvus/types/Data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ export interface MutationResult extends resStatusResponse {
}

export interface QueryResults extends resStatusResponse {
data: { [x: string]: any }[];
data: Record<string, any>[];
}

export interface CountResult extends resStatusResponse {
Expand Down Expand Up @@ -284,6 +284,16 @@ export interface SearchReq extends collectionNameReq {
transformers?: OutputTransformers; // provide custom data transformer for specific data type like bf16 or f16 vectors
}

/**
 * Request parameters for the search iterator.
 *
 * Reuses SearchSimpleReq without 'data' | 'vectors' | 'offset' | 'limit' | 'topk',
 * then declares `data` and `limit` with the types below and adds `batchSize`.
 */
export interface SearchIteratorReq
extends Omit<
SearchSimpleReq,
'data' | 'vectors' | 'offset' | 'limit' | 'topk'
> {
data: number[]; // data to search
batchSize: number; // presumably the number of results per batch — TODO confirm
limit: number; // presumably the cap on the total number of results — TODO confirm
}

// simplified search api parameter type
export interface SearchSimpleReq extends collectionNameReq {
partition_names?: string[]; // partition names
Expand Down Expand Up @@ -393,6 +403,12 @@ export interface QueryReq extends collectionNameReq {
transformers?: OutputTransformers; // provide custom data transformer for specific data type like bf16 or f16 vectors
}

/**
 * Request parameters for queryIterator.
 *
 * Reuses QueryReq without 'ids' | 'offset' | 'limit', declares `limit` as a
 * required number and adds `batchSize`.
 */
export interface QueryIteratorReq
extends Omit<QueryReq, 'ids' | 'offset' | 'limit'> {
limit: number; // upper bound on the total number of rows to iterate over
batchSize: number; // rows requested per query; queryIterator caps it at DEFAULT_MAX_SEARCH_SIZE
}

export interface GetReq extends collectionNameReq {
ids: string[] | number[]; // primary key values
output_fields?: string[]; // fields to return
Expand Down
Loading

0 comments on commit fcdff93

Please sign in to comment.