Source per partition does not support backoff "restart" #1386

Open
fguerout opened this issue Jun 25, 2021 · 6 comments

Comments

@fguerout

Versions used

Akka version: "2.6.15"
Akka Kafka version: "2.1.0"

Expected Behavior

When a source per partition (Consumer.committablePartitionedSource - https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition) is combined with a restart source (RestartSource.onFailuresWithBackoff - https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#restarting-the-stream-with-a-backoff-stage), the per-partition source should be restarted when the stream fails.

Actual Behavior

With the same combination of a source per partition (Consumer.committablePartitionedSource - https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition) and a restart source (RestartSource.onFailuresWithBackoff - https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#restarting-the-stream-with-a-backoff-stage), when the stream fails the per-partition source is restarted but then completes immediately, and no more data is consumed from that partition.

Relevant logs

akka.stream.scaladsl.RestartWithBackoffSource [RestartWithBackoffSource(akka://processor)]                                                                          
        Restarting graph due to failure. stack_trace: 
[...]
        Caused by: java.util.concurrent.TimeoutException: Ask timed out on
[...]
akka.kafka.internal.CommittableSubSourceStageLogic [CommittableSubSourceStageLogic(akka://processor)] 
        [c1832#1] Starting. Partition test-15
akka.kafka.internal.KafkaConsumerActor [akka://[email protected]:2551/system/kafka-consumer-1] 
        [5881c] RequestMessages from topic/partition Set(test-15) already requested by other stage Set(test-15)
akka.kafka.internal.CommittableSubSourceStageLogic [CommittableSubSourceStageLogic(akka://processor)] 
        [c1832#1] Completing. Partition test-15

Reproducible Test Case

Consumer.committablePartitionedSource(
                consumerSettings,
                Subscriptions.topics("test"))
                .mapAsyncUnordered(10, topicToSource ->
                        RestartSource.onFailuresWithBackoff(
                                RestartSettings.create(
                                        Duration.ofSeconds(3),
                                        Duration.ofSeconds(30),
                                        0.2),
                                () -> topicToSource.second()
                                        .mapAsync(1, message ->
                                                askWithStatus(vehicleActor,
                                                        (ActorRef<StatusReply<Done>> replyTo) -> new ProcessMessage("", replyTo),
                                                        Duration.ofSeconds(10),
                                                        context.getSystem().scheduler())
                                                        .thenApply(done -> message.committableOffset())))
                                .runWith(Committer.sink(CommitterSettings.create(context.getSystem().classicSystem())), context.getSystem()))
                .toMat(Sink.ignore(), Consumer::createDrainingControl)
                .run(context.getSystem());
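
For readers unfamiliar with the three RestartSettings arguments used above (3 seconds, 30 seconds, 0.2), here is a minimal plain-Java sketch of how such a backoff schedule is commonly computed. It assumes the behaviour described in the Akka documentation: the delay starts at minBackoff, grows exponentially, is capped at maxBackoff, and then has up to randomFactor (here 20%) of random extra delay added. The class BackoffSketch and its methods are hypothetical names for illustration only and are not part of Akka or Alpakka Kafka.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration class; not part of Akka or Alpakka Kafka.
final class BackoffSketch {

    // Delay before jitter: minBackoff * 2^restartCount, capped at maxBackoff.
    static Duration baseDelay(Duration minBackoff, Duration maxBackoff, int restartCount) {
        // Clamp the exponent so the intermediate value stays well within range.
        int exponent = Math.min(Math.max(restartCount, 0), 30);
        double millis = minBackoff.toMillis() * Math.pow(2, exponent);
        return Duration.ofMillis((long) Math.min(millis, (double) maxBackoff.toMillis()));
    }

    // Base delay scaled by a random factor in [1.0, 1.0 + randomFactor).
    static Duration jitteredDelay(Duration minBackoff, Duration maxBackoff,
                                  double randomFactor, int restartCount) {
        double jitter = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor;
        long base = baseDelay(minBackoff, maxBackoff, restartCount).toMillis();
        return Duration.ofMillis((long) (base * jitter));
    }

    public static void main(String[] args) {
        // Same values as the RestartSettings in the test case above.
        Duration min = Duration.ofSeconds(3);
        Duration max = Duration.ofSeconds(30);
        for (int restart = 0; restart < 6; restart++) {
            System.out.println("restart " + restart
                    + ": base " + baseDelay(min, max, restart).getSeconds() + "s"
                    + ", with jitter " + jitteredDelay(min, max, 0.2, restart).toMillis() + "ms");
        }
    }
}
```

Under these assumptions the base delays for successive restarts are 3, 6, 12, 24, 30 and 30 seconds, so the "Restarting graph due to failure" log line would be expected a few seconds before each new "Starting. Partition ..." line.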

@mwkohout

Is there a workaround for this on the current release (2.1.1)?

@fguerout
Author

fguerout commented Sep 23, 2021

Hi @mwkohout, you could rely on the "retry pattern" (https://doc.akka.io/docs/akka/current/futures.html#retry) in the source processing logic.

@mwkohout

mwkohout commented Sep 23, 2021

Thanks for the idea, @fguerout, but either my test case is bad or something prevents it from rereading a message.

Here's a test case that's executed as part of a JUnit 5 TestcontainersKafkaTest subclass:


 ActorSystem testSystem = ActorSystem.apply();
    setUpAdminClient();
    String topicName = createTopic();
    String groupId = createGroupId();

    AtomicBoolean fail = new AtomicBoolean(true);

    Done published = Source.from(
        List.of(
            new ProducerRecord<String,String>(topicName,"someKey","someValue")
            //        new ProducerRecord<String,String>(topicName,"someKey2","someValue2")
        )).runWith(Producer.plainSink(producerDefaults()),Materializer.apply(system)).toCompletableFuture().get();

    Source<Pair<TopicPartition, Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed>>, Consumer.Control> source = Consumer.committablePartitionedSource(
        consumerDefaults().withGroupId(groupId).withProperty("auto.offset.reset", "earliest"),
        Subscriptions.topics(topicName));

    TestKit tk = new TestKit(testSystem);
    CompletionStage<Done> processedAndCommittedSource = source.mapAsyncUnordered(3, pair -> {
      Callable<CompletionStage<Done>> running = () -> pair.second().map(m -> {
            if (fail.get()) {
              fail.set(false);
              throw new Exception("fail");
            }
            tk.getTestActor().tell(m.record().value(), ActorRef.noSender());
            return m.committableOffset();
          }
      ).runWith(Sink.ignore(), system);

      return Patterns.retry(running, 3, Duration.ofSeconds(10), Duration.ofSeconds(40), 0.1, system);
    }).runWith(Sink.ignore(), system);

    var processedValue = tk.receiveOne(Duration.ofSeconds(120));

    assertEquals("someValue", processedValue);

Is there an issue with the way I've written my test case?

@mmatloka
Contributor

Hey, we recently tested a few variants of using RestartSource after or inside the committablePartitionedSource in our project. None of the variants worked fully correctly (e.g. when processing of one item failed, the following stream elements were still processed). The only way to make it work was to wrap the whole Consumer.committablePartitionedSource in a RestartSource.

@mwkohout

Thank you @mmatloka, that did the trick. I pulled the whole flow, with the per-partition logic set up inside my mapAsyncUnordered, into the restartable source callable. It restarted and reprocessed the messages as I expected.

One question: how are you accessing the control for the flow? Are you using a kill switch instead?
Before (without the restartable source) I was using Consumer.Control to start and stop the system in a controlled way, and Consumer.Control.isShutdown() as part of the health check for the system (setting an AtomicBoolean to true when the system was shut down, via CompletionStage.thenRun(Runnable)).

@mmatloka
Contributor
