Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka backend can't unsubscribe from individual channel #65

Open
amacfie opened this issue May 13, 2022 · 3 comments
Open

Kafka backend can't unsubscribe from individual channel #65

amacfie opened this issue May 13, 2022 · 3 comments

Comments

@amacfie
Copy link

amacfie commented May 13, 2022

Currently, unsubscribing from one Kafka channel unsubscribes from all channels. Based on the aiokafka docs I'm guessing we want to do

self._consumer_channels.remove(channel)
self._consumer.subscribe(topics=self._consumer_channels)
@amacfie
Copy link
Author

amacfie commented May 18, 2022

Also, when we call AIOKafkaConsumer we might want to add auto_offset_reset="latest" based on https://aiokafka.readthedocs.io/en/stable/consumer.html#controlling-the-consumer-s-position. Even then, when we change the topics we're subscribed to it's not obvious to me that we won't miss events or process events multiple times.

@tsotnesharvadze
Copy link
Contributor

Why didn't merge this PR?

@tsotnesharvadze
Copy link
Contributor

tsotnesharvadze commented Oct 23, 2023

Currently, unsubscribing from one Kafka channel unsubscribes from all channels. Based on the aiokafka docs I'm guessing we want to do

self._consumer_channels.remove(channel)
self._consumer.subscribe(topics=self._consumer_channels)

When _counsumer_channels will be empty it raise error, so:

    async def unsubscribe(self, channel: str) -> None:
        self._consumer_channels.remove(channel)
        if self._consumer_channels:
            self._consumer.subscribe(topics=list(self._consumer_channels))
        else:
            self._consumer.unsubscribe()

@alex-oleshkevich alex-oleshkevich mentioned this issue Apr 3, 2024
19 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants