-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add QueuerUtil with BatchSend capability.
TODO: [] Add abstraction layer that reads off of the provider core extension [] Allow other queue methods, like fifoqueues, etc., where the dev doesn't need to track much of the implementation details per provider. Good luck!
- Loading branch information
1 parent
9db6b89
commit 1e84ca7
Showing
2 changed files
with
65 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
const { QueuerUtil } = require('./queuer-util'); | ||
module.exports = { | ||
QueuerUtil, | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
const { v4: uuid } = require('uuid'); | ||
const AWS = require('aws-sdk'); | ||
const awsConfigOptions = () => { | ||
const config = {}; | ||
if (process.env.IS_OFFLINE) { | ||
console.log('Running in offline mode'); | ||
} | ||
if (process.env.AWS_ENDPOINT) { | ||
config.endpoint = process.env.AWS_ENDPOINT; | ||
} | ||
return config; | ||
}; | ||
AWS.config.update(awsConfigOptions()); | ||
const sqs = new AWS.SQS(); | ||
|
||
const QueuerUtil = { | ||
batchSend: async (entries = [], queueUrl) => { | ||
console.log( | ||
`Enqueuing ${entries.length} entries on SQS to queue ${queueUrl}` | ||
); | ||
const buffer = []; | ||
const batchSize = 10; | ||
|
||
for (const entry of entries) { | ||
buffer.push({ | ||
Id: uuid(), | ||
MessageBody: JSON.stringify(entry), | ||
}); | ||
// Sends 10, then purges the buffer | ||
if (buffer.length === batchSize) { | ||
console.log('Buffer at 10, sending batch'); | ||
await sqs | ||
.sendMessageBatch({ | ||
Entries: buffer, | ||
QueueUrl: queueUrl, | ||
}) | ||
.promise(); | ||
// Purge the buffer | ||
buffer.splice(0, buffer.length); | ||
} | ||
} | ||
console.log('Buffer at end, sending final batch'); | ||
|
||
// If any remaining entries under 10 are left in the buffer, send and return | ||
if (buffer.length > 0) { | ||
console.log(buffer); | ||
return sqs | ||
.sendMessageBatch({ | ||
Entries: buffer, | ||
QueueUrl: queueUrl, | ||
}) | ||
.promise(); | ||
} | ||
|
||
// If we're exact... just return an empty object for now | ||
|
||
return {}; | ||
}, | ||
}; | ||
|
||
module.exports = { QueuerUtil }; |