Prerequisites
I have searched issues to ensure it has not already been reported
Description
There appear to be some issues with the current implementation of Dynamic Worker Pool Scaling #429
Starting pools before partitions assigned
If a consumer has not been assigned any partitions by the time Configuration.WorkersCountEvaluationInterval has elapsed since startup, the worker pool will be started by that timer anyway. If partitions are later assigned to the consumer, the worker pool will be started again without being stopped first, leaving it in an invalid state.
Restarting stopped pools
If the worker count changes after partitions have been revoked, the currently stopped worker pool will be restarted. If partitions are later assigned, the worker pool will then be started a second time without being stopped first, leaving it in an invalid state.
Concurrently starting / stopping a pool
If the worker count changes while a worker pool is starting or stopping due to partitions being assigned or revoked (which may take a while, since stopping waits up to WorkerStopTimeout for in-flight messages to be handled), we may perform concurrent starts/stops of the same pool using non-thread-safe code, corrupting its state.
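To make that hazard concrete, here is a simplified, self-contained sketch. It is not KafkaFlow's code; RacyWorkerPool and its members are invented for illustration. It models a slow stop (waiting for in-flight messages) overlapping with a timer-driven restart, with both mutating the same unsynchronized worker list:

```csharp
// Simplified illustration only (not KafkaFlow's implementation): a slow stop
// overlaps with a timer-driven restart, and both mutate the same worker list.
// Concurrent Add/Clear on a List<int> is not thread safe, so the pool can end
// up with a corrupted or inconsistent state.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

class RacyWorkerPool
{
    public readonly List<int> Workers = new();

    public Task StartAsync(int count)
    {
        for (var id = 0; id < count; id++)
        {
            Workers.Add(id);
        }

        return Task.CompletedTask;
    }

    public async Task StopAsync()
    {
        await Task.Delay(500); // stands in for waiting up to WorkerStopTimeout
        Workers.Clear();
    }
}

class Demo
{
    static async Task Main()
    {
        var pool = new RacyWorkerPool();
        await pool.StartAsync(10);

        // Partition revocation begins a slow stop...
        var revocationStop = pool.StopAsync();

        // ...while the worker count evaluation timer concurrently restarts the pool.
        var timerRestart = Task.Run(async () =>
        {
            await pool.StopAsync();
            await pool.StartAsync(10);
        });

        await Task.WhenAll(revocationStop, timerRestart);

        // The result depends entirely on timing: 0, 10, or a corrupted list.
        Console.WriteLine(pool.Workers.Count);
    }
}
```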
Steps to reproduce
Let's say we have a consumer configured with .WithWorkersCount(10). This will set the worker pool to a fixed size of 10 workers, with a workers count evaluation interval of the default 5 minutes.
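For context, a setup along these lines reproduces the scenario (broker address, topic, and group names are placeholders, middleware registration is omitted, and the exact builder calls may differ slightly between KafkaFlow versions):

```csharp
// Minimal reproduction sketch, registered inside ConfigureServices with the
// KafkaFlow packages referenced. "localhost:9092", "sample-topic", and
// "sample-group" are placeholder values.
services.AddKafka(kafka => kafka
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddConsumer(consumer => consumer
            .Topic("sample-topic")
            .WithGroupId("sample-group")
            .WithBufferSize(100)
            .WithWorkersCount(10)))); // fixed pool of 10 workers; the workers count
                                      // evaluation timer still runs at its default
                                      // 5 minute interval
```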
Working Scenario - Partitions assigned within 5 minutes
Partition assignment triggers ConsumerManager.OnPartitionAssigned, which calls WorkerPool.StartAsync, setting WorkerPool.CurrentWorkersCount to 10 and creating and starting 10 workers.
5 minutes later EvaluateWorkersCountAsync runs. WorkerPool.CurrentWorkersCount is equal to the configured number of workers, so it returns without doing anything.
Failure Scenario - Partitions assigned after 5 minutes
If no partitions are assigned to the consumer within 5 minutes of starting, when EvaluateWorkersCountAsync runs, WorkerPool.CurrentWorkersCount is 0, so ChangeWorkersCountAsync runs. This restarts the worker pool, which calls WorkerPool.StartAsync even when no partitions are assigned, creating and starting 10 workers.
If partitions are then later assigned after 5 minutes, ConsumerManager.OnPartitionAssigned runs, calling WorkerPool.StartAsync on a pool that is already started. This creates another 10 workers, without stopping the existing 10 workers.
When debugging or analysing a memory dump of the process, 20 workers exist in ConsumerWorkerPool._workers instead of 10, with worker IDs 0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9.
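This is easiest to see with a simplified model of the failure mode. It is not KafkaFlow's actual implementation; it just shows what two start calls without an intervening stop produce:

```csharp
// Simplified model of the failure mode (not KafkaFlow's actual implementation):
// calling Start twice without stopping in between appends a second set of workers.
using System;
using System.Collections.Generic;

class SimplifiedWorkerPool
{
    private readonly List<int> _workers = new();

    public void Start(int workersCount)
    {
        // No guard against already being started and no stop beforehand,
        // so a second call simply adds another batch of workers.
        for (var workerId = 0; workerId < workersCount; workerId++)
        {
            _workers.Add(workerId);
        }
    }

    public IReadOnlyList<int> Workers => _workers;
}

class Program
{
    static void Main()
    {
        var pool = new SimplifiedWorkerPool();

        pool.Start(10); // evaluation timer fires before any partitions are assigned
        pool.Start(10); // OnPartitionAssigned later starts the already-started pool

        Console.WriteLine(pool.Workers.Count);              // 20 instead of 10
        Console.WriteLine(string.Join(",", pool.Workers));  // 0,1,...,9,0,1,...,9
    }
}
```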
Expected behavior
Worker pools should not be started unless partitions have been assigned to them. They should not be started multiple times without stopping.
Worker pool scaling should not restart a stopped pool with no assigned partitions.
Worker pool scaling should be thread-safe and should not restart a pool that is concurrently starting or stopping due to partitions being assigned or revoked.
Actual behavior
Worker pool scaling restarts the pool in a non-thread-safe manner, regardless of its current state.
KafkaFlow version
3.0.6
This is tangentially related to #456 - I believe the cooperative-sticky protocol will cause OnPartitionAssigned / OnPartitionRevoked to be called multiple times in a row, causing the worker pool to be started multiple times without stopping, getting it into an invalid state.
This is because with cooperative rebalancing, partition reassignment is no longer a "stop the world" event where all partitions are first revoked from all consumers, then reassigned to consumers. Partitions can be incrementally assigned / revoked one at a time without affecting the other partitions.
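For reference, cooperative rebalancing is enabled in the underlying Confluent.Kafka client via the assignment strategy setting; this minimal sketch (placeholder broker and group values) is only to illustrate the protocol difference, since KafkaFlow does not expose it today:

```csharp
// Reference sketch of the underlying Confluent.Kafka setting, not a KafkaFlow
// configuration. "localhost:9092" and "sample-group" are placeholders.
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "sample-group",
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
};

// With this strategy, the assigned/revoked handlers receive incremental
// partition lists, so a consumer can get several assignment callbacks in a row
// instead of a single "stop the world" rebalance.
```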
No, this bug happens regardless of the Kafka partition rebalancing protocol used.
I was just mentioning that the cooperative-sticky protocol (not currently supported by KafkaFlow) would make this much worse. Fixing this bug might also be related to adding support for cooperative-sticky: both need logic around locking on worker pool changes and guarding against multiple starts of the worker pool.
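To make the kind of locking being suggested concrete, here is an illustrative sketch. GuardedWorkerPool and its parameters are invented for this example and are not KafkaFlow types; the real fix would live inside KafkaFlow's own pool and consumer manager:

```csharp
// Illustrative only: one way to serialize pool transitions and guard against
// double starts, roughly matching the locking described above.
using System;
using System.Threading;
using System.Threading.Tasks;

class GuardedWorkerPool
{
    private readonly SemaphoreSlim _transition = new(1, 1);
    private bool _started;

    public async Task StartAsync(int assignedPartitionCount, Func<Task> startWorkers)
    {
        await _transition.WaitAsync();
        try
        {
            // Don't start when nothing is assigned, and never start twice.
            if (_started || assignedPartitionCount == 0)
            {
                return;
            }

            await startWorkers();
            _started = true;
        }
        finally
        {
            _transition.Release();
        }
    }

    public async Task StopAsync(Func<Task> stopWorkers)
    {
        await _transition.WaitAsync();
        try
        {
            if (!_started)
            {
                return;
            }

            await stopWorkers();
            _started = false;
        }
        finally
        {
            _transition.Release();
        }
    }
}
```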
golanbz added a commit to golanbz/kafkaflow that referenced this issue on Sep 2, 2024:
…world" scenarios (e.g., cooperative sticky). Fixes issue Farfetch#557 and Fixes issue Farfetch#456
* Enabled automatic committing with `confluent auto commit: true` instead of relying solely on manual commits, but only when the consumer strategy is cooperative sticky. (Refer to the open librdkafka issue at confluentinc/librdkafka#4059.)