-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add new command for emptying queus in RabbitMq and MySql #853
Changes from 6 commits
7e03d70
26ee8cc
f733ad9
a9f269a
af54526
378dc83
d2f8f9a
570a839
f23fdcf
f7c2eca
43bc665
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,201 @@ | ||||||||
<?php | ||||||||
|
||||||||
namespace Nosto\Tagging\Console\Command; | ||||||||
|
||||||||
use Exception; | ||||||||
use Magento\Framework\Amqp\Config; | ||||||||
use Magento\Framework\App\ResourceConnection; | ||||||||
use Nosto\NostoException; | ||||||||
use RuntimeException; | ||||||||
use Symfony\Component\Console\Command\Command; | ||||||||
use Symfony\Component\Console\Style\SymfonyStyle; | ||||||||
use Symfony\Component\Console\Input\InputInterface; | ||||||||
use Symfony\Component\Console\Output\OutputInterface; | ||||||||
|
||||||||
class NostoClearQueueCommand extends Command | ||||||||
{ | ||||||||
/** | ||||||||
* Nosto Queues | ||||||||
*/ | ||||||||
private const QUEUE_TOPICS = [ | ||||||||
'nosto_product_sync.update', | ||||||||
'nosto_product_sync.delete' | ||||||||
]; | ||||||||
|
||||||||
/** | ||||||||
* @var Config | ||||||||
*/ | ||||||||
private Config $amqpConfig; | ||||||||
Check warning on line 28 in Console/Command/NostoClearQueueCommand.php GitHub Actions / Phan Analysis
|
||||||||
|
||||||||
/** | ||||||||
* @var ResourceConnection | ||||||||
*/ | ||||||||
private ResourceConnection $resourceConnection; | ||||||||
|
||||||||
public function __construct( | ||||||||
ResourceConnection $resourceConnection, | ||||||||
Config $amqpConfig | ||||||||
) { | ||||||||
$this->resourceConnection = $resourceConnection; | ||||||||
$this->amqpConfig = $amqpConfig; | ||||||||
parent::__construct(); | ||||||||
} | ||||||||
|
||||||||
protected function configure() | ||||||||
{ | ||||||||
// Define command name. | ||||||||
$this->setName('nosto:clear:queue') | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Specify that is related to message queue, would be more descriptive. |
||||||||
->setDescription('Clear all message queues for Nosto product sync topics.'); | ||||||||
parent::configure(); | ||||||||
} | ||||||||
|
||||||||
protected function execute(InputInterface $input, OutputInterface $output): int | ||||||||
{ | ||||||||
$io = new SymfonyStyle($input, $output); | ||||||||
|
||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing a check if |
||||||||
try { | ||||||||
foreach (self::QUEUE_TOPICS as $topicName) { | ||||||||
$this->clearQueue($topicName, $io); | ||||||||
} | ||||||||
|
||||||||
$io->success('Successfully cleared message queues.'); | ||||||||
return 0; | ||||||||
} catch (NostoException $e) { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This exception is never thrown, you're throwing |
||||||||
$io->error('An error occurred while clearing message queues: ' . $e->getMessage()); | ||||||||
return 1; | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
/** | ||||||||
* Clear MySql and RabbitMq queues by name. | ||||||||
* | ||||||||
* @param string $topicName | ||||||||
* @param SymfonyStyle $io | ||||||||
* @return void | ||||||||
*/ | ||||||||
private function clearQueue(string $topicName, SymfonyStyle $io): void | ||||||||
{ | ||||||||
$this->clearRabbitMQQueue($topicName, $io); | ||||||||
$this->clearDBQueues($topicName, $io); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's no need to clear both at the same time, you need to check which one is being used and clear only either RabbitMQ or MySQL |
||||||||
} | ||||||||
|
||||||||
/** | ||||||||
* Clear DB. | ||||||||
* | ||||||||
* @param string $topicName | ||||||||
* @param SymfonyStyle $io | ||||||||
* @return void | ||||||||
*/ | ||||||||
private function clearDBQueues(string $topicName, SymfonyStyle $io): void | ||||||||
{ | ||||||||
// Get connection. | ||||||||
$connection = $this->resourceConnection->getConnection(); | ||||||||
|
||||||||
// Start DB transaction. | ||||||||
$connection->beginTransaction(); | ||||||||
try { | ||||||||
// Emptying DB tables. | ||||||||
$this->clearQueueMessages($topicName, $connection); | ||||||||
$this->clearRelatedRecords($topicName, $connection); | ||||||||
$connection->commit(); | ||||||||
} catch (Exception $exception) { | ||||||||
$connection->rollBack(); | ||||||||
$io->error('An error occurred while clearing DB queues for topic ' | ||||||||
. $topicName . ': ' | ||||||||
. $exception->getMessage() | ||||||||
); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
} | ||||||||
} | ||||||||
|
||||||||
/** | ||||||||
* Emptying queue message tables. | ||||||||
* | ||||||||
* @param string $topicName | ||||||||
* @param $connection | ||||||||
* @return void | ||||||||
*/ | ||||||||
private function clearQueueMessages(string $topicName, $connection): void | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missing types |
||||||||
{ | ||||||||
$queueMessageTable = $this->resourceConnection->getTableName('queue_message'); | ||||||||
$queueMessageStatusTable = $this->resourceConnection->getTableName('queue_message_status'); | ||||||||
|
||||||||
// Get all IDs from "queue_message" table. | ||||||||
$select = $connection->select() | ||||||||
->from($queueMessageTable, ['id']) | ||||||||
->where('topic_name = ?', $topicName); | ||||||||
$messageIds = $connection->fetchCol($select); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you wanna go with this approach, you might wanna paginate this. Suppose you have 1 billion messages, wouldn't be nice to load all of them in memory at once. |
||||||||
|
||||||||
// Delete related records from "queue_message_status" table. | ||||||||
if (!empty($messageIds)) { | ||||||||
$connection->delete($queueMessageStatusTable, ['message_id IN (?)' => $messageIds]); | ||||||||
} | ||||||||
|
||||||||
// Delete records from "queue_message" table. | ||||||||
$connection->delete($queueMessageTable, ['topic_name = ?' => $topicName]); | ||||||||
} | ||||||||
|
||||||||
/** | ||||||||
* Emptying related tables. | ||||||||
* | ||||||||
* @param string $topicName | ||||||||
* @param $connection | ||||||||
* @return void | ||||||||
*/ | ||||||||
private function clearRelatedRecords(string $topicName, $connection): void | ||||||||
{ | ||||||||
$magentoOperationTable = $this->resourceConnection->getTableName('magento_operation'); | ||||||||
$magentoBulkTable = $this->resourceConnection->getTableName('magento_bulk'); | ||||||||
|
||||||||
// Get all IDs from "magento_operation" table. | ||||||||
$selectBulkUuids = $connection->select() | ||||||||
->from($magentoOperationTable, ['bulk_uuid']) | ||||||||
->where('topic_name = ?', $topicName); | ||||||||
$bulkUuids = $connection->fetchCol($selectBulkUuids); | ||||||||
|
||||||||
// Delete related records from "magento_bulk" table. | ||||||||
if (!empty($bulkUuids)) { | ||||||||
$connection->delete($magentoBulkTable, ['uuid IN (?)' => $bulkUuids]); | ||||||||
} | ||||||||
|
||||||||
// Delete records from "magento_operation" table. | ||||||||
$connection->delete($magentoOperationTable, ['topic_name = ?' => $topicName]); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be better handled by querying for the bulk ID's and using this method to delete them: Also notice that you only want to delete the ones with status = |
||||||||
} | ||||||||
|
||||||||
/** | ||||||||
* Clear RabbitMq Queues by name. | ||||||||
* | ||||||||
* @param string $queueName | ||||||||
* @param SymfonyStyle $io | ||||||||
* @return void | ||||||||
*/ | ||||||||
private function clearRabbitMQQueue(string $queueName, SymfonyStyle $io): void | ||||||||
{ | ||||||||
try { | ||||||||
// Get RabbitMq channel. | ||||||||
$channel = $this->amqpConfig->getChannel(); | ||||||||
|
||||||||
// Empty queue if queue exists. | ||||||||
if ($this->queueExists($channel, $queueName)) { | ||||||||
$channel->queue_purge($queueName); | ||||||||
} | ||||||||
} catch (Exception $e) { | ||||||||
// Log the error or handle it as required. | ||||||||
$io->error('An error occurred while clearing RabbitMQ queue ' . $queueName . ': ' . $e->getMessage()); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you use sprintf for concatenation, will keep things more tidy |
||||||||
throw new RuntimeException('Failed to clear RabbitMQ queue: ' . $e->getMessage()); | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
/** | ||||||||
* Check queue exist. | ||||||||
* | ||||||||
* @param $channel | ||||||||
* @param string $queueName | ||||||||
* @return bool | ||||||||
*/ | ||||||||
protected function queueExists($channel, string $queueName): bool | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing |
||||||||
{ | ||||||||
$queueInfo = $channel->queue_declare($queueName, true); | ||||||||
|
||||||||
return !empty($queueInfo); | ||||||||
} | ||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are constants available, see
nosto-magento2/Model/Service/Sync/Upsert/AsyncBulkPublisher.php
Line 44 in f958685
and
nosto-magento2/Model/Service/Sync/Delete/AsyncBulkPublisher.php
Line 43 in f1fee82