Queue Worker filtered removing messages from all queues #117

Open
andreinocenti opened this issue Feb 25, 2023 · 6 comments

andreinocenti commented Feb 25, 2023

Hello,

I'm using CakePHP 4 and MongoDB as the transport.

I have the following scenario, where I push 2 different messages to 2 different queues:

QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], ['queue' => 'test']);
QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], ['queue' => 'new-test']);
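
ExampleJob is just a minimal job along the lines of the plugin's example job, roughly this (a sketch, the exact logging in my job may differ):

namespace App\Job;

use Cake\Log\LogTrait;
use Cake\Queue\Job\JobInterface;
use Cake\Queue\Job\Message;
use Interop\Queue\Processor;

class ExampleJob implements JobInterface
{
    use LogTrait;

    public function execute(Message $message): ?string
    {
        // Log the pushed arguments so it is visible which job actually ran.
        $id = $message->getArgument('id');
        $code = $message->getArgument('code');
        $this->log(sprintf('ExampleJob received arguments: id=%s, code=%s', $id, $code), 'info');

        return Processor::ACK;
    }
}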

When I run the worker filtered by queue, all messages are removed from the database but only the jobs on the requested queue are executed.
If I run the CLI command below, both jobs are removed from the database, but only the first one runs.
bin/cake queue worker --queue=test

Am I using it wrongly or is this a bug?

Thank you

markstory added this to the 1.0.0 milestone Feb 26, 2023
markstory added the bug label Feb 26, 2023
@markstory
Member

How would someone go about reproducing the issue you're having? Are you able to reproduce this issue using a transport that isn't MongoDB?


andreinocenti commented Feb 27, 2023

I didn't try it with another transport, only with MongoDB, but if it helps I can try another one.
These simple steps reproduce it (I tried in 2 different environments with the same infrastructure):

Push 2 jobs to the queue, each with a different 'queue' option, as below:
QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], ['queue' => 'test']);
QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], ['queue' => 'new-test']);

Open any database client (I'm using MongoDB Compass) to check that the jobs were saved in the queue collection.

After that, run the worker as below
bin/cake queue worker --queue=test

After you run the worker, only the "test" job will be executed, but both jobs will be erased from the queue. The "new-test" job will be erased without execution.

andreinocenti reopened this Feb 27, 2023
markstory self-assigned this Feb 28, 2023
@amayer5125
Contributor

I tried to reproduce this and think there is something fishy going on. I'm using beanstalkd as the backend. It looks like all jobs are being put in the same tube (beanstalk queue).

I created an ExampleCommand that accepts --queue and passes that to the options parameter of QueueManager::push(). When I run the command it looks like both jobs are put on the default queue.
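
Roughly, the command is something like this (a simplified sketch; it also accepts the --config option I use further down):

namespace App\Command;

use App\Job\ExampleJob;
use Cake\Command\Command;
use Cake\Console\Arguments;
use Cake\Console\ConsoleIo;
use Cake\Console\ConsoleOptionParser;
use Cake\Queue\QueueManager;

class ExampleCommand extends Command
{
    protected function buildOptionParser(ConsoleOptionParser $parser): ConsoleOptionParser
    {
        return $parser
            ->addOption('queue', ['help' => 'Queue name to push the job onto'])
            ->addOption('config', ['help' => 'Queue config to push the job onto']);
    }

    public function execute(Arguments $args, ConsoleIo $io): ?int
    {
        // Forward whichever option was provided straight to QueueManager::push().
        $options = array_filter([
            'queue' => $args->getOption('queue'),
            'config' => $args->getOption('config'),
        ]);
        $io->out('Options: ' . json_encode($options));

        QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], $options);
        $io->out('Job Queued');

        return static::CODE_SUCCESS;
    }
}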

$ galley cake example --queue test                  
Options: {"queue":"test"}
Job Queued
$ galley exec -T beanstalk ash -s < stats.sh
==> enqueue.app.default <==
current-jobs-ready: 1

$ galley cake example --queue new-test      
Options: {"queue":"new-test"}
Job Queued
$ galley exec -T beanstalk ash -s < stats.sh
==> enqueue.app.default <==
current-jobs-ready: 2

When I start a worker it looks like both jobs get pulled off the default queue. One of them is "routed" to an event subscriber. The other is routed to "0" subscribers.

$ galley cake worker --verbose --queue test
2023-03-01 04:09:49 Debug: Change logger from "Cake\Log\Engine\FileLog" to "Cake\Log\Engine\ConsoleLog"
2023-03-01 04:09:49 Debug: Consumption has started
2023-03-01 04:09:49 Debug: [SetRouterPropertiesExtension] Set router processor "enqueue.client.router_processor"
2023-03-01 04:09:49 Debug: Received from enqueue.app.default	{"class":["App\\Job\\ExampleJob","execute"],"args":[{"id":1,"code":"oi1"}],"data":{"id":1,"code":"oi1"},"requeueOptions":{"config":"default","priority":null,"queue":"test"}}
2023-03-01 04:09:49 Info: [client] Processed test -> enqueue.client.router_processor	{"class":["App\\Job\\ExampleJob","execute"],"args":[{"id":1,"code":"oi1"}],"data":{"id":1,"code":"oi1"},"requeueOptions":{"config":"default","priority":null,"queue":"test"}}	ack Routed to "1" event subscribers
2023-03-01 04:09:49 Debug: [SetRouterPropertiesExtension] Set router processor "enqueue.client.router_processor"
2023-03-01 04:09:49 Debug: Received from enqueue.app.default	{"class":["App\\Job\\ExampleJob","execute"],"args":[{"id":1,"code":"oi1"}],"data":{"id":1,"code":"oi1"},"requeueOptions":{"config":"default","priority":null,"queue":"new-test"}}
2023-03-01 04:09:49 Info: [client] Processed new-test -> enqueue.client.router_processor	{"class":["App\\Job\\ExampleJob","execute"],"args":[{"id":1,"code":"oi1"}],"data":{"id":1,"code":"oi1"},"requeueOptions":{"config":"default","priority":null,"queue":"new-test"}}	ack Routed to "0" event subscribers
2023-03-01 04:09:49 Debug: Received from enqueue.app.default	{"class":["App\\Job\\ExampleJob","execute"],"args":[{"id":1,"code":"oi1"}],"data":{"id":1,"code":"oi1"},"requeueOptions":{"config":"default","priority":null,"queue":"test"}}
2023-03-01 04:09:49 Info: ExampleJob received arguments: {"id":1,"code":"oi1"}
2023-03-01 04:09:49 Debug: Message processed sucessfully
2023-03-01 04:09:49 Info: [client] Processed test -> Cake\Queue\Queue\Processor63fed00d9e9aa	{"class":["App\\Job\\ExampleJob","execute"],"args":[{"id":1,"code":"oi1"}],"data":{"id":1,"code":"oi1"},"requeueOptions":{"config":"default","priority":null,"queue":"test"}}	ack

The only way I can seem to get the jobs onto different queues is to add the queue option to the config.

'Queue' => [
    'test' => [
        'url' => 'beanstalk://beanstalk:11300',
        'queue' => 'test',
    ],
    'new-test' => [
        'url' => 'beanstalk://beanstalk:11300',
        'queue' => 'new-test',
    ],
],

Then I pass ['config' => 'test'] and ['config' => 'new-test'] as the options parameter.
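
In other words, the push calls become:

QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], ['config' => 'test']);
QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], ['config' => 'new-test']);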

$ galley cake example --config test
Options: {"config":"test"}
Job Queued
$ galley cake example --config new-test
Options: {"config":"new-test"}
Job Queued
$ galley exec -T beanstalk ash -s < stats.sh
==> enqueue.app.test <==
current-jobs-ready: 1

==> enqueue.app.new-test <==
current-jobs-ready: 1

Then when I run the queue worker with the --config option it runs as expected. I have to run it twice, once for each queue.
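
So to drain both queues I end up running something like this (the exact flag spelling may differ slightly):

$ galley cake worker --config test
$ galley cake worker --config new-test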

@markstory
Member

The only way I can seem to get the jobs onto different queues is to add the queue option to the config.

I think we might have an issue in how messages are being enqueued then. I can take a look.

@markstory
Member

I've tracked this down to being related to how QueueManager::push() generates Enqueue Client instances. When a client instance is created, Enqueue\SimpleClient\SimpleClient will use our 'queue' configuration value to set the client.router_topic and client.router_queue configuration values. These values decide the beanstalk tube or filesystem job log name, as seen in 8a0b4c7.
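
Roughly, the producer side ends up building a client with configuration shaped like this (a simplified sketch, not the exact array the plugin assembles):

// Simplified sketch: the 'queue' value from the Queue config (not the
// per-message 'queue' option) becomes the router topic/queue, so with the
// default config every message is published to enqueue.app.default.
$client = new \Enqueue\SimpleClient\SimpleClient([
    'transport' => 'beanstalk://beanstalk:11300',
    'client' => [
        'router_topic' => 'default',
        'router_queue' => 'default',
    ],
]);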

While the queue option does get persisted into the message, it does not get reflected in the producer configuration, which is why both of your messages ended up in the enqueue.app.test 'topic'. When a consumer binds to the topic/queue, it binds with a Route using the topic name. When a topic has messages from mixed queues, the ones with no matching binding are silently acked (I've not tracked down where this happens yet).

I'm starting to think that the per-message queue options are a poor design that doesn't work well within enqueue. Instead, each 'queue' of work would need its own configuration key, much like how we handle caching and database connections. While less flexible, this lets us provide a functional interface; we missed this earlier on in the plugin's development.

@markstory
Member

When a topic has messages from mixed queues, the ones with no matching binding are silently acked (I've not tracked down where this happens yet).

The output we're seeing when messages are consumed is coming from the RouterProcessor, which matches messages with 'routes'. Unfortunately, our current worker command only binds a single route. I think if we were to call bindTopic() for all the possible queues, no messages would be lost. However, you wouldn't gain the separation of work that you want from multiple queues.

I think if you're using a transport that doesn't support fanout workloads, like beanstalk or files, you need to use separate queue configurations instead.
