This repository has been archived by the owner on Jun 17, 2024. It is now read-only.

Can't increase the in-flight messages count for persistent subscriptions #149

Open
chiller opened this issue Jul 30, 2019 · 1 comment

chiller commented Jul 30, 2019

I have a persistent subscription and a Scala consumer. No matter what settings I set on the subscription or in the client's application.conf, I can't get more than 10 in-flight messages.

Here is my code:

import java.util.concurrent.Executors
import _root_.akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import eventstore.ResolvedEvent
import eventstore.akka.PersistentSubscriptionActor.ManualAck
import eventstore.akka.{EventStoreExtension, LiveProcessingStarted, PersistentSubscriptionActor}
import eventstore.core.EventStream

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

trait Globals {
  implicit val system = ActorSystem()
  implicit val ec =  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(30))
}

object PersistentSubscriptionExample extends App with Globals{

  val actor = system.actorOf(Props[CountPersistentStream])
  val extension = EventStoreExtension(system)

  val sub = system.actorOf(
    PersistentSubscriptionActor.props(
      connection = extension.actor,
      client = actor,
      streamId = EventStream.Id("$ce-test"),
      groupName = "test1",
      credentials = None,
      settings = extension.settings,
      autoAck = false
    ))
}

class CountPersistentStream extends Actor with ActorLogging with Globals {
  context.setReceiveTimeout(1.second)

  def receive: Receive = {
    case e: ResolvedEvent =>
      val currentSender = sender()
      Future {
        log.info(s"${e.streamId.toString}")
        Thread.sleep(1000)
        currentSender ! ManualAck(e.linkEvent.data.eventId)
      }
    case LiveProcessingStarted => log.info("live processing started")
  }
}

Looking at VisualVM, I can see that only 10 threads do work at any given time. If I set autoAck = true, all 30 threads do work.
If I use the HTTP API instead (curl 'localhost:2113/subscriptions/%24ce-test/test1/15?embed=TryHarder' -H "Accept: application/vnd.eventstore.competingatom+json"), I do get 15 in-flight messages.
Am I missing some configuration?
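
To make the symptom concrete, here is a minimal stand-alone sketch that uses only the JDK, not the EventStore client. It assumes that something caps the number of unacknowledged messages at a fixed limit; a `Semaphore` plays that role, and releasing a permit stands in for `ManualAck`. The names `InFlightCapSimulation`, `maxConcurrentHandlers` and `inFlightLimit` are hypothetical and are not part of any library.

```scala
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object InFlightCapSimulation {
  // Hypothetical model: `inFlightLimit` stands in for whatever caps
  // unacknowledged messages; it is NOT how the client is implemented.
  def maxConcurrentHandlers(poolSize: Int, inFlightLimit: Int, messages: Int): Int = {
    val pool     = Executors.newFixedThreadPool(poolSize)
    val inFlight = new Semaphore(inFlightLimit)
    val running  = new AtomicInteger(0)
    val peak     = new AtomicInteger(0)
    try {
      (1 to messages).foreach { _ =>
        inFlight.acquire() // delivery blocks until an ack frees a slot
        pool.execute(() => {
          val now = running.incrementAndGet()
          peak.updateAndGet(p => math.max(p, now)) // record the highest concurrency seen
          try Thread.sleep(20) // simulated work
          finally {
            running.decrementAndGet()
            inFlight.release() // plays the role of the manual ack
          }
        })
      }
    } finally {
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
    peak.get()
  }
}
```

With `poolSize = 30` and `inFlightLimit = 10`, the returned peak never exceeds 10, however many threads the pool has. That matches what VisualVM shows for the manual-ack case.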

@chiller chiller changed the title Can't seem to be able to increase the in-flight messages for persistent subscriptions Can't increase the in-flight messages for persistent subscriptions Jul 31, 2019
@chiller chiller changed the title Can't increase the in-flight messages for persistent subscriptions Can't increase the in-flight messages count for persistent subscriptions Jul 31, 2019
@amondnet

@chiller If you have not changed the persistent subscription's default settings, Max Subscriber Count is 10.

https://eventstore.com/docs/http-api/competing-consumers/index.html#creating-a-persistent-subscription
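
If that setting does need changing, the subscription can be updated over the HTTP API. Below is a rough sketch using the JDK 11 `java.net.http` client. It rests on several assumptions taken from the linked documentation rather than from this thread: that a POST to `/subscriptions/{stream}/{group}` updates an existing subscription, that the JSON field is called `maxSubscriberCount`, and that the server accepts a body with only that field (it may expect the full set of settings, and will likely require credentials). `SubscriptionSettingsUpdate`, `settingsJson` and `buildRequest` are hypothetical names.

```scala
import java.net.{URI, URLEncoder}
import java.net.http.HttpRequest

object SubscriptionSettingsUpdate {
  // Assumed field name, per the competing-consumers HTTP API docs.
  def settingsJson(maxSubscriberCount: Int): String =
    s"""{"maxSubscriberCount": $maxSubscriberCount}"""

  // Builds (but does not send) the update request.
  // Assumption: POST updates an existing subscription; authentication is omitted here.
  def buildRequest(baseUrl: String, stream: String, group: String, maxSubscriberCount: Int): HttpRequest = {
    val encodedStream = URLEncoder.encode(stream, "UTF-8") // "$ce-test" becomes "%24ce-test"
    val encodedGroup  = URLEncoder.encode(group, "UTF-8")
    HttpRequest
      .newBuilder(URI.create(s"$baseUrl/subscriptions/$encodedStream/$encodedGroup"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(settingsJson(maxSubscriberCount)))
      .build()
  }
}
```

Sending it would look like `java.net.http.HttpClient.newHttpClient().send(request, java.net.http.HttpResponse.BodyHandlers.ofString())`, with `request` built as `SubscriptionSettingsUpdate.buildRequest("http://localhost:2113", "$ce-test", "test1", 30)`.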
