Consumer.acknowledge() not registered in client #385

Open
fhomam opened this issue May 11, 2024 · 3 comments

fhomam commented May 11, 2024

After receiving a message, our script makes an API call through axios to process the Pulsar message. consumer.receive() and the processing logic run in a while loop until receive() times out because the queue is empty.

If we call acknowledge() before the API call, the client behaves correctly: messages are acked, and receive() stops returning messages once all of them are acked.

But if we ack after the API call, which is the order we want, receive() seems to go in a loop and keeps returning messages that were already acked. However, if we quit the script and re-run it, the queue is indeed empty and no messages are received. Similarly, peeking at the messages for the given topic partition through pulsar-admin while the script is running confirms that the messages are acked.

Contributor

BewareMyPower commented May 11, 2024

Could you add a simple example to reproduce? You mean that the following loop does not work?

  while (true) {
    const msg = await consumer.receive();
    process(msg); // process is the API call you mentioned
    consumer.acknowledge(msg);
  }

but the following loop works?

  while (true) {
    const msg = await consumer.receive();
    consumer.acknowledge(msg);
    process(msg); // process is the API call you mentioned
  }
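For anyone trying to reproduce this, a slightly fuller version of the first loop may help. It is only a sketch: `drain`, `processMessage` and `receiveTimeoutMs` are hypothetical names, the consumer is passed in so the loop can be exercised with a stub, and detecting the timeout by matching the error text is an assumption based on the error message quoted in this thread. It counts how often each message ID is received, so repeated deliveries become visible.

```javascript
// Drain a consumer until receive() times out, acknowledging each message
// only after it has been processed.
// `consumer` is expected to behave like a pulsar-client Consumer:
// receive(timeoutMs) and acknowledge(msg) both return promises.
async function drain(consumer, processMessage, receiveTimeoutMs = 5000) {
  const seen = new Map(); // message ID string -> number of times received
  for (;;) {
    let msg;
    try {
      msg = await consumer.receive(receiveTimeoutMs);
    } catch (err) {
      // Assumption: an empty queue surfaces as an error whose text
      // contains "TimeOut"; anything else is rethrown.
      if (/timeout/i.test(String(err && err.message))) break;
      throw err;
    }
    const id = msg.getMessageId().toString();
    const count = (seen.get(id) || 0) + 1;
    seen.set(id, count);
    if (count > 1) {
      console.warn(`message ${id} received ${count} times`);
    }
    await processMessage(msg);       // the slow API call
    await consumer.acknowledge(msg); // ack only after processing finished
  }
  return seen;
}
```

If the returned map contains counts above 1 for messages that were already acknowledged, that matches the behaviour described in the report.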

Author

fhomam commented May 11, 2024

The order of execution you've provided above is correct, with await on every call. The API calls go to an LLM service. We have two scripts, and the issue doesn't appear in the one whose API calls return relatively faster. I just tried again with receiverQueueSize set to 1, but that didn't solve the issue.

Setting ackTimeoutMs to 0, down from an earlier value of 10000, does seem to resolve the issue: ack works as expected and the while loop terminates as expected with "Error: Failed to receive message: TimeOut".
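One reading that would fit these observations, though nothing in this thread confirms it, is that the API call sometimes takes longer than ackTimeoutMs, so the client requests redelivery before the ack is sent. The sketch below shows the option change and a small timing helper for checking that. The topic and subscription names are placeholders, `timeAgainstAckTimeout` is a hypothetical helper, and the comment that 0 disables the ack timeout reflects my understanding of the client rather than anything stated here.

```javascript
// Hypothetical subscription options; topic and subscription are placeholders.
const consumerOptions = {
  topic: 'persistent://public/default/my-topic',
  subscription: 'my-subscription',
  receiverQueueSize: 1,
  ackTimeoutMs: 0, // understood to disable the ack timeout; earlier value was 10000
};
// Usage, given a connected client: const consumer = await client.subscribe(consumerOptions);

// Run the processing step and report whether it took longer than the
// ack timeout that would otherwise be configured.
async function timeAgainstAckTimeout(processFn, ackTimeoutMs) {
  const start = Date.now();
  const result = await processFn();
  const elapsedMs = Date.now() - start;
  return {
    result,
    elapsedMs,
    exceeded: ackTimeoutMs > 0 && elapsedMs > ackTimeoutMs,
  };
}
```

Wrapping the axios call in `timeAgainstAckTimeout(() => callApi(msg), 10000)` (with `callApi` standing in for the real processing function) would show whether the slow script crosses the 10 second mark while the fast one does not.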

axios: 1.6.8
pulsar-client: 1.10.0
pulsar (standalone on docker): 3.2.2
node: v20.9.0

Apologies, a code sample is a bit involved to put together, but I hope the above sheds some more light.

@BewareMyPower
Contributor

> But trying again and setting ackTimeoutMs to 0, from an earlier value of 10000, does seem to resolve the issue

Interesting. I will take a further look.
