{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE FlexibleInstances #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
-- Test driver: runs the specs defined in this file together with the specs
-- imported from "Spec", "Data.Conduit.Extra.ZipConduitSpec" and
-- "Data.Conduit.StreamSpec".
import Test.Hspec
import Test.Hspec.QuickCheck (prop)
import Test.QuickCheck (getPositive)
import Test.QuickCheck.Monadic (assert, monadicIO, run)

import Data.Conduit (runConduit, (.|), ConduitT, runConduitPure, runConduitRes)
import qualified Data.Conduit as C
import qualified Data.Conduit.Lift as C
import qualified Data.Conduit.Internal as CI
import qualified Data.Conduit.List as CL
import Data.Typeable (Typeable)
import Control.Exception (throw, evaluate)
import Control.Monad.Trans.Resource (runResourceT)
import Control.Monad.Trans.Maybe (MaybeT (MaybeT))
import Control.Monad.State.Strict (modify)
import Data.Maybe (fromMaybe,catMaybes,fromJust)
import qualified Data.List as DL
import qualified Data.List.Split as DLS (chunksOf)
import Control.Monad.ST (runST)
import Data.Monoid
import qualified Data.IORef as I
import Data.Tuple (swap)
import Control.Monad.Trans.Resource (allocate, resourceForkIO)
import Control.Concurrent (threadDelay, killThread)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Writer (execWriter, tell, runWriterT)
import Control.Monad.Trans.State (evalStateT, get, put)
import qualified Control.Monad.Writer as W
import Control.Applicative (pure, (<$>), (<*>))
import qualified Control.Monad.Catch as Catch
import Data.Functor.Identity (Identity,runIdentity)
import Control.Monad (forever, void)
import Data.Void (Void)
import qualified Control.Concurrent.MVar as M
import Control.Monad.Except (catchError, throwError)
import qualified Data.Map as Map
import qualified Data.Conduit.Extra.ZipConduitSpec as ZipConduit
import qualified Data.Conduit.StreamSpec as Stream
import qualified Spec

-- | Equality assertion with the /expected/ value on the left and the
-- /actual/ value on the right (i.e. 'shouldBe' with its arguments flipped).
(@=?) :: (Eq a, Show a) => a -> a -> IO ()
(@=?) = flip shouldBe

-- | QuickCheck property for testing the equivalence of a list processing
-- function and its conduit counterpart: feeding @xs@ through the conduit and
-- collecting the output must give the same list as applying @f@ to @xs@.
equivToList :: Eq b => ([a] -> [b]) -> ConduitT a b Identity () -> [a] -> Bool
equivToList f conduit xs =
    f xs == runConduitPure (CL.sourceList xs .| conduit .| CL.consume)

-- | Check that two conduits produce the same outputs and return the same result.
bisimilarTo :: (Eq a, Eq r) => ConduitT () a Identity r -> ConduitT () a Identity r -> Bool
left `bisimilarTo` right =
    C.runConduitPure (toListRes left) == C.runConduitPure (toListRes right)
  where
    -- | Sink a conduit into a list and return it alongside the result.
    -- So it is, essentially, @sinkList@ plus result.
    toListRes :: Monad m => ConduitT () a m r -> ConduitT () Void m ([a], r)
    toListRes cond = swap <$> C.fuseBoth cond CL.consume


-- | Entry point: runs every spec in this file plus the imported spec modules.
--
-- Assertions follow the hspec convention @actual \`shouldBe\` expected@, and
-- the local '@=?' helper is used as @expected \@=? actual@, so that failure
-- reports label the two values correctly.
main :: IO ()
main = hspec $ do
    describe "Combinators" Spec.spec
    describe "data loss rules" $ do
        it "consumes the source to quickly" $ do
            x <- runConduitRes $ CL.sourceList [1..10 :: Int] .| do
                  strings <- CL.map show .| CL.take 5
                  liftIO $ putStr $ unlines strings
                  CL.fold (+) 0
            -- actual on the left, expected on the right
            x `shouldBe` 40

        it "correctly consumes a chunked resource" $ do
            x <- runConduitRes $ (CL.sourceList [1..5 :: Int] `mappend` CL.sourceList [6..10]) .| do
                strings <- CL.map show .| CL.take 5
                liftIO $ putStr $ unlines strings
                CL.fold (+) 0
            -- actual on the left, expected on the right
            x `shouldBe` 40

    describe "filter" $ do
        it "even" $ do
            x <- runConduitRes $ CL.sourceList [1..10] .| CL.filter even .| CL.consume
            x `shouldBe` filter even [1..10 :: Int]

    prop "concat" $ equivToList (concat :: [[Int]]->[Int]) CL.concat

    describe "mapFoldable" $ do
        prop "list" $
            equivToList (concatMap (:[]) :: [Int]->[Int]) (CL.mapFoldable (:[]))
        let f x = if odd x then Just x else Nothing
        prop "Maybe" $
            equivToList (catMaybes . map f :: [Int]->[Int]) (CL.mapFoldable f)

    prop "scan" $ equivToList (tail . scanl (+) 0 :: [Int]->[Int]) (void $ CL.scan (+) 0)

    -- mapFoldableM and scanM are fully polymorphic in the type of the monad,
    -- so it suffices to check them with Identity only.
    describe "mapFoldableM" $ do
        prop "list" $
            equivToList (concatMap (:[]) :: [Int]->[Int]) (CL.mapFoldableM (return . (:[])))
        let f x = if odd x then Just x else Nothing
        prop "Maybe" $
            equivToList (catMaybes . map f :: [Int]->[Int]) (CL.mapFoldableM (return . f))

    prop "scanM" $ equivToList (tail . scanl (+) 0) (void $ CL.scanM (\a s -> return $ a + s) (0 :: Int))

    describe "ResourceT" $ do
        it "resourceForkIO" $ do
            counter <- I.newIORef 0
            let w = allocate
                        (I.atomicModifyIORef counter $ \i ->
                            (i + 1, ()))
                        (const $ I.atomicModifyIORef counter $ \i ->
                            (i - 1, ()))
            runResourceT $ do
                _ <- w
                _ <- resourceForkIO $ return ()
                _ <- resourceForkIO $ return ()
                sequence_ $ replicate 1000 $ do
                    tid <- resourceForkIO $ return ()
                    liftIO $ killThread tid
                _ <- resourceForkIO $ return ()
                _ <- resourceForkIO $ return ()
                return ()

            -- give enough of a chance to the cleanup code to finish
            threadDelay 1000
            res <- I.readIORef counter
            res `shouldBe` (0 :: Int)

    describe "sum" $ do
        it "works for 1..10" $ do
            x <- runConduitRes $ CL.sourceList [1..10] .| CL.fold (+) (0 :: Int)
            x `shouldBe` sum [1..10]
        prop "is idempotent" $ \list ->
            (runST $ runConduit $ CL.sourceList list .| CL.fold (+) (0 :: Int))
            == sum list

    describe "foldMap" $ do
        it "sums 1..10" $ do
            Sum x <- runConduit $ CL.sourceList [1..(10 :: Int)] .| CL.foldMap Sum
            x `shouldBe` sum [1..10]

        it "preserves order" $ do
            x <- runConduit $ CL.sourceList [[4],[2],[3],[1]] .| CL.foldMap (++[(9 :: Int)])
            x `shouldBe` [4,9,2,9,3,9,1,9]

    describe "foldMapM" $ do
        it "sums 1..10" $ do
            Sum x <- runConduit $ CL.sourceList [1..(10 :: Int)] .| CL.foldMapM (return . Sum)
            x `shouldBe` sum [1..10]

        it "preserves order" $ do
            x <- runConduit $ CL.sourceList [[4],[2],[3],[1]] .| CL.foldMapM (return . (++[(9 :: Int)]))
            x `shouldBe` [4,9,2,9,3,9,1,9]

    describe "unfold" $ do
        it "works" $ do
            let f 0 = Nothing
                f i = Just (show i, i - 1)
                seed = 10 :: Int
            x <- runConduit $ CL.unfold f seed .| CL.consume
            let y = DL.unfoldr f seed
            x `shouldBe` y

    describe "unfoldM" $ do
        it "works" $ do
            let f 0 = Nothing
                f i = Just (show i, i - 1)
                seed = 10 :: Int
            x <- runConduit $ CL.unfoldM (return . f) seed .| CL.consume
            let y = DL.unfoldr f seed
            x `shouldBe` y

    describe "uncons" $ do
        prop "folds to list" $ \xs ->
          let src = C.sealConduitT $ CL.sourceList xs in
          (xs :: [Int]) == DL.unfoldr CL.uncons src

        prop "works with unfold" $ \xs ->
          let src = CL.sourceList xs in
          CL.unfold CL.uncons (C.sealConduitT src) `bisimilarTo` (src :: ConduitT () Int Identity ())

    describe "unconsEither" $ do
        let
          eitherToMaybe :: Either l a -> Maybe a
          eitherToMaybe (Left _) = Nothing
          eitherToMaybe (Right a) = Just a
        prop "folds outputs to list" $ \xs ->
          let src = C.sealConduitT $ CL.sourceList xs in
          (xs :: [Int]) == DL.unfoldr (eitherToMaybe . CL.unconsEither) src

        prop "works with unfoldEither" $ \(xs, r) ->
          let src = CL.sourceList xs *> pure r in
          CL.unfoldEither CL.unconsEither (C.sealConduitT src) `bisimilarTo` (src :: ConduitT () Int Identity Int)

    describe "Monoid instance for Source" $ do
        it "mappend" $ do
            x <- runConduitRes $ (CL.sourceList [1..5 :: Int] `mappend` CL.sourceList [6..10]) .| CL.fold (+) 0
            x `shouldBe` sum [1..10]
        it "mconcat" $ do
            x <- runConduitRes $ mconcat
                [ CL.sourceList [1..5 :: Int]
                , CL.sourceList [6..10]
                , CL.sourceList [11..20]
                ] .| CL.fold (+) 0
            x `shouldBe` sum [1..20]

    describe "zipping" $ do
        it "zipping two small lists" $ do
            res <- runConduitRes $ CI.zipSources (CL.sourceList [1..10]) (CL.sourceList [11..12]) .| CL.consume
            -- expected @=? actual
            zip [1..10 :: Int] [11..12 :: Int] @=? res

    describe "zipping sinks" $ do
        it "take all" $ do
            res <- runConduitRes $ CL.sourceList [1..10] .| CI.zipSinks CL.consume CL.consume
            ([1..10 :: Int], [1..10 :: Int]) @=? res
        it "take fewer on left" $ do
            res <- runConduitRes $ CL.sourceList [1..10] .| CI.zipSinks (CL.take 4) CL.consume
            ([1..4 :: Int], [1..10 :: Int]) @=? res
        it "take fewer on right" $ do
            res <- runConduitRes $ CL.sourceList [1..10] .| CI.zipSinks CL.consume (CL.take 4)
            ([1..10 :: Int], [1..4 :: Int]) @=? res

    describe "Monad instance for Sink" $ do
        it "binding" $ do
            x <- runConduitRes $ CL.sourceList [1..10] .| do
                _ <- CL.take 5
                CL.fold (+) (0 :: Int)
            x `shouldBe` sum [6..10]

    describe "Applicative instance for Sink" $ do
        it "<$> and <*>" $ do
            x <- runConduitRes $ CL.sourceList [1..10] .|
                (+) <$> pure 5 <*> CL.fold (+) (0 :: Int)
            x `shouldBe` sum [1..10] + 5

    describe "resumable sources" $ do
        it "simple" $ do
            (x, y, z) <- runConduitRes $ do
                let src1 = CL.sourceList [1..10 :: Int]
                (src2, x) <- src1 C.$$+ CL.take 5
                (src3, y) <- src2 C.$$++ CL.fold (+) 0
                z <- src3 C.$$+- CL.consume
                return (x, y, z)
            x `shouldBe` [1..5] :: IO ()
            y `shouldBe` sum [6..10]
            z `shouldBe` []

    describe "conduits" $ do
        it "map, left" $ do
            x <- runConduitRes $
                CL.sourceList [1..10]
                    .| CL.map (* 2)
                    .| CL.fold (+) 0
            x `shouldBe` 2 * sum [1..10 :: Int]

        it "map, left >+>" $ do
            x <- runConduitRes $
                CI.ConduitT
                    ((CI.unConduitT (CL.sourceList [1..10]) CI.Done
                    CI.>+> CI.injectLeftovers ((`CI.unConduitT` CI.Done) $ CL.map (* 2))) >>=)
                    .| CL.fold (+) 0
            x `shouldBe` 2 * sum [1..10 :: Int]

        it "map, right" $ do
            x <- runConduitRes $
                CL.sourceList [1..10]
                    .| CL.map (* 2)
                    .| CL.fold (+) 0
            x `shouldBe` 2 * sum [1..10 :: Int]

        prop "chunksOf" $ \(positive, xs) ->
            let p = getPositive positive
                conduit = CL.sourceList xs .| CL.chunksOf p .| CL.consume
            in DLS.chunksOf p (xs :: [Int]) == runConduitPure conduit

        it "chunksOf (zero)" $
            let conduit = return () .| CL.chunksOf 0 .| CL.consume
            in evaluate (runConduitPure conduit) `shouldThrow` anyException

        it "chunksOf (negative)" $
            let conduit = return () .| CL.chunksOf (-5) .| CL.consume
            in evaluate (runConduitPure conduit) `shouldThrow` anyException

        it "groupBy" $ do
            let input = [1::Int, 1, 2, 3, 3, 3, 4, 5, 5]
            x <- runConduitRes $ CL.sourceList input
                    .| CL.groupBy (==)
                    .| CL.consume
            x `shouldBe` DL.groupBy (==) input

        it "groupBy (nondup begin/end)" $ do
            let input = [1::Int, 2, 3, 3, 3, 4, 5]
            x <- runConduitRes $ CL.sourceList input
                    .| CL.groupBy (==)
                    .| CL.consume
            x `shouldBe` DL.groupBy (==) input

        it "groupOn1" $ do
            let input = [1::Int, 1, 2, 3, 3, 3, 4, 5, 5]
            x <- runConduitRes $ CL.sourceList input
                    .| CL.groupOn1 id
                    .| CL.consume
            x `shouldBe` [(1,[1]), (2, []), (3,[3,3]), (4,[]), (5, [5])]

        it "groupOn1 (nondup begin/end)" $ do
            let input = [1::Int, 2, 3, 3, 3, 4, 5]
            x <- runConduitRes $ CL.sourceList input
                    .| CL.groupOn1 id
                    .| CL.consume
            x `shouldBe` [(1,[]), (2, []), (3,[3,3]), (4,[]), (5, [])]


        it "mapMaybe" $ do
            let input = [Just (1::Int), Nothing, Just 2, Nothing, Just 3]
            x <- runConduitRes $ CL.sourceList input
                    .| CL.mapMaybe ((+2) <$>)
                    .| CL.consume
            x `shouldBe` [3, 4, 5]

        it "mapMaybeM" $ do
            let input = [Just (1::Int), Nothing, Just 2, Nothing, Just 3]
            x <- runConduitRes $ CL.sourceList input
                    .| CL.mapMaybeM (return . ((+2) <$>))
                    .| CL.consume
            x `shouldBe` [3, 4, 5]

        it "catMaybes" $ do
            let input = [Just (1::Int), Nothing, Just 2, Nothing, Just 3]
            x <- runConduitRes $ CL.sourceList input
                    .| CL.catMaybes
                    .| CL.consume
            x `shouldBe` [1, 2, 3]

        it "concatMap" $ do
            let input = [1, 11, 21]
            x <- runConduitRes $ CL.sourceList input
                    .| CL.concatMap (\i -> enumFromTo i (i + 9))
                    .| CL.fold (+) (0 :: Int)
            x `shouldBe` sum [1..30]

        it "bind together" $ do
            let conduit = CL.map (+ 5) .| CL.map (* 2)
            x <- runConduitRes $ CL.sourceList [1..10] .| conduit .| CL.fold (+) 0
            x `shouldBe` sum (map (* 2) $ map (+ 5) [1..10 :: Int])

#if !FAST
    describe "isolate" $ do
        it "bound to resumable source" $ do
            (x, y) <- runConduitRes $ do
                let src1 = CL.sourceList [1..10 :: Int]
                (src2, x) <- src1 .| CL.isolate 5 C.$$+ CL.consume
                y <- src2 C.$$+- CL.consume
                return (x, y)
            x `shouldBe` [1..5]
            y `shouldBe` []

        it "bound to sink, non-resumable" $ do
            (x, y) <- runConduitRes $ do
                CL.sourceList [1..10 :: Int] .| do
                    x <- CL.isolate 5 .| CL.consume
                    y <- CL.consume
                    return (x, y)
            x `shouldBe` [1..5]
            y `shouldBe` [6..10]

        it "bound to sink, resumable" $ do
            (x, y) <- runConduitRes $ do
                let src1 = CL.sourceList [1..10 :: Int]
                (src2, x) <- src1 C.$$+ CL.isolate 5 .| CL.consume
                y <- src2 C.$$+- CL.consume
                return (x, y)
            x `shouldBe` [1..5]
            y `shouldBe` [6..10]

        it "consumes all data" $ do
            x <- runConduitRes $ CL.sourceList [1..10 :: Int] .| do
                CL.isolate 5 .| CL.sinkNull
                CL.consume
            x `shouldBe` [6..10]

    describe "sequence" $ do
        it "simple sink" $ do
            let sumSink = do
                    ma <- CL.head
                    case ma of
                        Nothing -> return 0
                        Just a  -> (+a) . fromMaybe 0 <$> CL.head

            res <- runConduitRes $ CL.sourceList [1..11 :: Int]
                    .| CL.sequence sumSink
                    .| CL.consume
            res `shouldBe` [3, 7, 11, 15, 19, 11]

        it "sink with unpull behaviour" $ do
            let sumSink = do
                    ma <- CL.head
                    case ma of
                        Nothing -> return 0
                        Just a  -> (+a) . fromMaybe 0 <$> CL.peek

            res <- runConduitRes $ CL.sourceList [1..11 :: Int]
                    .| CL.sequence sumSink
                    .| CL.consume
            res `shouldBe` [3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 11]

#endif

    describe "peek" $ do
        it "works" $ do
            (a, b) <- runConduitRes $ CL.sourceList [1..10 :: Int] .| do
                a <- CL.peek
                b <- CL.consume
                return (a, b)
            (a, b) `shouldBe` (Just 1, [1..10])

    describe "unbuffering" $ do
        it "works" $ do
            x <- runConduitRes $ do
                let src1 = CL.sourceList [1..10 :: Int]
                (src2, ()) <- src1 C.$$+ CL.drop 5
                src2 C.$$+- CL.fold (+) 0
            x `shouldBe` sum [6..10]

    describe "operators" $ do
        it "only use .|" $
            runConduitPure
            (    CL.sourceList [1..10 :: Int]
              .| CL.map (+ 1)
              .| CL.map (subtract 1)
              .| CL.mapM (return . (* 2))
              .| CL.map (`div` 2)
              .| CL.fold (+) 0
            ) `shouldBe` sum [1..10]
        it "only use =$" $
            runConduitPure
            (    CL.sourceList [1..10 :: Int]
              .| CL.map (+ 1)
              .| CL.map (subtract 1)
              .| CL.map (* 2)
              .| CL.map (`div` 2)
              .| CL.fold (+) 0
            ) `shouldBe` sum [1..10]
        it "chain" $ do
            x <- runConduit
               $ CL.sourceList [1..10 :: Int]
              .| CL.map (+ 1)
              .| CL.map (+ 1)
              .| CL.map (+ 1)
              .| CL.map (subtract 3)
              .| CL.map (* 2)
              .| CL.map (`div` 2)
              .| CL.map (+ 1)
              .| CL.map (+ 1)
              .| CL.map (+ 1)
              .| CL.map (subtract 3)
              .| CL.fold (+) 0
            x `shouldBe` sum [1..10]


    describe "termination" $ do
        it "terminates early" $ do
            let src = forever $ C.yield ()
            x <- runConduit $ src .| CL.head
            x `shouldBe` Just ()
        it "bracket" $ do
            ref <- I.newIORef (0 :: Int)
            let src = C.bracketP
                    (I.modifyIORef ref (+ 1))
                    (\() -> I.modifyIORef ref (+ 2))
                    (\() -> forever $ C.yield (1 :: Int))
            val <- runConduitRes $ src .| CL.isolate 10 .| CL.fold (+) 0
            val `shouldBe` 10
            i <- I.readIORef ref
            i `shouldBe` 3
        it "bracket skipped if not needed" $ do
            ref <- I.newIORef (0 :: Int)
            let src = C.bracketP
                    (I.modifyIORef ref (+ 1))
                    (\() -> I.modifyIORef ref (+ 2))
                    (\() -> forever $ C.yield (1 :: Int))
                src' = CL.sourceList $ repeat 1
            val <- runConduitRes $ (src' >> src) .| CL.isolate 10 .| CL.fold (+) 0
            val `shouldBe` 10
            i <- I.readIORef ref
            i `shouldBe` 0
        it "bracket + toPipe" $ do
            ref <- I.newIORef (0 :: Int)
            let src = C.bracketP
                    (I.modifyIORef ref (+ 1))
                    (\() -> I.modifyIORef ref (+ 2))
                    (\() -> forever $ C.yield (1 :: Int))
            val <- runConduitRes $ src .| CL.isolate 10 .| CL.fold (+) 0
            val `shouldBe` 10
            i <- I.readIORef ref
            i `shouldBe` 3
        it "bracket skipped if not needed" $ do
            ref <- I.newIORef (0 :: Int)
            let src = C.bracketP
                    (I.modifyIORef ref (+ 1))
                    (\() -> I.modifyIORef ref (+ 2))
                    (\() -> forever $ C.yield (1 :: Int))
                src' = CL.sourceList $ repeat 1
            val <- runConduitRes $ (src' >> src) .| CL.isolate 10 .| CL.fold (+) 0
            val `shouldBe` 10
            i <- I.readIORef ref
            i `shouldBe` 0

    describe "invariant violations" $ do
        it "leftovers without input" $ do
            ref <- I.newIORef []
            let add x = I.modifyIORef ref (x:)
                adder' = CI.NeedInput (\a -> liftIO (add a) >> adder') return
                adder = CI.ConduitT (adder' >>=)
                residue x = CI.ConduitT $ \rest -> CI.Leftover (rest ()) x

            _ <- runConduit $ C.yield 1 .| adder
            x <- I.readIORef ref
            x `shouldBe` [1 :: Int]
            I.writeIORef ref []

            _ <- runConduit $ C.yield 1 .| ((residue 2 >> residue 3) >> adder)
            y <- I.readIORef ref
            y `shouldBe` [1, 2, 3]
            I.writeIORef ref []

            _ <- runConduit $ C.yield 1 .| (residue 2 >> (residue 3 >> adder))
            z <- I.readIORef ref
            z `shouldBe` [1, 2, 3]
            I.writeIORef ref []

    describe "sane yield/await'" $ do
        it' "yield terminates" $ do
            let is = [1..10] ++ undefined
                src [] = return ()
                src (x:xs) = C.yield x >> src xs
            x <- runConduit $ src is .| CL.take 10
            x `shouldBe` [1..10 :: Int]
        it' "yield terminates (2)" $ do
            let is = [1..10] ++ undefined
            x <- runConduit $ mapM_ C.yield is .| CL.take 10
            x `shouldBe` [1..10 :: Int]

    describe "upstream results" $ do
        it' "works" $ do
            let foldUp :: (b -> a -> b) -> b -> CI.Pipe l a Void u IO (u, b)
                foldUp f b = CI.awaitE >>= either (\u -> return (u, b)) (\a -> let b' = f b a in b' `seq` foldUp f b')
                passFold :: (b -> a -> b) -> b -> CI.Pipe l a a () IO b
                passFold f b = CI.await >>= maybe (return b) (\a -> let b' = f b a in b' `seq` CI.yield a >> passFold f b')
            (x, y) <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> passFold (+) 0 CI.>+> foldUp (*) 1
            (x, y) `shouldBe` (sum [1..10], product [1..10])

    describe "input/output mapping" $ do
        it' "mapOutput" $ do
            x <- runConduit $ C.mapOutput (+ 1) (CL.sourceList [1..10 :: Int]) .| CL.fold (+) 0
            x `shouldBe` sum [2..11]
        it' "mapOutputMaybe" $ do
            x <- runConduit $ C.mapOutputMaybe (\i -> if even i then Just i else Nothing) (CL.sourceList [1..10 :: Int]) .| CL.fold (+) 0
            x `shouldBe` sum [2, 4..10]
        it' "mapInput" $ do
            xyz <- runConduit $ (CL.sourceList $ map show [1..10 :: Int]) .| do
                (x, y) <- C.mapInput read (Just . show) $ ((do
                    x <- CL.isolate 5 .| CL.fold (+) 0
                    y <- CL.peek
                    return (x :: Int, y :: Maybe Int)) :: ConduitT Int Void IO (Int, Maybe Int))
                z <- CL.consume
                return (x, y, concat z)

            xyz `shouldBe` (sum [1..5], Just 6, "678910")

    describe "left/right identity" $ do
        it' "left identity" $ do
            x <- runConduit $ CL.sourceList [1..10 :: Int] .| CI.ConduitT (CI.idP >>=) .| CL.fold (+) 0
            y <- runConduit $ CL.sourceList [1..10 :: Int] .| CL.fold (+) 0
            x `shouldBe` y
        it' "right identity" $ do
            x <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> (CI.injectLeftovers $ (`CI.unConduitT` CI.Done) $ CL.fold (+) 0) CI.>+> CI.idP
            y <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> (CI.injectLeftovers $ (`CI.unConduitT` CI.Done) $ CL.fold (+) 0)
            x `shouldBe` y

    describe "generalizing" $ do
        it' "works" $ do
            x <-     CI.runPipe
                   $ CI.sourceToPipe  (CL.sourceList [1..10 :: Int])
              CI.>+> CI.conduitToPipe (CL.map (+ 1))
              CI.>+> CI.sinkToPipe    (CL.fold (+) 0)
            x `shouldBe` sum [2..11]

    describe "withUpstream" $ do
        it' "works" $ do
            let src = mapM_ CI.yield [1..10 :: Int] >> return True
                fold f =
                    loop
                  where
                    loop accum =
                        CI.await >>= maybe (return accum) go
                      where
                        go a =
                            let accum' = f accum a
                             in accum' `seq` loop accum'
                sink = CI.withUpstream $ fold (+) 0
            res <- CI.runPipe $ src CI.>+> sink
            res `shouldBe` (True, sum [1..10])

    describe "iterate" $ do
        it' "works" $ do
            res <- runConduit $ CL.iterate (+ 1) (1 :: Int) .| CL.isolate 10 .| CL.fold (+) 0
            res `shouldBe` sum [1..10]

    prop "replicate" $ \cnt' -> do
        let cnt = min cnt' 100
        res <- runConduit $ CL.replicate cnt () .| CL.consume
        res `shouldBe` replicate cnt ()

    prop "replicateM" $ \cnt' -> do
        ref <- I.newIORef 0
        let cnt = min cnt' 100
        res <- runConduit $ CL.replicateM cnt (I.modifyIORef ref (+ 1)) .| CL.consume
        res `shouldBe` replicate cnt ()

        ref' <- I.readIORef ref
        ref' `shouldBe` (if cnt' <= 0 then 0 else cnt)

    describe "injectLeftovers" $ do
        it "works" $ do
            let src = mapM_ CI.yield [1..10 :: Int]
                conduit = CI.injectLeftovers $ (`CI.unConduitT` CI.Done) $ C.awaitForever $ \i -> do
                    js <- CL.take 2
                    mapM_ C.leftover $ reverse js
                    C.yield i
            res <- runConduit $ CI.ConduitT ((src CI.>+> CI.injectLeftovers conduit) >>=) .| CL.consume
            res `shouldBe` [1..10]
    describe "monad transformer laws" $ do
        it "transPipe" $ do
            let source = CL.sourceList $ replicate 10 ()
            let tell' x = tell [x :: Int]

            let replaceNum1 = C.awaitForever $ \() -> do
                    i <- lift get
                    lift $ (put $ i + 1) >> (get >>= lift . tell')
                    C.yield i

            let replaceNum2 = C.awaitForever $ \() -> do
                    i <- lift get
                    lift $ put $ i + 1
                    lift $ get >>= lift . tell'
                    C.yield i

            x <- runWriterT $ runConduit $ source .| C.transPipe (`evalStateT` 1) replaceNum1 .| CL.consume
            y <- runWriterT $ runConduit $ source .| C.transPipe (`evalStateT` 1) replaceNum2 .| CL.consume
            x `shouldBe` y
    describe "iterM" $ do
        prop "behavior" $ \l -> monadicIO $ do
            let counter ref = CL.iterM (const $ liftIO $ M.modifyMVar_ ref (\i -> return $! i + 1))
            v <- run $ do
                ref <- M.newMVar 0
                runConduit $ CL.sourceList l .| counter ref .| CL.mapM_ (const $ return ())
                M.readMVar ref

            assert $ v == length (l :: [Int])
        prop "mapM_ equivalence" $ \l -> monadicIO $ do
            let runTest h = run $ do
                    ref <- M.newMVar (0 :: Int)
                    let f = action ref
                    s <- runConduit $ CL.sourceList (l :: [Int]) .| h f .| CL.fold (+) 0
                    c <- M.readMVar ref

                    return (c, s)

                action ref = const $ liftIO $ M.modifyMVar_ ref (\i -> return $! i + 1)
            (c1, s1) <- runTest CL.iterM
            (c2, s2) <- runTest (\f -> CL.mapM (\a -> f a >>= \() -> return a))

            assert $ c1 == c2
            assert $ s1 == s2

    describe "generalizing" $ do
        it "works" $ do
            let src :: Int -> ConduitT () Int IO ()
                src i = CL.sourceList [1..i]
                sink :: ConduitT Int Void IO Int
                sink = CL.fold (+) 0
            res <- runConduit $ C.yield 10 .| C.awaitForever (C.toProducer . src) .| (C.toConsumer sink >>= C.yield) .| C.await
            res `shouldBe` Just (sum [1..10])

    describe "mergeSource" $ do
        it "works" $ do
            let src :: ConduitT () String IO ()
                src = CL.sourceList ["A", "B", "C"]
                withIndex :: ConduitT String (Integer, String) IO ()
                withIndex = CI.mergeSource (CL.sourceList [1..])
            output <- runConduit $ src .| withIndex .| CL.consume
            output `shouldBe` [(1, "A"), (2, "B"), (3, "C")]
        it "does stop processing when the source exhausted" $ do
            let src :: ConduitT () Integer IO ()
                src = CL.sourceList [1..]
                withShortAlphaIndex :: ConduitT Integer (String, Integer) IO ()
                withShortAlphaIndex = CI.mergeSource (CL.sourceList ["A", "B", "C"])
            output <- runConduit $ src .| withShortAlphaIndex .| CL.consume
            output `shouldBe` [("A", 1), ("B", 2), ("C", 3)]

    describe "passthroughSink" $ do
        it "works" $ do
            ref <- I.newIORef (-1)
            let sink = CL.fold (+) (0 :: Int)
                conduit = C.passthroughSink sink (I.writeIORef ref)
                input = [1..10]
            output <- runConduit $ mapM_ C.yield input .| conduit .| CL.consume
            output `shouldBe` input
            x <- I.readIORef ref
            x `shouldBe` sum input
        it "does nothing when downstream does nothing" $ do
            ref <- I.newIORef (-1)
            let sink = CL.fold (+) (0 :: Int)
                conduit = C.passthroughSink sink (I.writeIORef ref)
                input = [undefined]
            runConduit $ mapM_ C.yield input .| conduit .| return ()
            x <- I.readIORef ref
            x `shouldBe` (-1)

        it "handles the last input correctly #304" $ do
            ref <- I.newIORef (-1 :: Int)
            let sink = CL.mapM_ (I.writeIORef ref)
                conduit = C.passthroughSink sink (const (return ()))
            res <- runConduit $ mapM_ C.yield [1..] .| conduit .| CL.take 5
            res `shouldBe` [1..5]
            x <- I.readIORef ref
            x `shouldBe` 5

    describe "mtl instances" $ do
        it "ErrorT" $ do
            let src = flip catchError (const $ C.yield 4) $ do
                    lift $ return ()
                    C.yield 1
                    lift $ return ()
                    C.yield 2
                    lift $ return ()
                    () <- throwError DummyError
                    lift $ return ()
                    C.yield 3
                    lift $ return ()
            runConduit (src .| CL.consume) `shouldBe` Right [1, 2, 4 :: Int]
        describe "WriterT" $
            it "pass" $
                let writer = W.pass $ do
                      W.tell [1 :: Int]
                      pure ((), (2:))
                in execWriter (runConduit writer) `shouldBe` [2, 1]

    describe "Data.Conduit.Lift" $ do
        it "execStateC" $ do
            let sink = C.execStateLC 0 $ CL.mapM_ $ modify . (+)
                src = mapM_ C.yield [1..10 :: Int]
            res <- runConduit $ src .| sink
            res `shouldBe` sum [1..10]

        it "execWriterC" $ do
            let sink = C.execWriterLC $ CL.mapM_ $ tell . return
                src = mapM_ C.yield [1..10 :: Int]
            res <- runConduit $ src .| sink
            res `shouldBe` [1..10]

        it "runExceptC" $ do
            let sink = C.runExceptC $ do
                    x <- C.catchExceptC (lift $ throwError "foo") return
                    return $ x ++ "bar"
            res <- runConduit $ return () .| sink
            res `shouldBe` Right ("foobar" :: String)

        it "runMaybeC" $ do
            let src = void $ C.runMaybeC $ do
                    C.yield 1
                    () <- lift $ MaybeT $ return Nothing
                    C.yield 2
                sink = CL.consume
            res <- runConduit $ src .| sink
            res `shouldBe` [1 :: Int]

    describe "sequenceSources" $ do
        it "works" $ do
            let src1 = mapM_ C.yield [1, 2, 3 :: Int]
                src2 = mapM_ C.yield [3, 2, 1]
                src3 = mapM_ C.yield $ repeat 2
                srcs = C.sequenceSources $ Map.fromList
                    [ (1 :: Int, src1)
                    , (2, src2)
                    , (3, src3)
                    ]
            res <- runConduit $ srcs .| CL.consume
            res `shouldBe`
                [ Map.fromList [(1, 1), (2, 3), (3, 2)]
                , Map.fromList [(1, 2), (2, 2), (3, 2)]
                , Map.fromList [(1, 3), (2, 1), (3, 2)]
                ]
    describe "zipSink" $ do
        it "zip equal-sized" $ do
            x <- runConduitRes $
                    CL.sourceList [1..100] .|
                    C.sequenceSinks [ CL.fold (+) 0,
                                      (`mod` 101) <$> CL.fold (*) 1 ]
            x `shouldBe` [5050, 100 :: Integer]

        it "zip distinct sizes" $ do
            let sink = C.getZipSink $
                        (*) <$> C.ZipSink (CL.fold (+) 0)
                            <*> C.ZipSink (Data.Maybe.fromJust <$> C.await)
            x <- runConduitRes $ CL.sourceList [100,99..1] .| sink
            x `shouldBe` (505000 :: Integer)

    describe "upstream results" $ do
        it "fuseBoth" $ do
            let upstream = do
                    C.yield ("hello" :: String)
                    CL.isolate 5 .| CL.fold (+) 0
                downstream = C.fuseBoth upstream CL.consume
            res <- runConduit $ CL.sourceList [1..10 :: Int] .| do
                (x, y) <- downstream
                z <- CL.consume
                return (x, y, z)
            res `shouldBe` (sum [1..5], ["hello"], [6..10])

        it "fuseBothMaybe with no result" $ do
            let src = mapM_ C.yield [1 :: Int ..]
                sink = CL.isolate 5 .| CL.fold (+) 0
            (mup, down) <- runConduit $ C.fuseBothMaybe src sink
            mup `shouldBe` (Nothing :: Maybe ())
            down `shouldBe` sum [1..5]

        it "fuseBothMaybe with result" $ do
            let src = mapM_ C.yield [1 :: Int .. 5]
                sink = CL.isolate 6 .| CL.fold (+) 0
            (mup, down) <- runConduit $ C.fuseBothMaybe src sink
            mup `shouldBe` Just ()
            down `shouldBe` sum [1..5]

        it "fuseBothMaybe with almost result" $ do
            let src = mapM_ C.yield [1 :: Int .. 5]
                sink = CL.isolate 5 .| CL.fold (+) 0
            (mup, down) <- runConduit $ C.fuseBothMaybe src sink
            mup `shouldBe` (Nothing :: Maybe ())
            down `shouldBe` sum [1..5]

    describe "catching exceptions" $ do
        it "works" $ do
            let src = do
                    C.yield 1
                    () <- Catch.throwM DummyError
                    C.yield 2
                src' = do
                    CI.catchC src (\DummyError -> C.yield (3 :: Int))
            res <- runConduit $ src' .| CL.consume
            res `shouldBe` [1, 3]

    describe "sourceToList" $ do
        it "works lazily in Identity" $ do
            let src = C.yield 1 >> C.yield 2 >> throw DummyError
            let res = runIdentity $ C.sourceToList src
            take 2 res `shouldBe` [1, 2 :: Int]
        it "is not lazy in IO" $ do
            let src = C.yield 1 >> C.yield (2 :: Int) >> throw DummyError
            C.sourceToList src `shouldThrow` (==DummyError)

    ZipConduit.spec
    Stream.spec

-- | 'it' specialised to plain 'IO' actions.
it' :: String -> IO () -> Spec
it' = it

-- | Exception thrown by the tests above to exercise error handling.
data DummyError = DummyError
    deriving (Show, Eq, Typeable)
instance Catch.Exception DummyError