diff --git a/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala b/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala index 284b45b403..d3a0150c19 100644 --- a/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala +++ b/std/jvm/src/main/scala/cats/effect/std/DispatcherPlatform.scala @@ -44,22 +44,23 @@ private[std] trait DispatcherPlatform[F[_]] { this: Dispatcher[F] => } /** - * Submits an effect to be executed and indefinitely blocks until a result is produced. This - * function will throw an exception if the submitted effect terminates with an error. + * Submits an effect to be executed and indefinitely blocks until a result is produced. + * Cancels the effect in case of Java thread interruption. This function will throw an + * exception if the submitted effect terminates with an error. */ def unsafeRunSync[A](fa: F[A]): A = unsafeRunTimed(fa, Duration.Inf) /** * Submits an effect to be executed and blocks for at most the specified timeout for a result - * to be produced. This function will throw an exception if the submitted effect terminates - * with an error. + * to be produced. Cancels the effect both in case of timeout or Java thread interruption. + * This function will throw an exception if the submitted effect terminates with an error. */ def unsafeRunTimed[A](fa: F[A], timeout: Duration): A = { val (fut, cancel) = unsafeToFutureCancelable(fa) try Await.result(fut, timeout) catch { - case t: TimeoutException => + case t @ (_: TimeoutException | _: InterruptedException) => cancel() throw t } diff --git a/tests/jvm/src/test/scala/cats/effect/std/DispatcherJVMSpec.scala b/tests/jvm/src/test/scala/cats/effect/std/DispatcherJVMSpec.scala index f0df165da8..0b1156bcd4 100644 --- a/tests/jvm/src/test/scala/cats/effect/std/DispatcherJVMSpec.scala +++ b/tests/jvm/src/test/scala/cats/effect/std/DispatcherJVMSpec.scala @@ -20,6 +20,8 @@ package std import cats.effect.kernel.Deferred import cats.syntax.all._ +import scala.concurrent.duration.DurationInt + class DispatcherJVMSpec extends BaseSpec { "async dispatcher" should { @@ -42,5 +44,23 @@ class DispatcherJVMSpec extends BaseSpec { } } yield ok } + + "Propagate Java thread interruption in unsafeRunSync" in real { + Dispatcher.parallel[IO](await = true).use { dispatcher => + for { + canceled <- Deferred[IO, Unit] + io = IO.sleep(1.second).onCancel(canceled.complete(()).void) + f <- IO.interruptible { + try dispatcher.unsafeRunSync(io) + catch { case _: InterruptedException => } + }.start + _ <- IO.sleep(100.millis) + _ <- f.cancel + _ <- canceled + .get + .timeoutTo(300.millis, IO.raiseError(new Exception("io was not canceled"))) + } yield ok + } + } } }