Upgrade Azure Storage SDK to a modern version #629

Open: wants to merge 8 commits into master
Changes from 1 commit
4 changes: 3 additions & 1 deletion config/cdConfig.js
@@ -5,7 +5,8 @@ const config = require('painless-config')

const cd_azblob = {
connection: config.get('CRAWLER_AZBLOB_CONNECTION_STRING'),
-  container: config.get('CRAWLER_AZBLOB_CONTAINER_NAME')
+  container: config.get('CRAWLER_AZBLOB_CONTAINER_NAME'),
+  account: config.get('CRAWLER_AZBLOB_ACCOUNT_NAME'),
}

const githubToken = config.get('CRAWLER_GITHUB_TOKEN')
@@ -110,6 +111,7 @@ module.exports = {
},
azqueue: {
connectionString: cd_azblob.connection,
+    account: cd_azblob.account,
queueName: config.get('CRAWLER_HARVESTS_QUEUE_NAME') || 'harvests'
},
'cd(azblob)': cd_azblob,
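As an illustration (a sketch, not part of this PR): the new account key lets a deployment drop the connection string entirely, assuming painless-config falls back to environment variables. The values below are placeholders.

// Hypothetical environment for the new config keys (placeholder values)
process.env.CRAWLER_AZBLOB_ACCOUNT_NAME = 'mystorageaccount'
process.env.CRAWLER_AZBLOB_CONTAINER_NAME = 'harvest-content'
// CRAWLER_AZBLOB_CONNECTION_STRING can stay unset; the account name plus
// DefaultAzureCredential (see the queue and blob providers below) is used instead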
137 changes: 45 additions & 92 deletions ghcrawler/providers/queuing/storageQueue.js
@@ -1,52 +1,46 @@
// Copyright (c) Microsoft Corporation and others. Made available under the MIT license.
// SPDX-License-Identifier: MIT

const { QueueServiceClient } = require('@azure/storage-queue')
const qlimit = require('qlimit')
const { cloneDeep } = require('lodash')

class StorageQueue {
/**
* @param {QueueServiceClient} client
* @param {string} name
* @param {string} queueName
* @param {object} formatter
* @param {object} options
*/
constructor(client, name, queueName, formatter, options) {
this.client = client
this.name = name
this.queueName = queueName
this.messageFormatter = formatter
this.options = options
this.logger = options.logger
this.queueClient = client.getQueueClient(this.queueName)
}

async subscribe() {
return new Promise((resolve, reject) => {
this.client.createQueueIfNotExists(this.queueName, error => {
if (error) {
return reject(error)
}
this.logger.info(`Subscribed to ${this.queueName} using Queue Storage`)
resolve()
})
})
await this.queueClient.createIfNotExists()
this.logger.info(`Subscribed to ${this.queueName} using Queue Storage`)
}

async unsubscribe() {
return
// No specific unsubscribe logic for Azure Queue Storage
}

async push(requests, option) {
requests = Array.isArray(requests) ? requests : [requests]
return Promise.all(
requests.map(
qlimit(this.options.parallelPush || 1)(request => {
qlimit(this.options.parallelPush || 1)(async request => {
const body = JSON.stringify(request)
return new Promise((resolve, reject) => {
this.client.createMessage(this.queueName, body, option, (error, queueMessageResult) => {
if (error) {
return reject(error)
}
this._log('Queued', request)
resolve(this._buildMessageReceipt(queueMessageResult, request))
})
})
})
)
const queueMessageResult = await this.queueClient.sendMessage(body)
this._log('Queued', request)
return this._buildMessageReceipt(queueMessageResult, request)
}))
)
}

@@ -56,47 +50,32 @@ class StorageQueue {
}

async pop() {
const msgOptions = { numOfMessages: 1, visibilityTimeout: this.options.visibilityTimeout || 60 * 60 }
return new Promise((resolve, reject) => {
this.client.getMessages(this.queueName, msgOptions, (error, result) => {
if (error) {
return reject(error)
}
const message = result[0]
if (!message) {
this.logger.verbose('No messages to receive')
return resolve(null)
}
if (this.options.maxDequeueCount && message.dequeueCount > this.options.maxDequeueCount) {
this.logger.verbose('maxDequeueCount exceeded')
this.client.deleteMessage(this.queueName, message.messageId, message.popReceipt, error => {
if (error) return reject(error)
resolve(null)
})
} else {
message.body = JSON.parse(message.messageText)
const request = this.messageFormatter(message)
request._message = message
this._log('Popped', message.body)
resolve(request)
}
})
})
const msgOptions = { numberOfMessages: 1, visibilityTimeout: this.options.visibilityTimeout || 60 * 60 }
const response = await this.queueClient.receiveMessages(msgOptions)
const message = response.receivedMessageItems[0]
if (!message) {
this.logger.verbose('No messages to receive')
return null
}
if (this.options.maxDequeueCount && message.dequeueCount > this.options.maxDequeueCount) {
this.logger.verbose('maxDequeueCount exceeded')
await this.queueClient.deleteMessage(message.messageId, message.popReceipt)
return null
} else {
message.body = JSON.parse(message.messageText)
const request = this.messageFormatter(message)
request._message = message
this._log('Popped', message.body)
return request
}
}

async done(request) {
if (!request || !request._message) {
return
}
return new Promise((resolve, reject) => {
this.client.deleteMessage(this.queueName, request._message.messageId, request._message.popReceipt, error => {
if (error) {
return reject(error)
}
this._log('ACKed', request._message.body)
resolve()
})
})
await this.queueClient.deleteMessage(request._message.messageId, request._message.popReceipt)
this._log('ACKed', request._message.body)
}

async defer(request) {
@@ -110,47 +89,21 @@ class StorageQueue {
await this.updateVisibilityTimeout(request)
}

updateVisibilityTimeout(request, visibilityTimeout = 0) {
return new Promise((resolve, reject) => {
// visibilityTimeout is updated to 0 to unlock/unlease the message
this.client.updateMessage(
this.queueName,
request._message.messageId,
request._message.popReceipt,
visibilityTimeout,
(error, result) => {
if (error) {
return reject(error)
}
this._log('NAKed', request._message.body)
resolve(this._buildMessageReceipt(result, request._message.body))
}
)
async updateVisibilityTimeout(request, visibilityTimeout = 0) {
    // visibilityTimeout defaults to 0 to unlock/unlease the message
    await this.queueClient.updateMessage(request._message.messageId, request._message.popReceipt, undefined, visibilityTimeout)
this._log('NAKed', request._message.body)
}

async flush() {
return new Promise((resolve, reject) => {
this.client.deleteQueue(this.queueName, error => {
if (error) return reject(error)
this.client.createQueueIfNotExists(this.queueName, error => {
if (error) return reject(error)
resolve()
})
})
})
await this.queueClient.clearMessages()
this.logger.info(`Flushed all messages from ${this.queueName}`)
}

async getInfo() {
return new Promise(resolve => {
this.client.getQueueMetadata(this.queueName, (result, error) => {
if (error) {
this.logger.error(error)
resolve(null)
}
resolve({ count: result[0].approximateMessageCount })
})
})
const properties = await this.queueClient.getProperties()
return { count: properties.approximateMessagesCount }
Collaborator commented:
This likely applies in other places as well. The original code logged errors in its callbacks; the new code relies on await propagating exceptions rather than catching them with try/catch and logging them the way the original did.

Suggested change:
-    const properties = await this.queueClient.getProperties()
-    return { count: properties.approximateMessagesCount }
+    try {
+      const properties = await this.queueClient.getProperties()
+      return { count: properties.approximateMessagesCount }
+    } catch (error) {
+      this.logger.error(error) // Log the error
+      return null // Handle the error by returning null
+    }

Contributor (author) replied:
Yeah, it's a good point. There was one more place like this; I've updated it as well.

}

getName() {
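To show how the migrated queue is exercised end to end, here is a sketch (not from this PR): the require path, queue names, pass-through formatter, and logger object are assumptions, while the constructor signature and method calls match the class above.

const { QueueServiceClient } = require('@azure/storage-queue')
const StorageQueue = require('./ghcrawler/providers/queuing/storageQueue') // path assumed

async function demo() {
  const client = QueueServiceClient.fromConnectionString(process.env.CRAWLER_AZBLOB_CONNECTION_STRING)
  const logger = { info: console.log, verbose: console.log, error: console.error }
  const formatter = message => message.body // hypothetical pass-through formatter
  const queue = new StorageQueue(client, 'normal', 'crawler-demo-queue', formatter, {
    logger,
    visibilityTimeout: 300, // seconds a popped message stays invisible
    maxDequeueCount: 5
  })

  await queue.subscribe() // creates the queue if it does not exist
  await queue.push({ type: 'component', url: 'https://example.org/spec' }) // plain object stands in for a crawler Request
  const request = await queue.pop() // receives at most one message
  if (request) await queue.done(request) // deletes (ACKs) the message
}

demo().catch(console.error)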
25 changes: 21 additions & 4 deletions ghcrawler/providers/queuing/storageQueueManager.js
@@ -2,14 +2,31 @@
// SPDX-License-Identifier: MIT

const AttenuatedQueue = require('./attenuatedQueue')
const AzureStorage = require('azure-storage')
const { QueueServiceClient, StorageRetryPolicyType } = require('@azure/storage-queue')
const Request = require('../../lib/request')
const StorageQueue = require('./storageQueue')
const { DefaultAzureCredential } = require('@azure/identity')

class StorageQueueManager {
constructor(connectionString) {
const retryOperations = new AzureStorage.ExponentialRetryPolicyFilter()
this.client = AzureStorage.createQueueService(connectionString).withFilter(retryOperations)
constructor(connectionString, options) {
const pipelineOptions = {
Collaborator commented:
I like the use of pipelineOptions. Makes it easier to see the configs related to the queues.

retryOptions: {
maxTries: 3,
retryDelayInMs: 1000,
maxRetryDelayInMs: 120 * 1000,
tryTimeoutInMs: 30000,
retryPolicyType: StorageRetryPolicyType.EXPONENTIAL
}
}
if (connectionString) {
this.client = QueueServiceClient.fromConnectionString(connectionString, pipelineOptions)
} else {
this.client = new QueueServiceClient(
Collaborator commented:
This is a nice update: it maintains the old approach while setting up a more flexible long-term one.

`https://${options.account}.queue.core.windows.net`,
new DefaultAzureCredential(),
pipelineOptions
)
}
}

createQueueClient(name, formatter, options) {
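A hedged note on the credential path above: when no connection string is configured, DefaultAzureCredential has to find an identity at runtime. The sketch below (not from this PR) shows one common local setup using the standard @azure/identity environment variables with placeholder values; on Azure, a managed identity needs no configuration at all.

// Service-principal environment for DefaultAzureCredential (placeholder values)
process.env.AZURE_TENANT_ID = '<tenant-id>'
process.env.AZURE_CLIENT_ID = '<app-client-id>'
process.env.AZURE_CLIENT_SECRET = '<client-secret>'
// With these set (or after `az login`, or with a managed identity), the
// constructor's identity branch can reach https://<account>.queue.core.windows.net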
43 changes: 36 additions & 7 deletions ghcrawler/providers/storage/azureBlobFactory.js
@@ -1,15 +1,44 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT

const AzureStorage = require('azure-storage')
// @ts-check
const { BlobServiceClient, StorageRetryPolicyType } = require('@azure/storage-blob')
const AzureStorageDocStore = require('./storageDocStore')
const { DefaultAzureCredential } = require('@azure/identity')

/**
* @param {object} options
* @param {string} options.account
* @param {string} options.connection
* @param {string} options.container
* @param {object} options.logger
*/
module.exports = options => {
options.logger.info('creating azure storage store')
const { account, key, connection, container } = options
const retryOperations = new AzureStorage.ExponentialRetryPolicyFilter()
const blobService = connection
? AzureStorage.createBlobService(connection).withFilter(retryOperations)
: AzureStorage.createBlobService(account, key).withFilter(retryOperations)
return new AzureStorageDocStore(blobService, container, options)
const { account, connection, container } = options

const pipelineOptions = {
retryOptions: {
maxTries: 3,
retryDelayInMs: 1000,
maxRetryDelayInMs: 120 * 1000,
tryTimeoutInMs: 30000,
retryPolicyType: StorageRetryPolicyType.EXPONENTIAL
}
}

let blobServiceClient
if (connection) {
options.logger.info('using connection string')
blobServiceClient = BlobServiceClient.fromConnectionString(connection, pipelineOptions)
} else if (account) {
options.logger.info('using default credentials')
blobServiceClient = new BlobServiceClient(`https://${account}.blob.core.windows.net`, new DefaultAzureCredential(), pipelineOptions)
} else {
throw new Error('either connection or account must be provided')
}

const containerClient = blobServiceClient.getContainerClient(container)

return new AzureStorageDocStore(containerClient, options)
}
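For completeness, a usage sketch of the rewritten factory (not part of this PR): the require path, logger object, and container fallback are assumptions, while the option names match the JSDoc above.

const createAzureBlobStore = require('./ghcrawler/providers/storage/azureBlobFactory') // path assumed

const logger = { info: console.log, error: console.error }
const store = createAzureBlobStore({
  connection: process.env.CRAWLER_AZBLOB_CONNECTION_STRING, // if set, connection-string auth is used
  account: process.env.CRAWLER_AZBLOB_ACCOUNT_NAME, // otherwise DefaultAzureCredential
  container: process.env.CRAWLER_AZBLOB_CONTAINER_NAME || 'harvest-content', // placeholder default
  logger
})
// `store` is an AzureStorageDocStore wrapping a ContainerClient from @azure/storage-blob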