Skip to content

Commit

Permalink
Merge pull request #3284 from djspiewak/feature/specialize-deferred
Browse files Browse the repository at this point in the history
  • Loading branch information
djspiewak authored Dec 24, 2022
2 parents 07e7050 + a045092 commit 3ab83ce
Show file tree
Hide file tree
Showing 26 changed files with 209 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.AsyncBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.AsyncBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.AttemptBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.AttemptBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.BothBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.BothBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.DeepBindBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.DeepBindBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.benchmarks

import cats.effect._
import cats.effect.unsafe.implicits.global
import cats.syntax.all._

import org.openjdk.jmh.annotations._

import java.util.concurrent.TimeUnit

/**
* To do comparative benchmarks between versions:
*
* benchmarks/run-benchmark DeferredBenchmark
*
* This will generate results in `benchmarks/results`.
*
* Or to run the benchmark from within sbt:
*
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.DeferredBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
* more is better.
*/
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class DeferredBenchmark {

@Param(Array("10", "100", "1000"))
var count: Int = _

@Benchmark
def get(): Unit = {
IO.deferred[Unit]
.flatMap(d => d.complete(()) *> d.get.replicateA_(count))
.replicateA_(1000)
.unsafeRunSync()
}

@Benchmark
def complete(): Unit = {
IO.deferred[Unit]
.flatMap { d => d.get.parReplicateA_(count) &> d.complete(()) }
.replicateA_(1000)
.unsafeRunSync()
}

@Benchmark
def cancel(): Unit = {
IO.deferred[Unit]
.flatMap { d => d.get.start.replicateA(count).flatMap(_.traverse(_.cancel)) }
.replicateA_(1000)
.unsafeRunSync()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.DispatcherBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.DispatcherBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.HandleErrorBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.HandleErrorBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.MapCallsBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.MapCallsBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.MapStreamBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.MapStreamBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 4 cats.effect.benchmarks.ParallelBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 4 cats.effect.benchmarks.ParallelBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "4 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.QueueBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.QueueBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.AttemptBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.AttemptBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.RandomBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.RandomBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.RedeemBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.RedeemBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.RedeemWithBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.RedeemWithBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.RefBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.RefBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.ShallowBindBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.ShallowBindBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit
/**
* To run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.ThreadLocalBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.ThreadLocalBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.WorkStealingBenchmark
* Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.WorkStealingBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
Expand Down
9 changes: 6 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,9 @@ addCommandAlias(CI.JS.command, CI.JS.toString)
addCommandAlias(CI.Firefox.command, CI.Firefox.toString)
addCommandAlias(CI.Chrome.command, CI.Chrome.toString)

addCommandAlias(
tlReplaceCommandAlias(
"prePR",
"; root/clean; root/scalafixAll; scalafmtSbt; +root/scalafmtAll; +root/headerCreate")
"; root/clean; +root/headerCreate; root/scalafixAll; scalafmtSbt; +root/scalafmtAll")

val jsProjects: Seq[ProjectReference] =
Seq(kernel.js, kernelTestkit.js, laws.js, core.js, testkit.js, testsJS, std.js, example.js)
Expand Down Expand Up @@ -595,7 +595,10 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
"cats.effect.NonDaemonThreadLogger.sleepIntervalMillis"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.NonDaemonThreadLogger.this"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.NonDaemonThreadLogger$")
ProblemFilters.exclude[MissingClassProblem]("cats.effect.NonDaemonThreadLogger$"),
// introduced by #3284
// internal API change
ProblemFilters.exclude[IncompatibleMethTypeProblem]("cats.effect.CallbackStack.apply")
) ++ {
if (tlIsScala3.value) {
// Scala 3 specific exclusions
Expand Down
8 changes: 4 additions & 4 deletions core/shared/src/main/scala/cats/effect/CallbackStack.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import scala.annotation.tailrec

import java.util.concurrent.atomic.AtomicReference

private final class CallbackStack[A](private[this] var callback: OutcomeIO[A] => Unit)
private final class CallbackStack[A](private[this] var callback: A => Unit)
extends AtomicReference[CallbackStack[A]] {

def push(next: OutcomeIO[A] => Unit): CallbackStack[A] = {
def push(next: A => Unit): CallbackStack[A] = {
val attempt = new CallbackStack(next)

@tailrec
Expand All @@ -40,7 +40,7 @@ private final class CallbackStack[A](private[this] var callback: OutcomeIO[A] =>
loop()
}

def unsafeSetCallback(cb: OutcomeIO[A] => Unit): Unit = {
def unsafeSetCallback(cb: A => Unit): Unit = {
callback = cb
}

Expand All @@ -49,7 +49,7 @@ private final class CallbackStack[A](private[this] var callback: OutcomeIO[A] =>
* iff *any* callbacks were invoked.
*/
@tailrec
def apply(oc: OutcomeIO[A], invoked: Boolean): Boolean = {
def apply(oc: A, invoked: Boolean): Boolean = {
val cb = callback

val invoked2 = if (cb != null) {
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1434,7 +1434,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {

def ref[A](a: A): IO[Ref[IO, A]] = IO(Ref.unsafe(a))

def deferred[A]: IO[Deferred[IO, A]] = IO(Deferred.unsafe)
def deferred[A]: IO[Deferred[IO, A]] = IO(new IODeferred[A])

def bracketFull[A, B](acquire: Poll[IO] => IO[A])(use: A => IO[B])(
release: (A, OutcomeIO[B]) => IO[Unit]): IO[B] =
Expand Down
82 changes: 82 additions & 0 deletions core/shared/src/main/scala/cats/effect/IODeferred.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect

import cats.effect.syntax.all._
import cats.syntax.all._
import cats.~>

import java.util.concurrent.atomic.AtomicReference

private final class IODeferred[A] extends Deferred[IO, A] {
import IODeferred.Sentinel

private[this] val cell = new AtomicReference[AnyRef](Sentinel)
private[this] val callbacks = new CallbackStack[Right[Nothing, A]](null)

def complete(a: A): IO[Boolean] = IO {
if (cell.compareAndSet(Sentinel, a.asInstanceOf[AnyRef])) {
val _ = callbacks(Right(a), false)
true
} else {
false
}
}

def get: IO[A] = IO defer {
val back = cell.get()

if (back eq Sentinel) {
IO.cont[A, A](new Cont[IO, A, A] {
def apply[G[_]: MonadCancelThrow] = {
(cb: Either[Throwable, A] => Unit, get: G[A], lift: IO ~> G) =>
MonadCancel[G] uncancelable { poll =>
val gga = lift {
IO {
val handle = callbacks.push(cb)

val back = cell.get()
if (back eq Sentinel) {
poll(get).onCancel(lift(IO(handle.clearCurrent())))
} else {
handle.clearCurrent()
back.asInstanceOf[A].pure[G]
}
}
}

gga.flatten
}
}
})
} else {
IO.pure(back.asInstanceOf[A])
}
}

def tryGet: IO[Option[A]] = IO {
val back = cell.get()
if (back eq Sentinel)
None
else
Some(back.asInstanceOf[A])
}
}

private object IODeferred {
private val Sentinel = new AnyRef
}
Loading

0 comments on commit 3ab83ce

Please sign in to comment.