Close SystemConsumer properly #1674

Open
wants to merge 1 commit into
base: master

ryucc
Contributor

@ryucc ryucc commented Jun 22, 2023

Changes

KafkaCheckpointManager is a reused class. On reuse, the SystemConsumer can be left unclosed, causing a memory leak.

This is because SystemConsumer is started on every start(), but closed only when

  1. taskNamesToCheckpoints == null and stopConsumerAfterFirstRead == true, or
  2. stopConsumerAfterFirstRead == false

On the second run with stopConsumerAfterFirstRead == true, taskNamesToCheckpoints != null, so neither condition holds and SystemConsumer is never closed.
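The leak can be reproduced with a toy model of the lifecycle described above. This is only a sketch, not the Samza source: `FakeConsumer`, `CheckpointManagerModel`, and `runCycles` are hypothetical names, and placing condition 1 in the read path and condition 2 in `stop()` is an assumption made for illustration.

```scala
// Toy model of the described lifecycle; not the actual KafkaCheckpointManager.
object LeakDemo {
  // Hypothetical stand-in for a SystemConsumer that only counts calls.
  final class FakeConsumer {
    var starts = 0
    var stops = 0
    def start(): Unit = starts += 1
    def stop(): Unit = stops += 1
  }

  final class CheckpointManagerModel(consumer: FakeConsumer,
                                     stopConsumerAfterFirstRead: Boolean) {
    // null until the first read, then kept across start()/stop() cycles.
    private var taskNamesToCheckpoints: Map[String, String] = null

    // The consumer is started on every start().
    def start(): Unit = consumer.start()

    def readLastCheckpoint(task: String): Option[String] = {
      if (taskNamesToCheckpoints == null) {
        taskNamesToCheckpoints = Map(task -> "offset-0")
        // Condition 1: first read and stopConsumerAfterFirstRead == true.
        if (stopConsumerAfterFirstRead) consumer.stop()
      }
      taskNamesToCheckpoints.get(task)
    }

    // Condition 2: stopConsumerAfterFirstRead == false.
    def stop(): Unit =
      if (!stopConsumerAfterFirstRead) consumer.stop()
  }

  // Runs n start/read/stop cycles on one reused manager.
  def runCycles(n: Int): FakeConsumer = {
    val consumer = new FakeConsumer
    val manager = new CheckpointManagerModel(consumer, stopConsumerAfterFirstRead = true)
    for (_ <- 1 to n) {
      manager.start()
      manager.readLastCheckpoint("task-0")
      manager.stop()
    }
    consumer
  }

  def main(args: Array[String]): Unit = {
    val c = runCycles(3)
    println(s"starts=${c.starts}, stops=${c.stops}") // prints starts=3, stops=1
  }
}
```

In this model every cycle after the first starts the consumer without ever stopping it, which is the same shape as the start/stop imbalance in the logs.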

There are two possible fixes:

  1. Add a patch to close SystemConsumer properly
  2. Set taskNamesToCheckpoints back to null on stop()

We are choosing solution 1, since the stopConsumerAfterFirstRead option seems to expect only one read per task.
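One possible shape for solution 1, sketched on a toy model rather than the real class. The `consumerRunning` flag, `stopConsumer()` helper, `FakeConsumer`, and `PatchedManagerModel` are all hypothetical; the actual patch may look different.

```scala
// Sketch of "close SystemConsumer properly" on a toy model; not the actual patch.
object CloseFixDemo {
  // Hypothetical stand-in for a SystemConsumer that only counts calls.
  final class FakeConsumer {
    var starts = 0
    var stops = 0
    def start(): Unit = starts += 1
    def stop(): Unit = stops += 1
  }

  final class PatchedManagerModel(consumer: FakeConsumer,
                                  stopConsumerAfterFirstRead: Boolean) {
    private var taskNamesToCheckpoints: Map[String, String] = null
    // Hypothetical flag: true while the consumer is started and not yet stopped.
    private var consumerRunning = false

    def start(): Unit = {
      consumer.start()
      consumerRunning = true
    }

    // Stops the consumer at most once per start().
    private def stopConsumer(): Unit =
      if (consumerRunning) {
        consumer.stop()
        consumerRunning = false
      }

    def readLastCheckpoint(task: String): Option[String] = {
      if (taskNamesToCheckpoints == null) {
        taskNamesToCheckpoints = Map(task -> "offset-0")
        if (stopConsumerAfterFirstRead) stopConsumer()
      }
      taskNamesToCheckpoints.get(task)
    }

    // stop() always closes a consumer that is still running.
    def stop(): Unit = stopConsumer()
  }

  // Runs n start/read/stop cycles on one reused manager.
  def runCycles(n: Int): FakeConsumer = {
    val consumer = new FakeConsumer
    val manager = new PatchedManagerModel(consumer, stopConsumerAfterFirstRead = true)
    for (_ <- 1 to n) {
      manager.start()
      manager.readLastCheckpoint("task-0")
      manager.stop()
    }
    consumer
  }

  def main(args: Array[String]): Unit = {
    val c = runCycles(3)
    println(s"starts=${c.starts}, stops=${c.stops}") // prints starts=3, stops=3
  }
}
```

With the flag in place, every start is matched by exactly one stop regardless of whether the checkpoints were already cached.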

Issue

SAMZA-2785

Calling start() and stop() multiple times on the same KafkaCheckpointManager while stopConsumerAfterFirstRead == true leaves the SystemConsumer unclosed. The unclosed SystemConsumer can cause memory leaks in some implementations.

Evidence:

In production logs, SystemConsumer was started 1741 times, but only closed 14 times.

[katxxxx@xxxxxx xxxx]$ grep "Starting the checkpoint SystemConsumer from oldest offset" xxxxxx.log|wc -l
1741
[katxxxxx@xxxxxx xxxx]$ grep "Stopping system consumer" xxxxxxxxxx.log |wc -l
14 

We also have a heap dump showing KafkaSystemConsumer taking up 8 GB of memory.

@mynameborat
Contributor

mynameborat commented Jun 22, 2023

I don't think there is any problem with the existing code. For standby containers, the expected behavior is to keep the KafkaCheckpointManager started until the entire container is shut down, which would trigger stop.

For active containers, on the other hand, the checkpoint only needs to be read once, and hence the consumer is closed right after.

I'd trace it back to see whether there is any violation in how this flag is set or in the assumptions this flag is built on.

@ryucc
Contributor Author

ryucc commented Jun 22, 2023

> I don't think there is any problem with the existing code. For active containers, the expected behavior is to keep the KafkaCheckpointManager started until the entire container is shutdown which would trigger stop.

This PR doesn't change when KafkaCheckpointManager is shut down. It addresses the case where KafkaCheckpointManager is shut down but the SystemConsumer is left open.

@ryucc
Contributor Author

ryucc commented Jun 22, 2023

Somehow the KafkaCheckpointManager is started/stopped multiple times in standalone mode.

@mynameborat
Contributor

>> I don't think there is any problem with the existing code. For active containers, the expected behavior is to keep the KafkaCheckpointManager started until the entire container is shutdown which would trigger stop.

> This PR doesn't change when KafkaCheckpointManager is shutdown. The PR is trying to address that when KafkaCheckpointManager is shutdown, SystemConsumer is left open.

I meant SystemConsumer shutdown.

For standalone, the containers are recreated on every rebalance. It is possible that previous attempts to shut down the container failed and that the failure is being ignored in the application.

@ryucc
Contributor Author

ryucc commented Jun 22, 2023

Then in this case, new SystemConsumers shouldn't be created on KafkaCheckpointManager.start().

Looking at the logs another way, new SystemConsumers are started but never used to read checkpoints. They are also never closed, and we see them accumulating in the heap.

This opens up a third solution: prevent start() on consumers that are already started.

override def start(): Unit = {
  ...
  if (!systemConsumerStarted) {
    systemConsumer.start()
    systemConsumerStarted = true
  }
  ...
}
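To see how such a guard would interact with the close conditions from the PR description, here is a toy model. It is a sketch under assumptions, not the Samza code: `FakeConsumer`, `GuardedManagerModel`, and `runCycles` are hypothetical, and only `systemConsumerStarted` comes from the snippet above.

```scala
// Toy model of the start() guard; not the actual KafkaCheckpointManager.
object GuardDemo {
  // Hypothetical stand-in for a SystemConsumer that only counts calls.
  final class FakeConsumer {
    var starts = 0
    var stops = 0
    def start(): Unit = starts += 1
    def stop(): Unit = stops += 1
  }

  final class GuardedManagerModel(consumer: FakeConsumer,
                                  stopConsumerAfterFirstRead: Boolean) {
    private var taskNamesToCheckpoints: Map[String, String] = null
    private var systemConsumerStarted = false

    // Only the first start() reaches the consumer.
    def start(): Unit =
      if (!systemConsumerStarted) {
        consumer.start()
        systemConsumerStarted = true
      }

    def readLastCheckpoint(task: String): Option[String] = {
      if (taskNamesToCheckpoints == null) {
        taskNamesToCheckpoints = Map(task -> "offset-0")
        if (stopConsumerAfterFirstRead) consumer.stop()
      }
      taskNamesToCheckpoints.get(task)
    }

    def stop(): Unit =
      if (!stopConsumerAfterFirstRead) consumer.stop()
  }

  // Runs n start/read/stop cycles on one reused manager.
  def runCycles(n: Int): FakeConsumer = {
    val consumer = new FakeConsumer
    val manager = new GuardedManagerModel(consumer, stopConsumerAfterFirstRead = true)
    for (_ <- 1 to n) {
      manager.start()
      manager.readLastCheckpoint("task-0")
      manager.stop()
    }
    consumer
  }

  def main(args: Array[String]): Unit = {
    val c = runCycles(3)
    println(s"starts=${c.starts}, stops=${c.stops}") // prints starts=1, stops=1
  }
}
```

In this model the flag is never reset, so a manager with stopConsumerAfterFirstRead == false would not restart its consumer after stop(). Whether that matters depends on how the real class is reused.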

@ryucc
Contributor Author

ryucc commented Jun 22, 2023

The idempotency guard in KafkaSystemConsumer is never triggered in my logs, while there are 168 successful starts and only 3 stops.

$grep "Attempting to start the consumer for the second" xxxxx.log.2023-06-21 |wc -l
       0
$grep ": Consumer started" seas-cloud-consumer.log.2023-06-21|wc -l
     168
$grep "Stopping Samza kafkaConsumer" seas-cloud-consumer.log.2023-06-21|wc -l
       3
