We currently recreate the processing strategy every time we assign/revoke a set of partitions. To my knowledge there are two reasons for that:
We want to flush out old messages to preserve ordering guarantees (messages on the same partition should be submitted in-order, and strategy should commit in-order)
StrategyFactory.create_with_partitions is passed the currently assigned partitions
However, at the same time:
We only provide ordering guarantees per partition. If a new partition is assigned, we can reuse the processing strategy. If a partition is revoked, we can also reuse the processing strategy. The only case where we need to join() is when the same partition is revoked and then assigned again to the same consumer.
Almost no factory we write actually cares about the assigned partitions.
Additionally, in production we observe rebalances where partitions get shuffled around between consumers in the following pattern:
1. new partition n assigned
2. partition n revoked
3. new partition n+1 assigned
4. partition n+1 revoked
...
(see attachment out.tsv for full logs pertaining to INC-402)
The logs show that we are closing the strategy on every partition revocation, i.e. on steps 2 and 4. In theory it can also happen on partition assignment. But in the pattern above, no partition ever returns to the same consumer, so recreating the strategy is not needed at all to preserve per-partition ordering of messages.
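To make the cost concrete, here is a small simulation of the pattern. It is only a sketch, not Arroyo code: the `(kind, partition)` event shape and both counting functions are hypothetical names made up for illustration, and it assumes today's behavior is "close on every revoke" as the logs suggest.

```python
from typing import Iterable, Literal

# Hypothetical event shape for this sketch: (kind, partition index).
Event = tuple[Literal["assign", "revoke"], int]


def closes_today(events: Iterable[Event]) -> int:
    """Count strategy closes if every revocation closes the strategy."""
    return sum(1 for kind, _ in events if kind == "revoke")


def joins_needed_for_ordering(events: Iterable[Event]) -> int:
    """Count flushes needed if we only flush when a revoked partition comes back."""
    revoked_earlier: set[int] = set()
    joins = 0
    for kind, partition in events:
        if kind == "revoke":
            revoked_earlier.add(partition)
        elif partition in revoked_earlier:
            # Same partition returns to this consumer: flush before continuing.
            joins += 1
            revoked_earlier.clear()
    return joins


# The pattern from the logs: assign n, revoke n, assign n+1, revoke n+1, ...
pattern: list[Event] = []
for n in range(4):
    pattern += [("assign", n), ("revoke", n)]

print(closes_today(pattern), joins_needed_for_ordering(pattern))  # prints "4 0"
```

Under these assumptions, four rounds of the pattern cost four closes today, while none of them is required for per-partition ordering.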
Proposal:
Deprecate create_with_partitions and replace it with a simpler create interface that does not take partition count.
There is one strategy here that uses partition count to determine query concurrency. But could it possibly observe partitions as messages come in, and adjust query concurrency based on that?
Alternatively, there can be a mechanism where strategy factories that do define create_with_partitions keep today's behavior on rebalancing, while most other factories, which don't need partition information, can use create and will receive a nice speed boost during rebalancing.
In on_revoke, do not close the strategy. Instead, add the revoked partitions to a set of "unflushed partitions".
In on_assign, also do not close the strategy, but only create it if it is None. If there is overlap between the "unflushed partitions" and the newly assigned partitions, call join() on the strategy (but do not recreate it) and clear the set "unflushed partitions" (only if join() happened)
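The on_revoke/on_assign part of the proposal could look roughly like the sketch below. `LazyStrategyHolder`, the `Strategy` protocol, and `RecordingStrategy` are hypothetical names for illustration, not existing Arroyo classes. Partitions are plain integers and `join()` takes no arguments to keep the example short. The sketch also assumes that a strategy remains usable after `join()`, which is exactly the open question raised under "Unclear".

```python
from typing import Callable, Optional, Protocol


class Strategy(Protocol):
    """Simplified stand-in for a processing strategy (assumption)."""

    def join(self) -> None: ...


class LazyStrategyHolder:
    """Hypothetical helper that keeps one strategy alive across rebalances."""

    def __init__(self, create: Callable[[], Strategy]) -> None:
        self._create = create
        self._strategy: Optional[Strategy] = None
        self._unflushed: set[int] = set()

    def on_revoke(self, partitions: set[int]) -> None:
        # Do not close the strategy; only remember what may still be in flight.
        self._unflushed |= partitions

    def on_assign(self, partitions: set[int]) -> None:
        # Create the strategy only if there is none yet.
        if self._strategy is None:
            self._strategy = self._create()
        # Flush only when a previously revoked partition comes back.
        if self._unflushed & partitions:
            self._strategy.join()
            self._unflushed.clear()


class RecordingStrategy:
    """Test double that counts how often join() is called."""

    def __init__(self) -> None:
        self.joins = 0

    def join(self) -> None:
        self.joins += 1


created: list[RecordingStrategy] = []


def create() -> RecordingStrategy:
    strategy = RecordingStrategy()
    created.append(strategy)
    return strategy


holder = LazyStrategyHolder(create)
holder.on_assign({1})
holder.on_revoke({1})
holder.on_assign({2})
holder.on_revoke({2})
joins_after_shuffle = created[0].joins  # still 0: no partition came back
holder.on_assign({1})  # partition 1 returns, so the strategy is flushed once
print(len(created), joins_after_shuffle, created[0].joins)  # prints "1 0 1"
```

In this sketch the shuffle pattern from the logs never triggers a flush, and the strategy is created exactly once. A flush happens only when partition 1 is reassigned to the same consumer.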
Unclear:
Do we need to actually recreate the strategy, or can we just call join() and continue using it?