1/*
2 * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package cats
18package effect
19package laws
20
21import cats.effect.concurrent.Deferred
22import cats.syntax.all._
23import cats.laws._
24
25import scala.concurrent.Promise
26
/**
 * Laws relating the operations of `ConcurrentEffect[F]` (`runAsync`,
 * `runCancelable`, `toIO`) to each other and to the `Concurrent`
 * operations (`cancelable`, `start`, `bracket`).
 *
 * Each law returns an `IsEq` built with `<->`, pairing two programs that
 * are expected to be equivalent.
 */
trait ConcurrentEffectLaws[F[_]] extends ConcurrentLaws[F] with EffectLaws[F] {
  implicit def F: ConcurrentEffect[F]

  /**
   * Pairs an `IO.async` that evaluates `fa` through `F.runAsync` with an
   * `IO.cancelable` that evaluates `fa` through `F.runCancelable`; both
   * forward the outcome of `fa` to the `IO` callback.
   */
  def runAsyncRunCancelableCoherence[A](fa: F[A]) = {
    val viaRunAsync = IO.async[A] { callback =>
      F.runAsync(fa)(outcome => IO(callback(outcome))).unsafeRunSync()
    }
    val viaRunCancelable = IO.cancelable[A] { callback =>
      // The token produced by `runCancelable` lives in `F`; convert it so
      // that it can serve as the cancel token of the resulting `IO`.
      val cancelToken = F.runCancelable(fa)(outcome => IO(callback(outcome))).unsafeRunSync()
      F.toIO(cancelToken)
    }
    viaRunAsync <-> viaRunCancelable
  }

  /**
   * Starts a never-completing cancelable task with `F.runCancelable`,
   * waits until its registration has run, invokes the obtained cancel
   * token and then waits for the latch that the token completes. The
   * whole program is paired with `F.unit`.
   */
  def runCancelableIsSynchronous[A] = {
    val program = Deferred.uncancelable[F, Unit].flatMap { canceled =>
      val started = Promise[Unit]()
      // Never ending task: the callback is ignored, so it only finishes
      // through cancellation, which completes the `canceled` latch.
      val neverEnding = F.cancelable[A] { _ =>
        started.success(())
        canceled.complete(())
      }
      // Execute, then cancel
      val runThenCancel =
        F.delay(F.runCancelable(neverEnding)(_ => IO.unit).unsafeRunSync()).flatMap { cancelToken =>
          // The task has to be started before it gets cancelled
          Async.fromFuture(F.pure(started.future)) >> cancelToken
        }
      F.liftIO(F.runAsync(runThenCancel)(_ => IO.unit).toIO) *> canceled.get
    }
    program <-> F.unit
  }

  /**
   * Pairs two ways of cancelling a never-completing task whose
   * cancellation logic completes a `Deferred` with `a`: through the token
   * returned by `F.runCancelable`, and through `F.start` followed by
   * `fiber.cancel`. Both programs yield the value read from that
   * `Deferred`.
   */
  def runCancelableStartCancelCoherence[A](a: A) = {
    // Variant 1: cancellation through the token returned by runCancelable
    val viaRunCancelable: F[A] = for {
      result <- Deferred.uncancelable[F, A]
      started <- F.delay(Promise[Unit]())
      task = F.cancelable[A] { _ =>
        started.success(())
        result.complete(a)
      }
      cancelToken <- F.liftIO(F.runCancelable(task)(_ => IO.unit).toIO)
      // The task has to be started before it gets cancelled
      // TODO get rid of this, IO, and Future here
      _ <- Async.fromFuture(F.pure(started.future))
      _ <- cancelToken
      value <- result.get
    } yield value

    // Variant 2: cancellation through start.flatMap(_.cancel)
    val viaStartCancel: F[A] = for {
      result <- Deferred.uncancelable[F, A]
      // Completed by the bracket's acquire step, signalling that the task started
      started <- Deferred.uncancelable[F, Unit]
      task = F.bracket(started.complete(()))(_ => F.never[Unit])(_ => result.complete(a))
      fiber <- F.start(task)
      // The task has to be started before it gets cancelled
      _ <- started.get
      _ <- F.start(fiber.cancel)
      value <- result.get
    } yield value

    viaRunCancelable <-> viaStartCancel
  }

  /**
   * Pairs `ConcurrentEffect.toIOFromRunCancelable(fa)` with the instance's
   * own `F.toIO(fa)`.
   */
  def toIORunCancelableConsistency[A](fa: F[A]) = {
    val derived = ConcurrentEffect.toIOFromRunCancelable(fa)
    val direct = F.toIO(fa)
    derived <-> direct
  }
}
91
object ConcurrentEffectLaws {

  /**
   * Builds a [[ConcurrentEffectLaws]] instance for `F` out of the
   * implicitly available `ConcurrentEffect[F]` and `ContextShift[F]`.
   *
   * @param F0 the `ConcurrentEffect` instance exposed as `F` by the laws
   * @param contextShift0 the `ContextShift` exposed as `contextShift` by the laws
   */
  def apply[F[_]](implicit F0: ConcurrentEffect[F], contextShift0: ContextShift[F]): ConcurrentEffectLaws[F] = {
    val instance: ConcurrentEffectLaws[F] = new ConcurrentEffectLaws[F] {
      val F: ConcurrentEffect[F] = F0
      val contextShift: ContextShift[F] = contextShift0
    }
    instance
  }
}
99