Skip to content

Commit

Permalink
support batching queries with limits > 100
Browse files Browse the repository at this point in the history
  • Loading branch information
autopulated committed Apr 24, 2024
1 parent c22c2da commit 7b7d958
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 4 deletions.
19 changes: 15 additions & 4 deletions lib/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const kConditionGT = Symbol('>');
const kConditionGTE = Symbol('>=');
const kConditionBetween = Symbol('between');
const kConditionBegins = Symbol('begins');
const kBatchGetItemLimit = 100;

const supportedQueryConditions = new Map([
// dynamodm query condition => [internal identifier, number of arguments required]
Expand Down Expand Up @@ -315,7 +316,7 @@ class BaseModel {
let {rawQueryOptions, rawFetchOptions, ...otherOptions} = options ?? {};

// returns an array of models (possibly empty)
const rawQuery = BaseModel.#convertQuery(this, query, Object.assign({startAfter: otherOptions.startAfter, limit: otherOptions.limmit}, rawQueryOptions));
const rawQuery = BaseModel.#convertQuery(this, query, Object.assign({startAfter: otherOptions.startAfter, limit: otherOptions.limit}, rawQueryOptions));
const pending = [];
let errorOccurred = false;
const setErrorOccurred = () => { errorOccurred = true; };
Expand Down Expand Up @@ -348,7 +349,7 @@ class BaseModel {
// options are as queryMany, except Ids are returned, so there are no rawFetchOptions
let {rawQueryOptions, ...otherOptions} = options ?? {};
otherOptions = Object.assign({limit: 50}, otherOptions);
const rawQuery = BaseModel.#convertQuery(this, query, Object.assign({startAfter: otherOptions.startAfter, limit: otherOptions.limmit}, rawQueryOptions));
const rawQuery = BaseModel.#convertQuery(this, query, Object.assign({startAfter: otherOptions.startAfter, limit: otherOptions.limit}, rawQueryOptions));
const results = [];
for await (const batch of BaseModel.#rawQueryIdsBatchIterator(this, rawQuery, otherOptions)) {
results.push(batch);
Expand Down Expand Up @@ -592,8 +593,6 @@ class BaseModel {
}

// get an array of instances of this schema by id
// At most 100 items can be fetched at one time (the limit to the dynamodb BatchGetItem request size)
// TODO: the 100 item limit should be lifted here by splitting into 100-item batches internally.
static async #getByIds(DerivedModel, ids, rawOptions) {
// only the ConsistentRead option is supported
const { ConsistentRead, abortSignal } = rawOptions ?? {};
Expand All @@ -617,6 +616,12 @@ class BaseModel {
let Keys = ids.map(id => ({ [schema.idFieldName]: id }));
const results = new Map();
let retryCount = 0;
let keysExceedingLimit;
// At most kBatchGetItemLimit (100) items can be fetched at one time
// (the limit to the dynamodb BatchGetItem request size), so if
// rawOptions.limit is greater than this, batch the request.
keysExceedingLimit = Keys.slice(kBatchGetItemLimit);
Keys = Keys.slice(0, kBatchGetItemLimit);
while(Keys.length) {
const command = new BatchGetCommand({
RequestItems: {
Expand All @@ -639,6 +644,12 @@ class BaseModel {
retryCount += 1;
await delayMs(table[kTableGetBackoffDelayMs](retryCount));
}
// if there's any room after the unprocessed keys from the
// response, request some of the keys that haven't been requested
// yet as well:
const spaceAvailable = kBatchGetItemLimit - Keys.length;
Keys = Keys.concat(keysExceedingLimit.slice(0, spaceAvailable));
keysExceedingLimit = keysExceedingLimit.slice(spaceAvailable);
}
// return the results by mapping the original ids, so that the results are in the same order
return ids.map(
Expand Down
55 changes: 55 additions & 0 deletions test/queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -681,3 +681,58 @@ t.test('queries:', async t => {

t.end();
});

t.test('largeQueries', async t => {
    // Fixture: a dedicated table whose model has secondary indexes on a
    // numeric field (n) and a binary field (b), populated with more than
    // 5 * 100 documents so that queries must page past the DynamoDB
    // BatchGetItem 100-item request limit.
    const table = DynamoDM.Table({ name: 'test-table-largequeries'});
    const XSchema = DynamoDM.Schema('x', {
        properties: {
            id: DynamoDM.DocIdField,
            x: {type: 'string'},
            n: {type: 'number'},
            b: DynamoDM.Binary,
        }
    }, {index: {
        'sortedByN': { hashKey: 'type', sortKey: 'n' },
        'sortedByB': { hashKey: 'x', sortKey: 'b' },
    } });

    const X = table.model(XSchema);

    // Save N documents sequentially; n counts down from 1000 so the
    // sortedByN index order is the reverse of creation order.
    const N = 507;
    const all_xs = [];
    for (let i = 0; i < N; i++) {
        all_xs.push(await new X({x:'constant', n: 1000 - i, b: Buffer.from(`b=${i}`), }).save());
    }

    // Tear the table down once every subtest has completed.
    t.after(async () => {
        await table.deleteTable();
        table.destroyConnection();
    });

    t.test('queryMany', async t => {
        t.test('on type index', async t => {
            // A limit above N must still return every document of this type.
            const xs = await X.queryMany({ type: 'x' }, {limit: 1000});
            t.equal(xs.length, all_xs.length, 'should return all N of this type');
            t.equal(xs[0].constructor, (new X()).constructor, 'should have the correct constructor');
        });

        t.test('with limit', async t => {
            // A limit below N (but above the 100-item batch size) is honoured exactly.
            const xs = await X.queryMany({ type: 'x' }, {limit: N-13});
            t.equal(xs.length, N-13, 'should return the requested number of items');
        });

        t.test('sorted result', async t => {
            // Results from the sortedByN index must come back in ascending n order.
            const xs = await X.queryMany({ type: 'x', n: {$gte:0} }, {limit: 1000});
            t.equal(xs.length, all_xs.length, 'should return all N of this type');
            t.equal(xs.every((x, idx) => (idx === 0 || x.n > xs[idx-1].n)), true, 'should return correctly sorted result');
        });
    });

});

0 comments on commit 7b7d958

Please sign in to comment.