Skip to content

Commit

Permalink
Merge pull request #3460 from djspiewak/feature/cancelable
Browse files Browse the repository at this point in the history
Added `cancelable`
  • Loading branch information
djspiewak authored Feb 27, 2023
2 parents 023bffd + 7a5311b commit 73a379d
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 0 deletions.
45 changes: 45 additions & 0 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,51 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
def backgroundOn(ec: ExecutionContext): ResourceIO[IO[OutcomeIO[A @uncheckedVariance]]] =
Resource.make(startOn(ec))(_.cancel).map(_.join)

/**
* Given an effect which might be [[uncancelable]] and a finalizer, produce an effect which
* can be canceled by running the finalizer. This combinator is useful for handling scenarios
* in which an effect is inherently uncancelable but may be canceled through setting some
* external state. A trivial example of this might be the following:
*
* {{{
* val flag = new AtomicBoolean(false)
* val ioa = IO blocking {
* while (!flag.get()) {
* Thread.sleep(10)
* }
* }
*
* ioa.cancelable(IO.delay(flag.set(true)))
* }}}
*
* Without `cancelable`, effects constructed by `blocking`, `delay`, and similar are
* inherently uncancelable. Simply adding an `onCancel` to such effects is insufficient to
* resolve this, despite the fact that under *some* circumstances (such as the above), it is
* possible to enrich an otherwise-uncancelable effect with early termination. `cancelable`
* addresses this use-case.
*
* Note that there is no free lunch here. If an effect truly cannot be prematurely terminated,
* `cancelable` will not allow for cancelation. As an example, if you attempt to cancel
* `uncancelable(_ => never)`, the cancelation will hang forever (in other words, it will be
* itself equivalent to `never`). Applying `cancelable` will not change this in any way. Thus,
* attempting to cancel `cancelable(uncancelable(_ => never), unit)` will ''also'' hang
* forever. As in all cases, cancelation will only return when all finalizers have run and the
* fiber has fully terminated.
*
* If the `IO` self-cancels and the `cancelable` itself is uncancelable, the resulting fiber
* will be equal to `never` (similar to [[race]]). Under normal circumstances, if `IO`
* self-cancels, that cancelation will be propagated to the calling context.
*
* @param fin
* an effect which orchestrates some external state which terminates the `IO`
* @see
* [[uncancelable]]
* @see
* [[onCancel]]
*/
def cancelable(fin: IO[Unit]): IO[A] =
Spawn[IO].cancelable(this, fin)

def forceR[B](that: IO[B]): IO[B] =
// cast is needed here to trick the compiler into avoiding the IO[Any]
asInstanceOf[IO[Unit]].handleError(_ => ()).productR(that)
Expand Down
80 changes: 80 additions & 0 deletions docs/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,83 @@ If you're seeing this warning and have not changed any of the default configurat
- If you would like to entirely disable this check, you can do so by overriding the `cpuStarvationCheckInitialDelay` value within `IORuntimeConfig` to `Duration.Inf`

Please understand that this warning is essentially never a false negative. Outside of some extremely rare circumstances, it accurately measures the responsiveness of your application runtime. However, as a corollary of this accuracy, it is measuring not only your application runtime, but also the system (and other adjacent processes on that system) on which your application is running. Thus, if you see this warning, it essentially always means that either the checker's definition of "responsiveness" (100 millisecond SLA) is different from what you expected, or it means that there is some legitimate problem *somewhere* in your application, your JVM, your kernel, your host environment, or all of the above.

## How do I cancel or timeout `delay` or `blocking`?

Under normal circumstances, effects constructed with `delay` or `blocking` are *uncancelable*, meaning that they will suppress any attempt to `cancel` their fiber until the effect completes. A classic example:

```scala
IO.blocking(Thread.sleep(1000))
```

The above will block a thread for 1 second. This is done in a *relatively* safe fashion since the `blocking` constructor will ensure that the compute pool is not starved for threads, but the behavior can still be unintuitive. For example:

```scala
IO.blocking(Thread.sleep(1000)).timeout(100.millis)
```

This effect will take the full 1 second to complete! This is because the timeout fires at the 100 millisecond mark, but since `blocking` is uncancelable, `IO` will wait for it to complete. This is very much by design since it avoids situations in which resources might leak or be left in an invalid state due to unsupported cancelation, but it can be confusing in scenarios like this.

There are two possible ways to address this situation, and the correct one to use depends on a number of different factors. In this particular scenario, `Thread.sleep` *happens* to correctly respect Java `Thread` interruption, and so we can fix this by swapping `blocking` for `interruptible`:

```scala
IO.interuptible(Thread.sleep(1000)).timeout(100.millis)
```

The above will return in 100 milliseconds, raising a `TimeoutException` as expected.

However, not *all* effects respect thread interruption. Notably, most things involving `java.io` file operations (e.g. `FileReader`) ignore interruption. When working with this type of effect, we must turn to other means. A simple and naive example:

```scala
def readBytes(fis: FileInputStream) =
IO blocking {
var bytes = new ArrayBuffer[Int]

var i = fis.read()
while (i >= 0) {
bytes += i
i = fis.read()
}

bytes.toArray
}

IO.bracket(
IO(new FileInputStream(inputFile)))(
readBytes(_))(
fis => IO(fis.close()))
```

In the above snippet, swapping `blocking` for `interruptible` won't actually help, since `fis.read()` ignores `Thread` interruption! However, we can still *partially* resolve this by introducing a bit of external state:

```scala
IO(new AtomicBoolean(false)) flatMap { flag =>
def readBytes(fis: FileInputStream) =
IO blocking {
var bytes = new ArrayBuffer[Int]

var i = fis.read()
while (i >= 0 && !flag.get()) {
bytes += i
i = fis.read()
}

bytes.toArray
}

val readAll = IO.bracket(
IO(new FileInputStream(inputFile)))(
readBytes(_))(
fis => IO(fis.close()))

readAll.cancelable(IO(flag.set(true)))
}
```

This is *almost* the same as the previous snippet except for the introduction of an `AtomicBoolean` which is checked on each iteration of the `while` loop. We then set this flag by using `cancelable` (right at the end), which makes the whole thing (safely!) cancelable, despite the fact that it was defined with `blocking` (note this also works with `delay` and absolutely everything else).

It is still worth keeping in mind that this is only a partial solution. Whenever `fis.read()` is blocked, cancelation will wait for that blocking to complete regardless of how long it takes. In other words, we still can't interrupt the `read()` function any more than we could with `interruptible`. All that `cancelable` is achieving here is allowing us to encode a more bespoke cancelation protocol for a particular effect, in this case in terms of `AtomicBoolean`.

> Note: It is actually possible to implement a *proper* solution for cancelation here simply by applying the `cancelable` to the inner `blocking` effect and defining the handler to be `IO.blocking(fis.close())`, in addition to adding some error handling logic to catch the corresponding exception. This is a special case however, specific to `FileInputStream`, and doesn't make for as nice of an example. :-)
The reason this is safe is it effectively leans entirely on *cooperative* cancelation. It's relatively common to have effects which cannot be canceled by normal means (and thus are, correctly, `uncancelable`) but which *can* be terminated early by using some ancillary protocol (in this case, an `AtomicBoolean`). Note that nothing is magic here and this is still fully safe with respect to backpressure and other finalizers. For example, `fis.close()` will still be run at the proper time, and cancelation of this fiber will only complete when all finalizers are done, exactly the same as non-`uncancelable` effects.
52 changes: 52 additions & 0 deletions kernel/shared/src/main/scala/cats/effect/kernel/GenSpawn.scala
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,58 @@ trait GenSpawn[F[_], E] extends MonadCancel[F, E] with Unique[F] {
def background[A](fa: F[A]): Resource[F, F[Outcome[F, E, A]]] =
Resource.make(start(fa))(_.cancel)(this).map(_.join)

/**
* Given an effect which might be [[uncancelable]] and a finalizer, produce an effect which
* can be canceled by running the finalizer. This combinator is useful for handling scenarios
* in which an effect is inherently uncancelable but may be canceled through setting some
* external state. A trivial example of this might be the following (assuming an [[Async]]
* instance):
*
* {{{
* val flag = new AtomicBoolean(false)
* val fa = F blocking {
* while (!flag.get()) {
* Thread.sleep(10)
* }
* }
*
* F.cancelable(fa, F.delay(flag.set(true)))
* }}}
*
* Without `cancelable`, effects constructed by `blocking`, `delay`, and similar are
* inherently uncancelable. Simply adding an `onCancel` to such effects is insufficient to
* resolve this, despite the fact that under *some* circumstances (such as the above), it is
* possible to enrich an otherwise-uncancelable effect with early termination. `cancelable`
* addresses this use-case.
*
* Note that there is no free lunch here. If an effect truly cannot be prematurely terminated,
* `cancelable` will not allow for cancelation. As an example, if you attempt to cancel
* `uncancelable(_ => never)`, the cancelation will hang forever (in other words, it will be
* itself equivalent to `never`). Applying `cancelable` will not change this in any way. Thus,
* attempting to cancel `cancelable(uncancelable(_ => never), unit)` will ''also'' hang
* forever. As in all cases, cancelation will only return when all finalizers have run and the
* fiber has fully terminated.
*
* If `fa` self-cancels and the `cancelable` itself is uncancelable, the resulting fiber will
* be equal to `never` (similar to [[race]]). Under normal circumstances, if `fa`
* self-cancels, that cancelation will be propagated to the calling context.
*
* @param fa
* the effect to be canceled
* @param fin
* an effect which orchestrates some external state which terminates `fa`
* @see
* [[uncancelable]]
* @see
* [[onCancel]]
*/
def cancelable[A](fa: F[A], fin: F[Unit]): F[A] =
uncancelable { poll =>
start(fa) flatMap { fiber =>
poll(fiber.join).onCancel(fin *> fiber.cancel).flatMap(_.embed(poll(canceled *> never)))
}
}

/**
* A non-terminating effect that never completes, which causes a fiber to semantically block
* indefinitely. This is the purely functional, asynchronous equivalent of an infinite while
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ final class GenSpawnOps[F[_], A, E] private[syntax] (private val wrapped: F[A])
def background(implicit F: GenSpawn[F, E]): Resource[F, F[Outcome[F, E, A]]] =
F.background(wrapped)

def cancelable(fin: F[Unit])(implicit F: GenSpawn[F, E]): F[A] =
F.cancelable(wrapped, fin)

def raceOutcome[B](another: F[B])(
implicit F: GenSpawn[F, E]
): F[Either[Outcome[F, E, A], Outcome[F, E, B]]] =
Expand Down
8 changes: 8 additions & 0 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,14 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
failed must beFalse
}

"support re-enablement via cancelable" in ticked { implicit ticker =>
val test = IO.deferred[Unit] flatMap { latch =>
latch.get.uncancelable.cancelable(latch.complete(()).void)
}

test.start.flatMap(_.cancel) must completeAs(())
}

"only unmask within current fiber" in ticked { implicit ticker =>
var passed = false
val test = IO uncancelable { poll =>
Expand Down

0 comments on commit 73a379d

Please sign in to comment.