
CommittingProducerSinkStage never completes #1783

Open
ilinandrii opened this issue Oct 3, 2024 · 2 comments
Comments

ilinandrii commented Oct 3, 2024

Akka version:
2.5.32
Alpakka version:
2.0.5
Scala version:
2.12.17
Testcontainers Kafka
1.19.0

Expected Behavior

Future[Done] materialized from Producer.committableSink is completed when CommittingProducerSinkStage finishes because of a producer exception.

Actual Behavior

Future[Done] materialized from Producer.committableSink never completes when CommittingProducerSinkStage finishes because of a producer exception.

Additional information

In my particular case, the producer used within CommittingProducerSinkStage failed to register a schema. This caused the sink and consumer to complete; however, the isShutdown future on the stream's DrainingControl (built from the sink's Future[Done] and the Consumer.Control) never completes, so the shutdown hook I set up based on isShutdown never fired.

Reproducible Test Case

Here's a test. I emulate a producer serialization error with a mock serializer.
control.isShutdown.futureValue completes successfully.
completion.futureValue blocks forever, even though the stream is dead.
Hence, if I build a DrainingControl from control and completion, its isShutdown will also never complete.

Looking into the implementation, I suspect the promise is never completed when producer.send fails.
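To illustrate the suspected failure mode, here is a minimal, self-contained Scala sketch. It is not Alpakka code: `Stage`, `failStageBuggy`, and `failStageFixed` are hypothetical names standing in for the stage's teardown paths, and `Done` is a local stand-in for akka.Done. It shows how a materialized completion promise hangs forever if the failure path forgets to complete it, and how `tryFailure` on that path would surface the error to the materialized Future.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical reduction of the suspected bug: a stage materializes a
// completion promise, but a synchronous producer.send failure tears the
// stage down without ever touching that promise.
object CompletionPromiseDemo {
  final case class Done()

  class Stage {
    private val completion = Promise[Done]()
    def completionFuture: Future[Done] = completion.future

    // Suspected behaviour: the stage stops and the producer closes,
    // but `completion` is left pending, so the materialized Future hangs.
    def failStageBuggy(cause: Throwable): Unit = ()

    // Expected behaviour: fail the materialized promise as part of teardown.
    def failStageFixed(cause: Throwable): Unit =
      completion.tryFailure(cause)
  }

  def main(args: Array[String]): Unit = {
    val fixed = new Stage
    fixed.failStageFixed(new RuntimeException("failure in serializer"))
    // The materialized future now completes (with the failure) instead of
    // hanging, so a DrainingControl built on it could shut down.
    assert(Try(Await.result(fixed.completionFuture, 1.second)).isFailure)
  }
}
```

Under this reading, completing (or failing) the promise in every stage-teardown path is what would let DrainingControl.isShutdown fire.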

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{
  CommitterSettings,
  ConsumerSettings,
  ProducerMessage,
  ProducerSettings,
  Subscriptions
}
import akka.stream.scaladsl.Keep
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.mockito.scalatest.IdiomaticMockito
import org.scalatest.concurrent.{Eventually, Futures, JavaFutures, ScalaFutures}
import org.scalatest.time.SpanSugar
import org.scalatest.{Matchers, WordSpec}
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

class ProducerCommitterSinkTest extends WordSpec
    with Futures
    with JavaFutures
    with ScalaFutures
    with Matchers
    with IdiomaticMockito
    with Eventually
    with SpanSugar {

  val imageName = "confluentinc/cp-kafka"
  val imageTag  = "7.1.4"
  val kafkaTestContainer =
    new KafkaContainer(DockerImageName.parse(s"$imageName:$imageTag"))

  kafkaTestContainer.withKraft().start()

  val config        = ConfigFactory.load()
  val consumerTopic = "consumer-test-topic"
  val producerTopic = "producer-test-topic"
  val serializer    = mock[StringSerializer]

  implicit val system: ActorSystem = ActorSystem("ProducerCommitterSinkTest", config)

  override implicit def patienceConfig: PatienceConfig =
    PatienceConfig(10.seconds, 1.second)

  "ProducerCommitterSink" should {
    "finish stream" when {
      "produce fails" in {

        val testProducer =
          ProducerSettings(system, new StringSerializer, new StringSerializer)
            .withBootstrapServers(kafkaTestContainer.getBootstrapServers)
            .withCloseTimeout(1.second)
            .createKafkaProducer()

        val producerSettings =
          ProducerSettings(system, new StringSerializer, serializer) // mock serializer
            .withBootstrapServers(kafkaTestContainer.getBootstrapServers)
            .withCloseTimeout(1.second)

        val id = "producer-committer-sink-test"
        val consumerSettings = ConsumerSettings(
          system,
          new StringDeserializer,
          new StringDeserializer
        )
          .withBootstrapServers(kafkaTestContainer.getBootstrapServers)
          .withGroupId(id)
          .withClientId(id)
          .withCloseTimeout(1.second)
          .withStopTimeout(0.seconds)
          .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

        val committerSettings = CommitterSettings(system)

        val (control, completion) =
          Consumer.committableSource(
            consumerSettings,
            Subscriptions.topics(consumerTopic)
          )
            .map { msg =>
              val record = new ProducerRecord[String, String](
                producerTopic,
                s"key: ${msg.record.key()}",
                s"value: ${msg.record.value}"
              )
              ProducerMessage.multi(List(record), msg.committableOffset)
            }
            .toMat(Producer.committableSink(producerSettings, committerSettings))(
              Keep.both
            )
            .run()

        serializer.serialize(*, *, *) throws new RuntimeException("failure in serializer")

        testProducer.send(new ProducerRecord(consumerTopic, "test", "test")).futureValue

        control.isShutdown.futureValue
        completion.futureValue
      }
    }
  }
}
@ilinandrii ilinandrii changed the title ProducerCommitterSink never completes CommittingProducerSinkStage never completes Oct 3, 2024
ilinandrii (Author) commented:

Here's the test log showing that the consumer and sink actually stopped.

9926 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] ERROR akka.actor.RepointableActorRef Error in stage [akka.kafka.internal.CommittingProducerSinkStage@48ae68c8]: failure in serializer 
java.lang.RuntimeException: failure in serializer
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:903)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
	at akka.kafka.internal.CommittingProducerSinkStageLogic.$anonfun$produce$1(CommittingProducerSinkStage.scala:110)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at akka.kafka.internal.CommittingProducerSinkStageLogic.akka$kafka$internal$CommittingProducerSinkStageLogic$$produce(CommittingProducerSinkStage.scala:109)
	at akka.kafka.internal.CommittingProducerSinkStageLogic$$anon$1.onPush(CommittingProducerSinkStage.scala:223)
	at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:409)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:485)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:581)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:749)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:764)
	at akka.actor.Actor.aroundReceive(Actor.scala:539)
	at akka.actor.Actor.aroundReceive$(Actor.scala:537)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:671)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)
	at akka.actor.ActorCell.invoke(ActorCell.scala:583)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
	at akka.dispatch.Mailbox.run(Mailbox.scala:229)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
9926 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-17] DEBUG o.a.k.c.consumer.KafkaConsumer [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Pausing partitions [consumer-test-topic-0] 
9926 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.k.i.CommittingProducerSinkStage [cb534] CommittingProducerSink stopped 
9926 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.kafka.internal.KafkaConsumerActor received handled message Poll(akka.kafka.internal.KafkaConsumerActor@132090ca,true) from Actor[akka://ProducerCommitterSinkTest/deadLetters] 
9938 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.k.i.CommittingProducerSinkStage [cb534] Producer closed 
9939 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] INFO  a.kafka.internal.SingleSourceLogic [1aae0] Completing 
9940 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.kafka.internal.KafkaConsumerActor received handled message StopFromStage(1aae0) from Actor[akka://ProducerCommitterSinkTest/system/StreamSupervisor-0/$$a#-2087746625] 
9940 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.kafka.internal.KafkaConsumerActor [5db21] Received Stop from StageId [1aae0], stopping 
9947 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-2] DEBUG a.kafka.internal.KafkaConsumerActor stopping 
9949 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-2] DEBUG a.kafka.internal.ConnectionChecker stopped 
9950 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] DEBUG o.a.k.c.consumer.KafkaConsumer [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Pausing partitions [consumer-test-topic-0] 
9950 [kafka-coordinator-heartbeat-thread | producer-committer-sink-test] DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Heartbeat thread has closed 
9950 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] DEBUG o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Executing onLeavePrepare with generation Generation{generationId=1, memberId='producer-committer-sink-test-5ee99cef-34f4-4fb2-b2d3-3909007cdb94', protocol='range'} and memberId producer-committer-sink-test-5ee99cef-34f4-4fb2-b2d3-3909007cdb94 
9950 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] INFO  o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Revoke previously assigned partitions consumer-test-topic-0 
9951 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] INFO  o.a.k.c.c.i.AbstractCoordinator [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Member producer-committer-sink-test-5ee99cef-34f4-4fb2-b2d3-3909007cdb94 sending LeaveGroup request to coordinator localhost:50240 (id: 2147483646 rack: null) due to the consumer is being closed 
9951 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.kafka.internal.SingleSourceLogic [1aae0] Revoked partitions: Set(consumer-test-topic-0). All partitions: Set() 
9952 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Resetting generation due to consumer pro-actively leaving the group 
9958 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] LeaveGroup request returned successfully 
9959 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] DEBUG o.a.k.c.consumer.KafkaConsumer [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Kafka consumer has been closed 
9961 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.kafka.internal.KafkaConsumerActor stopped 
9962 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-4] INFO  akka.actor.RepointableActorRef Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://ProducerCommitterSinkTest/system/StreamSupervisor-0/$$a#-2087746625] to Actor[akka://ProducerCommitterSinkTest/system/kafka-consumer-1#860674792] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ProducerCommitterSinkTest/system/kafka-consumer-1#860674792]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 

ennru (Member) commented Oct 10, 2024

The CommittingProducerSinkStage has seen a large number of improvements since version 2.0.5. Please update to a recent version.
