-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add new command for emptying queus in RabbitMq and MySql #853
Conversation
|
||
$io->success('Successfully cleared message queues.'); | ||
return 0; | ||
} catch (NostoException $e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This exception is never thrown, you're throwing RuntimeException
protected function execute(InputInterface $input, OutputInterface $output): int | ||
{ | ||
$io = new SymfonyStyle($input, $output); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing a check if Magento_AsynchronousOperations
is installed, see this example: https://github.com/Nosto/nosto-magento2/blob/develop/Model/Service/Sync/AbstractBulkPublisher.php#L164
protected function configure() | ||
{ | ||
// Define command name. | ||
$this->setName('nosto:clear:queue') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Specify that is related to message queue, would be more descriptive.
'nosto_product_sync.update', | ||
'nosto_product_sync.delete' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are constants available, see
public const NOSTO_SYNC_MESSAGE_QUEUE = 'nosto_product_sync.update'; |
and
public const NOSTO_DELETE_MESSAGE_QUEUE = 'nosto_product_sync.delete'; |
* @param string $queueName | ||
* @return bool | ||
*/ | ||
protected function queueExists($channel, string $queueName): bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing $channel
type
. $exception->getMessage() | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
. $exception->getMessage() | |
); | |
. $exception->getMessage()); |
* @param $connection | ||
* @return void | ||
*/ | ||
private function clearQueueMessages(string $topicName, $connection): void |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing types
$select = $connection->select() | ||
->from($queueMessageTable, ['id']) | ||
->where('topic_name = ?', $topicName); | ||
$messageIds = $connection->fetchCol($select); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you wanna go with this approach, you might wanna paginate this. Suppose you have 1 billion messages, wouldn't be nice to load all of them in memory at once.
$magentoOperationTable = $this->resourceConnection->getTableName('magento_operation'); | ||
$magentoBulkTable = $this->resourceConnection->getTableName('magento_bulk'); | ||
|
||
// Get all IDs from "magento_operation" table. | ||
$selectBulkUuids = $connection->select() | ||
->from($magentoOperationTable, ['bulk_uuid']) | ||
->where('topic_name = ?', $topicName); | ||
$bulkUuids = $connection->fetchCol($selectBulkUuids); | ||
|
||
// Delete related records from "magento_bulk" table. | ||
if (!empty($bulkUuids)) { | ||
$connection->delete($magentoBulkTable, ['uuid IN (?)' => $bulkUuids]); | ||
} | ||
|
||
// Delete records from "magento_operation" table. | ||
$connection->delete($magentoOperationTable, ['topic_name = ?' => $topicName]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be better handled by querying for the bulk ID's and using this method to delete them:
https://github.com/magento/magento2/blob/2.4-develop/app/code/Magento/AsynchronousOperations/Model/BulkManagement.php#L237
Also notice that you only want to delete the ones with status = STATUS_TYPE_OPEN
$this->clearRabbitMQQueue($topicName, $io); | ||
$this->clearDBQueues($topicName, $io); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no need to clear both at the same time, you need to check which one is being used and clear only either RabbitMQ or MySQL
Adding new command