Skip to content

Commit

Permalink
Add overridden duration timeout to StreamTestKit
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Sep 9, 2024
1 parent 5f2d690 commit 0624688
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import pekko.stream.{ Materializer, SystemMaterializer }
import pekko.stream.impl.PhasedFusingActorMaterializer
import pekko.stream.testkit.scaladsl

import java.time.Duration
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

object StreamTestKit {

/**
Expand All @@ -29,7 +33,21 @@ object StreamTestKit {
def assertAllStagesStopped(mat: Materializer): Unit =
mat match {
case impl: PhasedFusingActorMaterializer =>
scaladsl.StreamTestKit.assertNoChildren(impl.system, impl.supervisor)
scaladsl.StreamTestKit.assertNoChildren(impl.system, impl.supervisor, None)
case _ =>
}

/**
* Assert that there are no stages running under a given materializer.
* Usually this assertion is run after a test-case to check that all of the
* stages have terminated successfully with an overridden duration that ignores
* `stream.testkit.all-stages-stopped-timeout`.
*/
def assertAllStagesStopped(mat: Materializer, overrideTimeout: Duration): Unit =
mat match {
case impl: PhasedFusingActorMaterializer =>
scaladsl.StreamTestKit.assertNoChildren(impl.system, impl.supervisor,
Some(FiniteDuration(overrideTimeout.toMillis, TimeUnit.MILLISECONDS)))
case _ =>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,30 @@ object StreamTestKit {
* This assertion is useful to check that all of the stages have
* terminated successfully.
*/
def assertAllStagesStopped[T](block: => T, overrideTimeout: FiniteDuration)(implicit materializer: Materializer): T =
materializer match {
case impl: PhasedFusingActorMaterializer =>
stopAllChildren(impl.system, impl.supervisor)
val result = block
assertNoChildren(impl.system, impl.supervisor, Some(overrideTimeout))
result
case _ => block
}

/**
* Asserts that after the given code block is ran, no stages are left over
* that were created by the given materializer with an overridden duration
* that ignores `stream.testkit.all-stages-stopped-timeout`.
*
* This assertion is useful to check that all of the stages have
* terminated successfully.
*/
def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T =
materializer match {
case impl: PhasedFusingActorMaterializer =>
stopAllChildren(impl.system, impl.supervisor)
val result = block
assertNoChildren(impl.system, impl.supervisor)
assertNoChildren(impl.system, impl.supervisor, None)
result
case _ => block
}
Expand All @@ -53,10 +71,15 @@ object StreamTestKit {
}

/** INTERNAL API */
@InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit = {
@InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef,
overrideTimeout: Option[FiniteDuration]): Unit = {
val probe = TestProbe()(sys)
val c = sys.settings.config.getConfig("pekko.stream.testkit")
val timeout = c.getDuration("all-stages-stopped-timeout", MILLISECONDS).millis

val timeout = overrideTimeout.getOrElse {
val c = sys.settings.config.getConfig("pekko.stream.testkit")
c.getDuration("all-stages-stopped-timeout", MILLISECONDS).millis
}

probe.within(timeout) {
try probe.awaitAssert {
supervisor.tell(StreamSupervisor.GetChildren, probe.ref)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.testkit

import org.apache.pekko.testkit.TestKitBase
import org.scalatest.time.{ Millis, Span }

import java.util.concurrent.TimeUnit

trait StreamConfiguration extends TestKitBase {
case class StreamConfig(allStagesStoppedTimeout: Span = Span({
val c = system.settings.config.getConfig("pekko.stream.testkit")
c.getDuration("all-stages-stopped-timeout", TimeUnit.MILLISECONDS)
}, Millis))

private val defaultStreamConfig = StreamConfig()

/**
* The default `StreamConfig` which is derived from the Actor System's `pekko.stream.testkit.all-stages-stopped-timeout`
* configuration value. If you want to provide a different StreamConfig for specific tests without having to re-specify
* `pekko.stream.testkit.all-stages-stopped-timeout` then you can override this value.
*/
implicit def streamConfig: StreamConfig = defaultStreamConfig

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import org.scalatest.Failed

import com.typesafe.config.{ Config, ConfigFactory }

abstract class StreamSpec(_system: ActorSystem) extends PekkoSpec(_system) {
import java.util.concurrent.TimeUnit

abstract class StreamSpec(_system: ActorSystem) extends PekkoSpec(_system) with StreamConfiguration {

def this(config: Config) =
this(
ActorSystem(
Expand Down Expand Up @@ -73,7 +76,8 @@ abstract class StreamSpec(_system: ActorSystem) extends PekkoSpec(_system) {
case impl: PhasedFusingActorMaterializer =>
stopAllChildren(impl.system, impl.supervisor)
val result = test.apply()
assertNoChildren(impl.system, impl.supervisor)
assertNoChildren(impl.system, impl.supervisor,
Some(FiniteDuration(streamConfig.allStagesStoppedTimeout.millisPart, TimeUnit.MILLISECONDS)))
result
case _ => other
}
Expand Down

0 comments on commit 0624688

Please sign in to comment.