Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new command for emptying queus in RabbitMq and MySql #853

Merged
merged 11 commits into from
Jun 7, 2024
110 changes: 110 additions & 0 deletions Console/Command/NostoClearQueueCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
<?php

namespace Nosto\Tagging\Console\Command;

use Magento\Framework\Exception\LocalizedException;
use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig;
use Magento\Framework\MessageQueue\QueueRepository;
use RuntimeException;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class NostoClearQueueCommand extends Command
{
/**
* Nosto Product Sync Update label.
*
* @var string
*/
public const NOSTO_UPDATE_SYNC_MESSAGE_QUEUE = 'nosto_product_sync.update';

/**
* Nosto Product Sync Delete label.
*
* @var string
*/
public const NOSTO_DELETE_MESSAGE_QUEUE = 'nosto_product_sync.delete';

/**
* @var ConsumerConfig
*/
private $consumerConfig;

/**
* @var QueueRepository
*/
private $queueRepository;

private array $consumers = [
self::NOSTO_DELETE_MESSAGE_QUEUE,
self::NOSTO_UPDATE_SYNC_MESSAGE_QUEUE,
];

/**
* NostoClearQueueCommand constructor.
*
* @param ConsumerConfig $consumerConfig
* @param QueueRepository $queueRepository
*/
public function __construct(
ConsumerConfig $consumerConfig,
QueueRepository $queueRepository
) {
$this->consumerConfig = $consumerConfig;
$this->queueRepository = $queueRepository;
parent::__construct();
}

/**
* Configure the command and the arguments
*/
protected function configure()
{
$this->setName('nosto:clear:message-queue')
->setDescription('Clear all message queues for Nosto product sync topics.');
parent::configure();
}

/**
* @inheritDoc
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing a check if Magento_AsynchronousOperations is installed, see this example: https://github.com/Nosto/nosto-magento2/blob/develop/Model/Service/Sync/AbstractBulkPublisher.php#L164

try {
foreach ($this->consumers as $queueName) {
$this->clearQueue($io, $queueName);
}
$io->success('Successfully cleared message queues.');
} catch (RuntimeException|LocalizedException $e) {
$io->error('An error occurred while clearing message queues: ' . $e->getMessage());
return 1;
}
return 0;
}

/**
* Clear message queues by consumer name.
*
* @param SymfonyStyle $io
* @param string $consumerName
* @return void
* @throws LocalizedException
*/
private function clearQueue(SymfonyStyle $io, string $consumerName): void
{
$io->writeln(sprintf('Clearing messages from %s', $consumerName));
$io->createProgressBar();
$io->progressStart();
$consumerConfig = $this->consumerConfig->getConsumer($consumerName);
$queue = $this->queueRepository->get($consumerConfig->getConnection(), $consumerConfig->getQueue());
while ($message = $queue->dequeue()) {
$io->progressAdvance(1);
$queue->acknowledge($message);
}
$io->progressFinish();
}
}
8 changes: 8 additions & 0 deletions etc/di.xml
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@
<item name="nosto_generate_customer_reference_command" xsi:type="object">
Nosto\Tagging\Console\Command\NostoGenerateCustomerReferenceCommand
</item>
<item name="nosto_clear_queue_command" xsi:type="object">
Nosto\Tagging\Console\Command\NostoClearQueueCommand
</item>
</argument>
</arguments>
</type>
Expand Down Expand Up @@ -234,4 +237,9 @@
<argument name="batchSize" xsi:type="number">500</argument>
</arguments>
</type>
<type name="Nosto\Tagging\Console\Command\NostoClearQueueCommand">
<arguments>
<argument name="amqpConfig" xsi:type="object">Magento\Framework\Amqp\Config</argument>
</arguments>
</type>
</config>
Loading