1{-# LANGUAGE DeriveDataTypeable #-}
2{-# LANGUAGE CPP #-}
3module UnliftIO.AsyncSpec (spec) where
4
5import Test.Hspec
6import Test.Hspec.QuickCheck
7import Test.QuickCheck
8import UnliftIO
9import UnliftIO.Internals.Async
10import Data.List (nub)
11import Control.Applicative
12import Control.Concurrent (myThreadId, threadDelay)
13import qualified Control.Exception as CE (ErrorCall(..), try)
14import GHC.Conc.Sync (ThreadStatus(..), threadStatus)
15import Control.Concurrent.STM (throwSTM)
16import Control.Exception (getMaskingState, MaskingState (Unmasked))
17
-- | Payload-free exception used by the tests in this module as a
-- recognisable failure to throw and to match on.
data MyExc = MyExc
  deriving (Eq, Show, Typeable)

instance Exception MyExc
21
-- | Hspec suite for replicateConcurrently_ and for the Conc applicative and
-- alternative combinators.
spec :: Spec
spec = do
  describe "replicateConcurrently_" $ do
    prop "works" $ \(NonNegative total) -> do
      -- Every replica bumps the shared counter exactly once.
      counter <- newIORef (0 :: Int)
      replicateConcurrently_ total (atomicModifyIORef' counter (\n -> (n + 1, ())))
      readIORef counter `shouldReturn` total

    it "uses a different thread per replicated action" $ do
      -- Strictly positive replica counts that only shrink to positive values.
      let replicaGen = (+ 1) . abs <$> arbitrary
          replicaShrink = filter (>= 1) . shrink
      forAllShrink replicaGen replicaShrink $ \replicas -> do
        seenRef <- newIORef []
        let recordSelf = do
              me <- myThreadId
              atomicModifyIORef' seenRef (\known -> (me : known, ()))
        replicateConcurrently_ replicas recordSelf
        seen <- readIORef seenRef
        -- No thread id may have been recorded twice.
        seen `shouldBe` nub seen

#if MIN_VERSION_base(4,8,0)
  describe "flatten" $ do
    -- NOTE: this cannot be a property test, because Flat and Conc
    -- cannot be given an Eq instance
    it "flattens all alternative trees" $ do
      let tree :: Conc IO Int
          tree = conc (pure 1) <|> conc (pure 2) <|> pure 3
          -- Alt (Alt (Action (pure 1)) (Action (pure 2)))
          --     (Pure 3)
      flattened <- flatten tree
      case flattened of
        FlatAlt (FlatAction firstAct) (FlatAction secondAct) [FlatPure 3] -> do
          firstAct `shouldReturn` 1
          secondAct `shouldReturn` 2
        _ -> expectationFailure "expecting flatten to work but didn't"

  describe "conc" $ do
    it "handles sync exceptions" $ do
      let failing = conc (pure ()) *> conc (throwIO MyExc)
      runConc failing `shouldThrow` (== MyExc)

    it "handles async exceptions" $ do
      victimVar <- newEmptyMVar
      let noop = conc (pure ())
          -- Waits for the thread id published below and throws at it.
          attacker = conc $ do
            victim <- takeMVar victimVar
            throwTo victim (CE.ErrorCall "having error")
          -- Publishes its own thread id, then sleeps.
          sleeper = conc $ do
            myThreadId >>= putMVar victimVar
            threadDelay 1000100
      outcome <- CE.try (runConc (noop *> attacker *> sleeper))
      case outcome of
        Left (SomeAsyncException err) ->
          displayException err `shouldBe` "having error"
        Right _ ->
          expectationFailure "Expecting an error, got none"

    it "has an Unmasked masking state for given subroutines" $ do
      let sleepsForever = conc (threadDelay maxBound)
          inspectsMask = conc (getMaskingState `shouldReturn` Unmasked)
      uninterruptibleMask_ (runConc (sleepsForever <|> inspectsMask))

-- NOTE: Older versions of GHC have a timeout function that does not
-- work on Windows
#if !WINDOWS
    it "allows to kill parent via timeout" $ do
      -- The stage reference records how far the worker got:
      -- 0 never started, 1 sleeping, 2 woke up, 3 finaliser ran.
      stage <- newIORef (0 :: Int)
      let body = do
            writeIORef stage 1
            threadDelay maxBound
            writeIORef stage 2
          tracked = body `finally` writeIORef stage 3
      outcome <- timeout 20 (runConc (conc (pure ()) *> conc tracked))
      outcome `shouldBe` Nothing
      final <- readIORef stage
      case final of
        3 -> pure ()
        0 -> putStrLn "make timeout longer"
        1 -> error "it's 1"
        2 -> error "it's 2"
        other -> error ("what? " ++ show other)
#endif

    it "throws right exception on empty" $
      shouldThrow (runConc empty) (== EmptyWithNoAlternative)

  describe "Conc Applicative instance" $ do
    prop "doesn't fork a new thread on a pure call" $ \value ->
      runConc (pure value) `shouldReturn` (value :: Int)

    it "evaluates all needed sub-routines " $ do
      let failing = conc (pure ()) *> conc (throwIO MyExc)
      runConc failing `shouldThrow` (== MyExc)

    it "cleanup on brackets work" $ do
      liveCount <- newTVarIO (0 :: Int)
      let workerCount = 10
          enter = atomically (modifyTVar' liveCount (+ 1))
          leave = atomically (modifyTVar' liveCount (subtract 1))
          worker =
            conc (bracket_ enter leave (threadDelay 10000000 >> error "this should never happen"))
          -- Blocks until every worker has entered its bracket, then throws.
          killer = conc . atomically $ do
            live <- readTVar liveCount
            checkSTM (workerCount == live)
            throwSTM MyExc
          composed = foldr (*>) killer (replicate workerCount worker)
      runConc composed `shouldThrow` (== MyExc)
      -- Every release action must have undone its increment.
      atomically (readTVar liveCount) `shouldReturn` 0

    it "re-throws exception that happened first" $ do
      let failsAtOnce = conc (throwIO MyExc)
          failsLater = conc (threadDelay 1000000 >> error "foo")
      runConc (failsAtOnce *> failsLater) `shouldThrow` (== MyExc)

  describe "Conc Alternative instance" $ do
    it "is left associative" $ do
      let tree :: Conc IO Int
          tree = conc (pure 1) <|> conc (pure 2) <|> conc (pure 3)
      case tree of
        Alt (Alt (Action firstAct) (Action secondAct)) (Action thirdAct) -> do
          firstAct `shouldReturn` 1
          secondAct `shouldReturn` 2
          thirdAct `shouldReturn` 3

        _ -> expectationFailure "expecting Conc Alternative to be left associative, but it wasn't"

    it "executes body of all alternative blocks" $ do
      gate <- newEmptyMVar
      let blockedOnGate = conc (takeMVar gate)
          sleepsForever = conc (threadDelay maxBound)
          quick = conc (threadDelay 100 >> pure ())
      runConc (blockedOnGate <|> sleepsForever <|> quick)
      -- if a GC runs at the right time, it is possible that both `takeMVar` and
      -- `runConc` itself will be in a "blocked indefinitely on MVar" situation,
      -- adding the line below to avoid that
      putMVar gate ()

    it "finishes all threads that didn't finish first" $ do
      registry <- newIORef []
      -- Each alternative records its own thread id and then sleeps for the
      -- given delay; only the middle one uses a delay short enough to finish.
      let registerThenSleep delay = conc $ do
            me <- myThreadId
            atomicModifyIORef' registry (\known -> (me : known, ()))
            threadDelay delay
      runConc $
        registerThenSleep maxBound
          <|> registerThenSleep 500
          <|> registerThenSleep maxBound
      workers <- readIORef registry
      statuses <- mapM threadStatus workers
      length [s | s <- statuses, s == ThreadFinished] `shouldBe` 3

    it "nesting works" $ do
      gate <- newEmptyMVar
      -- Adds an alternative that stays blocked on the gate while runConc runs.
      let withDeadAlt :: Conc IO a -> Conc IO a
          withDeadAlt body = body <|> conc (takeMVar gate >> error "shouldn't happen")
          lhs = withDeadAlt (conc (pure 1))
          rhs = withDeadAlt (conc (pure 2))
      total <- runConc (withDeadAlt ((+) <$> lhs <*> rhs))
      total `shouldBe` 3
      putMVar gate ()

#endif
185