1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE CPP #-}
3{-# LANGUAGE DeriveDataTypeable #-}
4{-# LANGUAGE FlexibleInstances #-}
5{-# OPTIONS_GHC -fno-warn-orphans #-}
6import Test.Hspec
7import Test.Hspec.QuickCheck (prop)
8import Test.QuickCheck (getPositive)
9import Test.QuickCheck.Monadic (assert, monadicIO, run)
10
11import Data.Conduit (runConduit, (.|), ConduitT, runConduitPure, runConduitRes)
12import qualified Data.Conduit as C
13import qualified Data.Conduit.Lift as C
14import qualified Data.Conduit.Internal as CI
15import qualified Data.Conduit.List as CL
16import Data.Typeable (Typeable)
17import Control.Exception (throw, evaluate)
18import Control.Monad.Trans.Resource (runResourceT)
19import Control.Monad.Trans.Maybe (MaybeT (MaybeT))
20import Control.Monad.State.Strict (modify)
21import Data.Maybe   (fromMaybe,catMaybes,fromJust)
22import qualified Data.List as DL
23import qualified Data.List.Split as DLS (chunksOf)
24import Control.Monad.ST (runST)
25import Data.Monoid
26import qualified Data.IORef as I
27import Data.Tuple (swap)
28import Control.Monad.Trans.Resource (allocate, resourceForkIO)
29import Control.Concurrent (threadDelay, killThread)
30import Control.Monad.IO.Class (liftIO)
31import Control.Monad.Trans.Class (lift)
32import Control.Monad.Trans.Writer (execWriter, tell, runWriterT)
33import Control.Monad.Trans.State (evalStateT, get, put)
34import qualified Control.Monad.Writer as W
35import Control.Applicative (pure, (<$>), (<*>))
36import qualified Control.Monad.Catch as Catch
37import Data.Functor.Identity (Identity,runIdentity)
38import Control.Monad (forever, void)
39import Data.Void (Void)
40import qualified Control.Concurrent.MVar as M
41import Control.Monad.Except (catchError, throwError)
42import qualified Data.Map as Map
43import qualified Data.Conduit.Extra.ZipConduitSpec as ZipConduit
44import qualified Data.Conduit.StreamSpec as Stream
45import qualified Spec
46
-- HUnit-style equality assertion expressed through hspec's 'shouldBe'.
-- The operands are handed to 'shouldBe' in swapped order, i.e.
-- @lhs \@=? rhs@ runs @rhs `shouldBe` lhs@.
(@=?) :: (Eq a, Show a) => a -> a -> IO ()
lhs @=? rhs = rhs `shouldBe` lhs
49
-- QuickCheck property stating that a pure list function and a conduit
-- agree: applying the function to the input list must give the same
-- elements as streaming that list through the conduit and collecting
-- everything it emits.
equivToList :: Eq b => ([a] -> [b]) -> ConduitT a b Identity () -> [a] -> Bool
equivToList listFn pipeline input = expected == streamed
  where
    -- Reference answer computed directly on the list.
    expected = listFn input
    -- Answer obtained by feeding the same list through the conduit.
    streamed = runConduitPure $ CL.sourceList input .| pipeline .| CL.consume
55
-- | Decide whether two sources are observationally equal: they must yield
-- the same sequence of outputs and finish with the same result.
bisimilarTo :: (Eq a, Eq r) => ConduitT () a Identity r -> ConduitT () a Identity r -> Bool
bisimilarTo lhs rhs = observe lhs == observe rhs
  where
    -- | Run a source to completion, pairing the list of everything it
    -- yielded with its final result (in effect @sinkList@ that also keeps
    -- the upstream result).
    observe :: ConduitT () a Identity r -> ([a], r)
    observe source =
        C.runConduitPure (fmap swap (source `C.fuseBoth` CL.consume))
65
66
67main :: IO ()
68main = hspec $ do
69    describe "Combinators" Spec.spec
70    describe "data loss rules" $ do
71        it "consumes the source to quickly" $ do
72            x <- runConduitRes $ CL.sourceList [1..10 :: Int] .| do
73                  strings <- CL.map show .| CL.take 5
74                  liftIO $ putStr $ unlines strings
75                  CL.fold (+) 0
76            40 `shouldBe` x
77
78        it "correctly consumes a chunked resource" $ do
79            x <- runConduitRes $ (CL.sourceList [1..5 :: Int] `mappend` CL.sourceList [6..10]) .| do
80                strings <- CL.map show .| CL.take 5
81                liftIO $ putStr $ unlines strings
82                CL.fold (+) 0
83            40 `shouldBe` x
84
85    describe "filter" $ do
86        it "even" $ do
87            x <- runConduitRes $ CL.sourceList [1..10] .| CL.filter even .| CL.consume
88            x `shouldBe` filter even [1..10 :: Int]
89
90    prop "concat" $ equivToList (concat :: [[Int]]->[Int]) CL.concat
91
92    describe "mapFoldable" $ do
93        prop "list" $
94            equivToList (concatMap (:[]) :: [Int]->[Int]) (CL.mapFoldable  (:[]))
95        let f x = if odd x then Just x else Nothing
96        prop "Maybe" $
97            equivToList (catMaybes . map f :: [Int]->[Int]) (CL.mapFoldable f)
98
99    prop "scan" $ equivToList (tail . scanl (+) 0 :: [Int]->[Int]) (void $ CL.scan (+) 0)
100
    -- mapFoldableM and scanM are fully polymorphic in the type of the monad,
    -- so it suffices to check them only with Identity.
103    describe "mapFoldableM" $ do
104        prop "list" $
105            equivToList (concatMap (:[]) :: [Int]->[Int]) (CL.mapFoldableM (return . (:[])))
106        let f x = if odd x then Just x else Nothing
107        prop "Maybe" $
108            equivToList (catMaybes . map f :: [Int]->[Int]) (CL.mapFoldableM (return . f))
109
110    prop "scanM" $ equivToList (tail . scanl (+) 0) (void $ CL.scanM (\a s -> return $ a + s) (0 :: Int))
111
112    describe "ResourceT" $ do
113        it "resourceForkIO" $ do
114            counter <- I.newIORef 0
115            let w = allocate
116                        (I.atomicModifyIORef counter $ \i ->
117                            (i + 1, ()))
118                        (const $ I.atomicModifyIORef counter $ \i ->
119                            (i - 1, ()))
120            runResourceT $ do
121                _ <- w
122                _ <- resourceForkIO $ return ()
123                _ <- resourceForkIO $ return ()
124                sequence_ $ replicate 1000 $ do
125                    tid <- resourceForkIO $ return ()
126                    liftIO $ killThread tid
127                _ <- resourceForkIO $ return ()
128                _ <- resourceForkIO $ return ()
129                return ()
130
131            -- give enough of a chance to the cleanup code to finish
132            threadDelay 1000
133            res <- I.readIORef counter
134            res `shouldBe` (0 :: Int)
135
136    describe "sum" $ do
137        it "works for 1..10" $ do
138            x <- runConduitRes $ CL.sourceList [1..10] .| CL.fold (+) (0 :: Int)
139            x `shouldBe` sum [1..10]
140        prop "is idempotent" $ \list ->
141            (runST $ runConduit $ CL.sourceList list .| CL.fold (+) (0 :: Int))
142            == sum list
143
144    describe "foldMap" $ do
145        it "sums 1..10" $ do
146            Sum x <- runConduit $ CL.sourceList [1..(10 :: Int)] .| CL.foldMap Sum
147            x `shouldBe` sum [1..10]
148
149        it "preserves order" $ do
150            x <- runConduit $ CL.sourceList [[4],[2],[3],[1]] .| CL.foldMap (++[(9 :: Int)])
151            x `shouldBe` [4,9,2,9,3,9,1,9]
152
153    describe "foldMapM" $ do
154        it "sums 1..10" $ do
155            Sum x <- runConduit $ CL.sourceList [1..(10 :: Int)] .| CL.foldMapM (return . Sum)
156            x `shouldBe` sum [1..10]
157
158        it "preserves order" $ do
159            x <- runConduit $ CL.sourceList [[4],[2],[3],[1]] .| CL.foldMapM (return . (++[(9 :: Int)]))
160            x `shouldBe` [4,9,2,9,3,9,1,9]
161
162    describe "unfold" $ do
163        it "works" $ do
164            let f 0 = Nothing
165                f i = Just (show i, i - 1)
166                seed = 10 :: Int
167            x <- runConduit $ CL.unfold f seed .| CL.consume
168            let y = DL.unfoldr f seed
169            x `shouldBe` y
170
171    describe "unfoldM" $ do
172        it "works" $ do
173            let f 0 = Nothing
174                f i = Just (show i, i - 1)
175                seed = 10 :: Int
176            x <- runConduit $ CL.unfoldM (return . f) seed .| CL.consume
177            let y = DL.unfoldr f seed
178            x `shouldBe` y
179
180    describe "uncons" $ do
181        prop "folds to list" $ \xs ->
182          let src = C.sealConduitT $ CL.sourceList xs in
183          (xs :: [Int]) == DL.unfoldr CL.uncons src
184
185        prop "works with unfold" $ \xs ->
186          let src = CL.sourceList xs in
187          CL.unfold CL.uncons (C.sealConduitT src) `bisimilarTo` (src :: ConduitT () Int Identity ())
188
189    describe "unconsEither" $ do
190        let
191          eitherToMaybe :: Either l a -> Maybe a
192          eitherToMaybe (Left _) = Nothing
193          eitherToMaybe (Right a) = Just a
194        prop "folds outputs to list" $ \xs ->
195          let src = C.sealConduitT $ CL.sourceList xs in
196          (xs :: [Int]) == DL.unfoldr (eitherToMaybe . CL.unconsEither) src
197
198        prop "works with unfoldEither" $ \(xs, r) ->
199          let src = CL.sourceList xs *> pure r in
200          CL.unfoldEither CL.unconsEither (C.sealConduitT src) `bisimilarTo` (src :: ConduitT () Int Identity Int)
201
202    describe "Monoid instance for Source" $ do
203        it "mappend" $ do
204            x <- runConduitRes $ (CL.sourceList [1..5 :: Int] `mappend` CL.sourceList [6..10]) .| CL.fold (+) 0
205            x `shouldBe` sum [1..10]
206        it "mconcat" $ do
207            x <- runConduitRes $ mconcat
208                [ CL.sourceList [1..5 :: Int]
209                , CL.sourceList [6..10]
210                , CL.sourceList [11..20]
211                ] .| CL.fold (+) 0
212            x `shouldBe` sum [1..20]
213
214    describe "zipping" $ do
215        it "zipping two small lists" $ do
216            res <- runConduitRes $ CI.zipSources (CL.sourceList [1..10]) (CL.sourceList [11..12]) .| CL.consume
217            res @=? zip [1..10 :: Int] [11..12 :: Int]
218
219    describe "zipping sinks" $ do
220        it "take all" $ do
221            res <- runConduitRes $ CL.sourceList [1..10] .| CI.zipSinks CL.consume CL.consume
222            res @=? ([1..10 :: Int], [1..10 :: Int])
223        it "take fewer on left" $ do
224            res <- runConduitRes $ CL.sourceList [1..10] .| CI.zipSinks (CL.take 4) CL.consume
225            res @=? ([1..4 :: Int], [1..10 :: Int])
226        it "take fewer on right" $ do
227            res <- runConduitRes $ CL.sourceList [1..10] .| CI.zipSinks CL.consume (CL.take 4)
228            res @=? ([1..10 :: Int], [1..4 :: Int])
229
230    describe "Monad instance for Sink" $ do
231        it "binding" $ do
232            x <- runConduitRes $ CL.sourceList [1..10] .| do
233                _ <- CL.take 5
234                CL.fold (+) (0 :: Int)
235            x `shouldBe` sum [6..10]
236
237    describe "Applicative instance for Sink" $ do
238        it "<$> and <*>" $ do
239            x <- runConduitRes $ CL.sourceList [1..10] .|
240                (+) <$> pure 5 <*> CL.fold (+) (0 :: Int)
241            x `shouldBe` sum [1..10] + 5
242
243    describe "resumable sources" $ do
244        it "simple" $ do
245            (x, y, z) <- runConduitRes $ do
246                let src1 = CL.sourceList [1..10 :: Int]
247                (src2, x) <- src1 C.$$+ CL.take 5
248                (src3, y) <- src2 C.$$++ CL.fold (+) 0
249                z <- src3 C.$$+- CL.consume
250                return (x, y, z)
251            x `shouldBe` [1..5] :: IO ()
252            y `shouldBe` sum [6..10]
253            z `shouldBe` []
254
255    describe "conduits" $ do
256        it "map, left" $ do
257            x <- runConduitRes $
258                CL.sourceList [1..10]
259                    .| CL.map (* 2)
260                    .| CL.fold (+) 0
261            x `shouldBe` 2 * sum [1..10 :: Int]
262
263        it "map, left >+>" $ do
264            x <- runConduitRes $
265                CI.ConduitT
266                    ((CI.unConduitT (CL.sourceList [1..10]) CI.Done
267                    CI.>+> CI.injectLeftovers ((`CI.unConduitT` CI.Done) $ CL.map (* 2))) >>=)
268                    .| CL.fold (+) 0
269            x `shouldBe` 2 * sum [1..10 :: Int]
270
271        it "map, right" $ do
272            x <- runConduitRes $
273                CL.sourceList [1..10]
274                    .| CL.map (* 2)
275                    .| CL.fold (+) 0
276            x `shouldBe` 2 * sum [1..10 :: Int]
277
278        prop "chunksOf" $ \(positive, xs) ->
279            let p = getPositive positive
280                conduit = CL.sourceList xs .| CL.chunksOf p .| CL.consume
281            in DLS.chunksOf p (xs :: [Int]) == runConduitPure conduit
282
283        it "chunksOf (zero)" $
284            let conduit = return () .| CL.chunksOf 0 .| CL.consume
285            in evaluate (runConduitPure conduit) `shouldThrow` anyException
286
287        it "chunksOf (negative)" $
288            let conduit = return () .| CL.chunksOf (-5) .| CL.consume
289            in evaluate (runConduitPure conduit) `shouldThrow` anyException
290
291        it "groupBy" $ do
292            let input = [1::Int, 1, 2, 3, 3, 3, 4, 5, 5]
293            x <- runConduitRes $ CL.sourceList input
294                    .| CL.groupBy (==)
295                    .| CL.consume
296            x `shouldBe` DL.groupBy (==) input
297
298        it "groupBy (nondup begin/end)" $ do
299            let input = [1::Int, 2, 3, 3, 3, 4, 5]
300            x <- runConduitRes $ CL.sourceList input
301                    .| CL.groupBy (==)
302                    .| CL.consume
303            x `shouldBe` DL.groupBy (==) input
304
305        it "groupOn1" $ do
306            let input = [1::Int, 1, 2, 3, 3, 3, 4, 5, 5]
307            x <- runConduitRes $ CL.sourceList input
308                    .| CL.groupOn1 id
309                    .| CL.consume
310            x `shouldBe` [(1,[1]), (2, []), (3,[3,3]), (4,[]), (5, [5])]
311
312        it "groupOn1 (nondup begin/end)" $ do
313            let input = [1::Int, 2, 3, 3, 3, 4, 5]
314            x <- runConduitRes $ CL.sourceList input
315                    .| CL.groupOn1 id
316                    .| CL.consume
317            x `shouldBe` [(1,[]), (2, []), (3,[3,3]), (4,[]), (5, [])]
318
319
320        it "mapMaybe" $ do
321            let input = [Just (1::Int), Nothing, Just 2, Nothing, Just 3]
322            x <- runConduitRes $ CL.sourceList input
323                    .| CL.mapMaybe ((+2) <$>)
324                    .| CL.consume
325            x `shouldBe` [3, 4, 5]
326
327        it "mapMaybeM" $ do
328            let input = [Just (1::Int), Nothing, Just 2, Nothing, Just 3]
329            x <- runConduitRes $ CL.sourceList input
330                    .| CL.mapMaybeM (return . ((+2) <$>))
331                    .| CL.consume
332            x `shouldBe` [3, 4, 5]
333
334        it "catMaybes" $ do
335            let input = [Just (1::Int), Nothing, Just 2, Nothing, Just 3]
336            x <- runConduitRes $ CL.sourceList input
337                    .| CL.catMaybes
338                    .| CL.consume
339            x `shouldBe` [1, 2, 3]
340
341        it "concatMap" $ do
342            let input = [1, 11, 21]
343            x <- runConduitRes $ CL.sourceList input
344                    .| CL.concatMap (\i -> enumFromTo i (i + 9))
345                    .| CL.fold (+) (0 :: Int)
346            x `shouldBe` sum [1..30]
347
348        it "bind together" $ do
349            let conduit = CL.map (+ 5) .| CL.map (* 2)
350            x <- runConduitRes $ CL.sourceList [1..10] .| conduit .| CL.fold (+) 0
351            x `shouldBe` sum (map (* 2) $ map (+ 5) [1..10 :: Int])
352
353#if !FAST
354    describe "isolate" $ do
355        it "bound to resumable source" $ do
356            (x, y) <- runConduitRes $ do
357                let src1 = CL.sourceList [1..10 :: Int]
358                (src2, x) <- src1 .| CL.isolate 5 C.$$+ CL.consume
359                y <- src2 C.$$+- CL.consume
360                return (x, y)
361            x `shouldBe` [1..5]
362            y `shouldBe` []
363
364        it "bound to sink, non-resumable" $ do
365            (x, y) <- runConduitRes $ do
366                CL.sourceList [1..10 :: Int] .| do
367                    x <- CL.isolate 5 .| CL.consume
368                    y <- CL.consume
369                    return (x, y)
370            x `shouldBe` [1..5]
371            y `shouldBe` [6..10]
372
373        it "bound to sink, resumable" $ do
374            (x, y) <- runConduitRes $ do
375                let src1 = CL.sourceList [1..10 :: Int]
376                (src2, x) <- src1 C.$$+ CL.isolate 5 .| CL.consume
377                y <- src2 C.$$+- CL.consume
378                return (x, y)
379            x `shouldBe` [1..5]
380            y `shouldBe` [6..10]
381
382        it "consumes all data" $ do
383            x <- runConduitRes $ CL.sourceList [1..10 :: Int] .| do
384                CL.isolate 5 .| CL.sinkNull
385                CL.consume
386            x `shouldBe` [6..10]
387
388    describe "sequence" $ do
389        it "simple sink" $ do
390            let sumSink = do
391                    ma <- CL.head
392                    case ma of
393                        Nothing -> return 0
394                        Just a  -> (+a) . fromMaybe 0 <$> CL.head
395
396            res <- runConduitRes $ CL.sourceList [1..11 :: Int]
397                             .| CL.sequence sumSink
398                             .| CL.consume
399            res `shouldBe` [3, 7, 11, 15, 19, 11]
400
401        it "sink with unpull behaviour" $ do
402            let sumSink = do
403                    ma <- CL.head
404                    case ma of
405                        Nothing -> return 0
406                        Just a  -> (+a) . fromMaybe 0 <$> CL.peek
407
408            res <- runConduitRes $ CL.sourceList [1..11 :: Int]
409                             .| CL.sequence sumSink
410                             .| CL.consume
411            res `shouldBe` [3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 11]
412
413#endif
414
415    describe "peek" $ do
416        it "works" $ do
417            (a, b) <- runConduitRes $ CL.sourceList [1..10 :: Int] .| do
418                a <- CL.peek
419                b <- CL.consume
420                return (a, b)
421            (a, b) `shouldBe` (Just 1, [1..10])
422
423    describe "unbuffering" $ do
424        it "works" $ do
425            x <- runConduitRes $ do
426                let src1 = CL.sourceList [1..10 :: Int]
427                (src2, ()) <- src1 C.$$+ CL.drop 5
428                src2 C.$$+- CL.fold (+) 0
429            x `shouldBe` sum [6..10]
430
431    describe "operators" $ do
432        it "only use .|" $
433            runConduitPure
434            (    CL.sourceList [1..10 :: Int]
435              .| CL.map (+ 1)
436             .|  CL.map (subtract 1)
437             .|  CL.mapM (return . (* 2))
438             .|  CL.map (`div` 2)
439             .|  CL.fold (+) 0
440            ) `shouldBe` sum [1..10]
441        it "only use =$" $
442            runConduitPure
443            (    CL.sourceList [1..10 :: Int]
444              .| CL.map (+ 1)
445              .| CL.map (subtract 1)
446              .| CL.map (* 2)
447              .| CL.map (`div` 2)
448              .| CL.fold (+) 0
449            ) `shouldBe` sum [1..10]
450        it "chain" $ do
451            x <-    runConduit
452                 $ CL.sourceList [1..10 :: Int]
453                .| CL.map (+ 1)
454                .| CL.map (+ 1)
455                .| CL.map (+ 1)
456                .| CL.map (subtract 3)
457                .| CL.map (* 2)
458                .| CL.map (`div` 2)
459                .| CL.map (+ 1)
460                .| CL.map (+ 1)
461                .| CL.map (+ 1)
462                .| CL.map (subtract 3)
463                .| CL.fold (+) 0
464            x `shouldBe` sum [1..10]
465
466
467    describe "termination" $ do
468        it "terminates early" $ do
469            let src = forever $ C.yield ()
470            x <- runConduit $ src .| CL.head
471            x `shouldBe` Just ()
472        it "bracket" $ do
473            ref <- I.newIORef (0 :: Int)
474            let src = C.bracketP
475                    (I.modifyIORef ref (+ 1))
476                    (\() -> I.modifyIORef ref (+ 2))
477                    (\() -> forever $ C.yield (1 :: Int))
478            val <- runConduitRes $ src .| CL.isolate 10 .| CL.fold (+) 0
479            val `shouldBe` 10
480            i <- I.readIORef ref
481            i `shouldBe` 3
482        it "bracket skipped if not needed" $ do
483            ref <- I.newIORef (0 :: Int)
484            let src = C.bracketP
485                    (I.modifyIORef ref (+ 1))
486                    (\() -> I.modifyIORef ref (+ 2))
487                    (\() -> forever $ C.yield (1 :: Int))
488                src' = CL.sourceList $ repeat 1
489            val <- runConduitRes $ (src' >> src) .| CL.isolate 10 .| CL.fold (+) 0
490            val `shouldBe` 10
491            i <- I.readIORef ref
492            i `shouldBe` 0
493        it "bracket + toPipe" $ do
494            ref <- I.newIORef (0 :: Int)
495            let src = C.bracketP
496                    (I.modifyIORef ref (+ 1))
497                    (\() -> I.modifyIORef ref (+ 2))
498                    (\() -> forever $ C.yield (1 :: Int))
499            val <- runConduitRes $ src .| CL.isolate 10 .| CL.fold (+) 0
500            val `shouldBe` 10
501            i <- I.readIORef ref
502            i `shouldBe` 3
503        it "bracket skipped if not needed" $ do
504            ref <- I.newIORef (0 :: Int)
505            let src = C.bracketP
506                    (I.modifyIORef ref (+ 1))
507                    (\() -> I.modifyIORef ref (+ 2))
508                    (\() -> forever $ C.yield (1 :: Int))
509                src' = CL.sourceList $ repeat 1
510            val <- runConduitRes $ (src' >> src) .| CL.isolate 10 .| CL.fold (+) 0
511            val `shouldBe` 10
512            i <- I.readIORef ref
513            i `shouldBe` 0
514
515    describe "invariant violations" $ do
516        it "leftovers without input" $ do
517            ref <- I.newIORef []
518            let add x = I.modifyIORef ref (x:)
519                adder' = CI.NeedInput (\a -> liftIO (add a) >> adder') return
520                adder = CI.ConduitT (adder' >>=)
521                residue x = CI.ConduitT $ \rest -> CI.Leftover (rest ()) x
522
523            _ <- runConduit $ C.yield 1 .| adder
524            x <- I.readIORef ref
525            x `shouldBe` [1 :: Int]
526            I.writeIORef ref []
527
528            _ <- runConduit $ C.yield 1 .| ((residue 2 >> residue 3) >> adder)
529            y <- I.readIORef ref
530            y `shouldBe` [1, 2, 3]
531            I.writeIORef ref []
532
533            _ <- runConduit $ C.yield 1 .| (residue 2 >> (residue 3 >> adder))
534            z <- I.readIORef ref
535            z `shouldBe` [1, 2, 3]
536            I.writeIORef ref []
537
538    describe "sane yield/await'" $ do
539        it' "yield terminates" $ do
540            let is = [1..10] ++ undefined
541                src [] = return ()
542                src (x:xs) = C.yield x >> src xs
543            x <- runConduit $ src is .| CL.take 10
544            x `shouldBe` [1..10 :: Int]
545        it' "yield terminates (2)" $ do
546            let is = [1..10] ++ undefined
547            x <- runConduit $ mapM_ C.yield is .| CL.take 10
548            x `shouldBe` [1..10 :: Int]
549
550    describe "upstream results" $ do
551        it' "works" $ do
552            let foldUp :: (b -> a -> b) -> b -> CI.Pipe l a Void u IO (u, b)
553                foldUp f b = CI.awaitE >>= either (\u -> return (u, b)) (\a -> let b' = f b a in b' `seq` foldUp f b')
554                passFold :: (b -> a -> b) -> b -> CI.Pipe l a a () IO b
555                passFold f b = CI.await >>= maybe (return b) (\a -> let b' = f b a in b' `seq` CI.yield a >> passFold f b')
556            (x, y) <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> passFold (+) 0 CI.>+>  foldUp (*) 1
557            (x, y) `shouldBe` (sum [1..10], product [1..10])
558
559    describe "input/output mapping" $ do
560        it' "mapOutput" $ do
561            x <- runConduit $ C.mapOutput (+ 1) (CL.sourceList [1..10 :: Int]) .| CL.fold (+) 0
562            x `shouldBe` sum [2..11]
563        it' "mapOutputMaybe" $ do
564            x <- runConduit $ C.mapOutputMaybe (\i -> if even i then Just i else Nothing) (CL.sourceList [1..10 :: Int]) .| CL.fold (+) 0
565            x `shouldBe` sum [2, 4..10]
566        it' "mapInput" $ do
567            xyz <- runConduit $ (CL.sourceList $ map show [1..10 :: Int]) .| do
568                (x, y) <- C.mapInput read (Just . show) $ ((do
569                    x <- CL.isolate 5 .| CL.fold (+) 0
570                    y <- CL.peek
571                    return (x :: Int, y :: Maybe Int)) :: ConduitT Int Void IO (Int, Maybe Int))
572                z <- CL.consume
573                return (x, y, concat z)
574
575            xyz `shouldBe` (sum [1..5], Just 6, "678910")
576
577    describe "left/right identity" $ do
578        it' "left identity" $ do
579            x <- runConduit $ CL.sourceList [1..10 :: Int] .| CI.ConduitT (CI.idP >>=) .| CL.fold (+) 0
580            y <- runConduit $ CL.sourceList [1..10 :: Int] .| CL.fold (+) 0
581            x `shouldBe` y
582        it' "right identity" $ do
583            x <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> (CI.injectLeftovers $ (`CI.unConduitT` CI.Done) $ CL.fold (+) 0) CI.>+> CI.idP
584            y <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> (CI.injectLeftovers $ (`CI.unConduitT` CI.Done) $ CL.fold (+) 0)
585            x `shouldBe` y
586
587    describe "generalizing" $ do
588        it' "works" $ do
589            x <-     CI.runPipe
590                   $ CI.sourceToPipe  (CL.sourceList [1..10 :: Int])
591               CI.>+> CI.conduitToPipe (CL.map (+ 1))
592               CI.>+> CI.sinkToPipe    (CL.fold (+) 0)
593            x `shouldBe` sum [2..11]
594
595    describe "withUpstream" $ do
596        it' "works" $ do
597            let src = mapM_ CI.yield [1..10 :: Int] >> return True
598                fold f =
599                    loop
600                  where
601                    loop accum =
602                        CI.await >>= maybe (return accum) go
603                      where
604                        go a =
605                            let accum' = f accum a
606                             in accum' `seq` loop accum'
607                sink = CI.withUpstream $ fold (+) 0
608            res <- CI.runPipe $ src CI.>+> sink
609            res `shouldBe` (True, sum [1..10])
610
611    describe "iterate" $ do
612        it' "works" $ do
613            res <- runConduit $ CL.iterate (+ 1) (1 :: Int) .| CL.isolate 10 .| CL.fold (+) 0
614            res `shouldBe` sum [1..10]
615
616    prop "replicate" $ \cnt' -> do
617        let cnt = min cnt' 100
618        res <- runConduit $ CL.replicate cnt () .| CL.consume
619        res `shouldBe` replicate cnt ()
620
621    prop "replicateM" $ \cnt' -> do
622        ref <- I.newIORef 0
623        let cnt = min cnt' 100
624        res <- runConduit $ CL.replicateM cnt (I.modifyIORef ref (+ 1)) .| CL.consume
625        res `shouldBe` replicate cnt ()
626
627        ref' <- I.readIORef ref
628        ref' `shouldBe` (if cnt' <= 0 then 0 else cnt)
629
630    describe "injectLeftovers" $ do
631        it "works" $ do
632            let src = mapM_ CI.yield [1..10 :: Int]
633                conduit = CI.injectLeftovers $ (`CI.unConduitT` CI.Done) $ C.awaitForever $ \i -> do
634                    js <- CL.take 2
635                    mapM_ C.leftover $ reverse js
636                    C.yield i
637            res <- runConduit $ CI.ConduitT ((src CI.>+> CI.injectLeftovers conduit) >>=) .| CL.consume
638            res `shouldBe` [1..10]
639    describe "monad transformer laws" $ do
640        it "transPipe" $ do
641            let source = CL.sourceList $ replicate 10 ()
642            let tell' x = tell [x :: Int]
643
644            let replaceNum1 = C.awaitForever $ \() -> do
645                    i <- lift get
646                    lift $ (put $ i + 1) >> (get >>= lift . tell')
647                    C.yield i
648
649            let replaceNum2 = C.awaitForever $ \() -> do
650                    i <- lift get
651                    lift $ put $ i + 1
652                    lift $ get >>= lift . tell'
653                    C.yield i
654
655            x <- runWriterT $ runConduit $ source .| C.transPipe (`evalStateT` 1) replaceNum1 .| CL.consume
656            y <- runWriterT $ runConduit $ source .| C.transPipe (`evalStateT` 1) replaceNum2 .| CL.consume
657            x `shouldBe` y
658    describe "iterM" $ do
659        prop "behavior" $ \l -> monadicIO $ do
660            let counter ref = CL.iterM (const $ liftIO $ M.modifyMVar_ ref (\i -> return $! i + 1))
661            v <- run $ do
662                ref <- M.newMVar 0
663                runConduit $ CL.sourceList l .| counter ref .| CL.mapM_ (const $ return ())
664                M.readMVar ref
665
666            assert $ v == length (l :: [Int])
667        prop "mapM_ equivalence" $ \l -> monadicIO $ do
668            let runTest h = run $ do
669                    ref <- M.newMVar (0 :: Int)
670                    let f = action ref
671                    s <- runConduit $ CL.sourceList (l :: [Int]) .| h f .| CL.fold (+) 0
672                    c <- M.readMVar ref
673
674                    return (c, s)
675
676                action ref = const $ liftIO $ M.modifyMVar_ ref (\i -> return $! i + 1)
677            (c1, s1) <- runTest CL.iterM
678            (c2, s2) <- runTest (\f -> CL.mapM (\a -> f a >>= \() -> return a))
679
680            assert $ c1 == c2
681            assert $ s1 == s2
682
683    describe "generalizing" $ do
684        it "works" $ do
685            let src :: Int -> ConduitT () Int IO ()
686                src i = CL.sourceList [1..i]
687                sink :: ConduitT Int Void IO Int
688                sink = CL.fold (+) 0
689            res <- runConduit $ C.yield 10 .| C.awaitForever (C.toProducer . src) .| (C.toConsumer sink >>= C.yield) .| C.await
690            res `shouldBe` Just (sum [1..10])
691
692    describe "mergeSource" $ do
693        it "works" $ do
694            let src :: ConduitT () String IO ()
695                src = CL.sourceList ["A", "B", "C"]
696                withIndex :: ConduitT String (Integer, String) IO ()
697                withIndex = CI.mergeSource (CL.sourceList [1..])
698            output <- runConduit $ src .| withIndex .| CL.consume
699            output `shouldBe` [(1, "A"), (2, "B"), (3, "C")]
700        it "does stop processing when the source exhausted" $ do
701            let src :: ConduitT () Integer IO ()
702                src = CL.sourceList [1..]
703                withShortAlphaIndex :: ConduitT Integer (String, Integer) IO ()
704                withShortAlphaIndex = CI.mergeSource (CL.sourceList ["A", "B", "C"])
705            output <- runConduit $ src .| withShortAlphaIndex .| CL.consume
706            output `shouldBe` [("A", 1), ("B", 2), ("C", 3)]
707
708    describe "passthroughSink" $ do
        -- Each case places C.passthroughSink between a source and a downstream
        -- consumer, then checks the downstream output and/or an IORef that the
        -- inner sink or the completion callback writes to.
709        it "works" $ do
710            ref <- I.newIORef (-1)
711            let sink = CL.fold (+) (0 :: Int)
                -- The second argument writes whatever value it receives into ref;
                -- the final assertion expects that value to be the fold's result.
712                conduit = C.passthroughSink sink (I.writeIORef ref)
713                input = [1..10]
714            output <- runConduit $ mapM_ C.yield input .| conduit .| CL.consume
            -- Downstream must see the input unchanged.
715            output `shouldBe` input
716            x <- I.readIORef ref
717            x `shouldBe` sum input
718        it "does nothing when downstream does nothing" $ do
719            ref <- I.newIORef (-1)
720            let sink = CL.fold (+) (0 :: Int)
721                conduit = C.passthroughSink sink (I.writeIORef ref)
                -- The only element is undefined; presumably chosen so that any
                -- attempt to evaluate it would surface as a test failure.
722                input = [undefined]
            -- Downstream is `return ()`, so it finishes without awaiting anything.
723            runConduit $ mapM_ C.yield input .| conduit .| return ()
724            x <- I.readIORef ref
            -- ref is expected to still hold its initial value.
725            x `shouldBe` (-1)
726
727        it "handles the last input correctly #304" $ do
728            ref <- I.newIORef (-1 :: Int)
            -- Here the inner sink itself overwrites ref with each element it is
            -- given; the completion callback ignores its argument.
729            let sink = CL.mapM_ (I.writeIORef ref)
730                conduit = C.passthroughSink sink (const (return ()))
            -- Infinite source; downstream stops after taking five elements.
731            res <- runConduit $ mapM_ C.yield [1..] .| conduit .| CL.take 5
732            res `shouldBe` [1..5]
733            x <- I.readIORef ref
            -- The last value written by the inner sink must be the fifth element.
734            x `shouldBe` 5
735
736    describe "mtl instances" $ do
737        it "ErrorT" $ do
            -- throwError/catchError are applied to the conduit itself; the
            -- expected `Right ...` below shows the pipeline runs in an Either monad.
738            let src = flip catchError (const $ C.yield 4) $ do
                    -- The `lift $ return ()` lines are no-op actions in the base
                    -- monad placed between the yields.
                    -- NOTE(review): presumably there to interleave base-monad
                    -- steps with yields -- confirm intent.
739                    lift $ return ()
740                    C.yield 1
741                    lift $ return ()
742                    C.yield 2
743                    lift $ return ()
744                    () <- throwError DummyError
                    -- Nothing below is expected to run: 3 is absent from the
                    -- expected output.
745                    lift $ return ()
746                    C.yield 3
747                    lift $ return ()
            -- 1 and 2 come from before the error, 4 from the handler.
748            runConduit (src .| CL.consume) `shouldBe` Right [1, 2, 4 :: Int]
749        describe "WriterT" $
750            it "pass" $
                -- The action given to W.pass tells [1] and returns the function
                -- (2:); the expected log [2, 1] is that function applied to the
                -- told output.
751                let writer = W.pass $ do
752                      W.tell [1 :: Int]
753                      pure ((), (2:))
754                in execWriter (runConduit writer) `shouldBe` [2, 1]
755
756    describe "Data.Conduit.Lift" $ do
        -- One case per Data.Conduit.Lift runner exercised here.
757        it "execStateC" $ do
            -- Adds every incoming element to a state that starts at 0.
758            let summing = C.execStateLC 0 (CL.mapM_ (\n -> modify (n +)))
759                upTo10 = mapM_ C.yield [1..10 :: Int]
760            total <- runConduit (upTo10 .| summing)
761            total `shouldBe` sum [1..10]
762
763        it "execWriterC" $ do
            -- Tells each incoming element as a singleton list.
764            let logging = C.execWriterLC (CL.mapM_ (\n -> tell [n]))
765                upTo10 = mapM_ C.yield [1..10 :: Int]
766            logged <- runConduit (upTo10 .| logging)
767            logged `shouldBe` [1..10]
768
769        it "runExceptC" $ do
            -- The thrown "foo" is caught, handed back as the result, and then
            -- extended with "bar".
770            let recovering = C.runExceptC $ do
771                    caught <- C.catchExceptC (lift (throwError "foo")) return
772                    return (caught ++ "bar")
773            outcome <- runConduit (return () .| recovering)
774            outcome `shouldBe` Right ("foobar" :: String)
775
776        it "runMaybeC" $ do
            -- The Nothing sits between the two yields; only the first yield is
            -- expected downstream.
777            let truncated = void . C.runMaybeC $ do
778                    C.yield 1
779                    () <- lift (MaybeT (return Nothing))
780                    C.yield 2
781                collector = CL.consume
782            seen <- runConduit (truncated .| collector)
783            seen `shouldBe` [1 :: Int]
784
785    describe "sequenceSources" $ do
786        it "works" $ do
            -- Two three-element sources and one endless source, keyed 1..3.
787            let ascending = mapM_ C.yield [1, 2, 3 :: Int]
788                descending = mapM_ C.yield [3, 2, 1]
789                constant = mapM_ C.yield (repeat 2)
790                combined = C.sequenceSources . Map.fromList $
791                    [ (1 :: Int, ascending)
792                    , (2, descending)
793                    , (3, constant)
794                    ]
795            rows <- runConduit (combined .| CL.consume)
            -- Expected: one map per step, three steps in total.
796            rows `shouldBe`
797                [ Map.fromList [(1, 1), (2, 3), (3, 2)]
798                , Map.fromList [(1, 2), (2, 2), (3, 2)]
799                , Map.fromList [(1, 3), (2, 1), (3, 2)]
800                ]
801    describe "zipSink" $ do
802        it "zip equal-sized" $ do
            -- Both sinks fold the full 1..100 stream: one sums it, the other
            -- takes the product modulo 101.
803            results <- runConduitRes $
804                    CL.sourceList [1..100] .|
805                    C.sequenceSinks [ CL.fold (+) 0
806                                    , fmap (`mod` 101) (CL.fold (*) 1) ]
807            results `shouldBe` [5050, 100 :: Integer]
808
809        it "zip distinct sizes" $ do
            -- One sink folds the whole stream, the other awaits a single element;
            -- their results are multiplied together.
810            let combined = C.getZipSink $
811                        (*) <$> C.ZipSink (CL.fold (+) 0)
812                            <*> C.ZipSink (fmap fromJust C.await)
813            answer <- runConduitRes (CL.sourceList [100,99..1] .| combined)
814            answer `shouldBe` (505000 :: Integer)
815
816    describe "upstream results" $ do
817        it "fuseBoth" $ do
            -- The upstream conduit yields one string and then returns the sum of
            -- its next five inputs; the rest of the input is consumed afterwards.
818            let greetThenSum = do
819                    C.yield ("hello" :: String)
820                    CL.isolate 5 .| CL.fold (+) 0
821                fused = C.fuseBoth greetThenSum CL.consume
822            outcome <- runConduit $ CL.sourceList [1..10 :: Int] .| do
823                (total, greetings) <- fused
824                leftovers <- CL.consume
825                return (total, greetings, leftovers)
826            outcome `shouldBe` (sum [1..5], ["hello"], [6..10])
827
828        it "fuseBothMaybe with no result" $ do
            -- Endless upstream, sink limited to five elements: no upstream
            -- result is expected.
829            let endless = mapM_ C.yield [1 :: Int ..]
830                firstFive = CL.isolate 5 .| CL.fold (+) 0
831            (upResult, total) <- runConduit (C.fuseBothMaybe endless firstFive)
832            upResult `shouldBe` (Nothing :: Maybe ())
833            total `shouldBe` sum [1..5]
834
835        it "fuseBothMaybe with result" $ do
            -- Five-element upstream, sink willing to take six: the upstream
            -- result is expected to be reported.
836            let fiveItems = mapM_ C.yield [1 :: Int .. 5]
837                upToSix = CL.isolate 6 .| CL.fold (+) 0
838            (upResult, total) <- runConduit (C.fuseBothMaybe fiveItems upToSix)
839            upResult `shouldBe` Just ()
840            total `shouldBe` sum [1..5]
841
842        it "fuseBothMaybe with almost result" $ do
            -- Five-element upstream, sink taking exactly five: no upstream
            -- result is expected even though every element was consumed.
843            let fiveItems = mapM_ C.yield [1 :: Int .. 5]
844                exactlyFive = CL.isolate 5 .| CL.fold (+) 0
845            (upResult, total) <- runConduit (C.fuseBothMaybe fiveItems exactlyFive)
846            upResult `shouldBe` (Nothing :: Maybe ())
847            total `shouldBe` sum [1..5]
848
849    describe "catching exceptions" $ do
850        it "works" $ do
            -- The source yields 1 and then throws before its second yield; the
            -- handler yields 3 in its place.
851            let failing = do
852                    C.yield 1
853                    () <- Catch.throwM DummyError
854                    C.yield 2
855                recovered =
856                    CI.catchC failing (\DummyError -> C.yield (3 :: Int))
857            seen <- runConduit (recovered .| CL.consume)
858            seen `shouldBe` [1, 3]
859
860    describe "sourceToList" $ do
861        it "works lazily in Identity" $ do
            -- The source ends with a pure `throw` placed after two yields.
862            let src = C.yield 1 >> C.yield 2 >> throw DummyError
863            let res = runIdentity $ C.sourceToList src
            -- Only the first two elements of the result list are inspected.
864            take 2 res `shouldBe` [1, 2 :: Int]
865        it "is not lazy in IO" $ do
866            let src = C.yield 1 >> C.yield (2 :: Int) >> throw DummyError
            -- Same source shape, but run as an IO action: the action as a whole
            -- is expected to throw DummyError.
867            C.sourceToList src `shouldThrow` (==DummyError)
868
869    ZipConduit.spec -- spec imported from Data.Conduit.Extra.ZipConduitSpec
870    Stream.spec -- spec imported from Data.Conduit.StreamSpec
871
-- | 'it' with the example type fixed to a plain @IO ()@ action.
872it' :: String -> IO () -> Spec
873it' label action = it label action
874
-- | Single-constructor exception type thrown and matched by the tests in this
-- file; 'Eq' and 'Show' are derived so it can be compared in expectations.
875data DummyError
876    = DummyError deriving (Eq, Show, Typeable)
877instance Catch.Exception DummyError
878