{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE Trustworthy #-}
-- | /NOTE/ It is recommended to start using "Data.Conduit.Combinators" instead
-- of this module.
--
-- Higher-level functions to interact with the elements of a stream. Most of
-- these are based on list functions.
--
-- For many purposes, it's recommended to use the conduit-combinators library,
-- which provides a more complete set of functions.
--
-- Note that these functions all deal with individual elements of a stream as a
-- sort of \"black box\", where there is no introspection of the contained
-- elements. Values such as @ByteString@ and @Text@ will likely need to be
-- treated specially to deal with their contents properly (@Word8@ and @Char@,
-- respectively). See the @Data.Conduit.Binary@ and @Data.Conduit.Text@
-- modules in the @conduit-extra@ package.
module Data.Conduit.List
    ( -- * Sources
      sourceList
    , sourceNull
    , unfold
    , unfoldEither
    , unfoldM
    , unfoldEitherM
    , enumFromTo
    , iterate
    , replicate
    , replicateM
      -- * Sinks
      -- ** Pure
    , fold
    , foldMap
    , uncons
    , unconsEither
    , take
    , drop
    , head
    , peek
    , consume
    , sinkNull
      -- ** Monadic
    , foldMapM
    , foldM
    , unconsM
    , unconsEitherM
    , mapM_
      -- * Conduits
      -- ** Pure
    , map
    , mapMaybe
    , mapFoldable
    , catMaybes
    , concat
    , concatMap
    , concatMapAccum
    , scanl
    , scan
    , mapAccum
    , chunksOf
    , groupBy
    , groupOn1
    , isolate
    , filter
      -- ** Monadic
    , mapM
    , iterM
    , scanlM
    , scanM
    , mapAccumM
    , mapMaybeM
    , mapFoldableM
    , concatMapM
    , concatMapAccumM
      -- * Misc
    , sequence
    ) where

import qualified Prelude
import Prelude
    ( ($), return, (==), (-), Int
    , (.), id, Maybe (..), Monad
    , Either (..)
    , Bool (..)
    , (>>)
    , (>>=)
    , seq
    , otherwise
    , Enum, Eq
    , maybe
    , (<=)
    , (>)
    , error
    , (++)
    , show
    )
import Data.Monoid (Monoid, mempty, mappend)
import qualified Data.Foldable as F
import Data.Conduit
import Data.Conduit.Internal.Conduit (unconsM, unconsEitherM)
import Data.Conduit.Internal.Fusion
import Data.Conduit.Internal.List.Stream
import qualified Data.Conduit.Internal as CI
import Data.Functor.Identity (Identity (runIdentity))
import Control.Monad (when, (<=<), liftM, void)
import Control.Monad.Trans.Class (lift)

-- Defines INLINE_RULE0, INLINE_RULE, STREAMING0, and STREAMING.
#include "fusion-macros.h"

-- | Build a stream from a seed. The step function is applied to the current
-- seed: @Just (a, b)@ yields @a@ and continues with the new seed @b@, while
-- @Nothing@ ends the stream.
--
-- Subject to fusion
--
-- Since 0.4.2
unfold, unfoldC :: Monad m
                => (b -> Maybe (a, b))
                -> b
                -> ConduitT i a m ()
unfoldC step =
    loop
  where
    -- Stop on Nothing, otherwise emit the element and carry on.
    loop s = maybe (return ()) emit (step s)
    emit (x, s') = yield x >> loop s'
{-# INLINE unfoldC #-}
STREAMING(unfold, unfoldC, unfoldS, f x)

-- | Like 'unfold', but the step function signals termination with @Left r@,
-- and that @r@ becomes the result of the conduit. @Right (a, b)@ yields @a@
-- and continues with seed @b@.
--
-- Subject to fusion
--
-- @since 1.2.11
unfoldEither, unfoldEitherC :: Monad m
                            => (b -> Either r (a, b))
                            -> b
                            -> ConduitT i a m r
unfoldEitherC step =
    loop
  where
    -- Left carries the final result; Right carries the next element and seed.
    loop s = Prelude.either return emit (step s)
    emit (x, s') = yield x >> loop s'
{-# INLINE unfoldEitherC #-}
STREAMING(unfoldEither, unfoldEitherC, unfoldEitherS, f x)

-- | Variant of 'unfold' whose step function runs in the base monad.
--
-- Subject to fusion
--
-- Since 1.1.2
unfoldM, unfoldMC :: Monad m
                  => (b -> m (Maybe (a, b)))
                  -> b
                  -> ConduitT i a m ()
unfoldMC step =
    loop
  where
    -- Run the step in the base monad, then stop or emit as in 'unfold'.
    loop s = lift (step s) >>= maybe (return ()) emit
    emit (x, s') = yield x >> loop s'
STREAMING(unfoldM, unfoldMC, unfoldMS, f seed)

-- | A monadic unfoldEither.
--
-- Subject to fusion
--
-- @since 1.2.11
unfoldEitherM, unfoldEitherMC :: Monad m
                 => (b -> m (Either r (a, b)))
                 -> b
                 -> ConduitT i a m r
unfoldEitherMC step =
    loop
  where
    -- Left ends the stream with a result; Right emits and continues.
    loop s = lift (step s) >>= Prelude.either return emit
    emit (x, s') = yield x >> loop s'
STREAMING(unfoldEitherM, unfoldEitherMC, unfoldEitherMS, f seed)

-- | Take a pure, sealed conduit apart into its first output and the rest of
-- the conduit, or @Nothing@ when it produces no output.
-- Defined as @runIdentity . unconsM@.
--
-- The conduit must be sealed with 'sealConduitT' first.
--
-- Since 1.3.3
uncons :: SealedConduitT () o Identity ()
       -> Maybe (o, SealedConduitT () o Identity ())
uncons sealed = runIdentity (unconsM sealed)

-- | Take a pure, sealed conduit apart into its first output and the rest of
-- the conduit, or give back its final result when it is already finished.
-- Defined as @runIdentity . unconsEitherM@.
--
-- The conduit must be sealed with 'sealConduitT' first.
--
-- Since 1.3.3
unconsEither :: SealedConduitT () o Identity r
             -> Either r (o, SealedConduitT () o Identity r)
unconsEither sealed = runIdentity (unconsEitherM sealed)

-- | Emit every element of the list, in order.
--
-- Subject to fusion
sourceList, sourceListC :: Monad m => [a] -> ConduitT i a m ()
sourceListC = F.mapM_ yield
{-# INLINE sourceListC #-}
STREAMING(sourceList, sourceListC, sourceListS, xs)

-- | Enumerate from a value to a final value, inclusive, via 'succ'.
--
-- This is generally more efficient than using @Prelude@\'s @enumFromTo@ and
-- combining with @sourceList@ since this avoids any intermediate data
-- structures.
--
-- When the current value equals the upper bound it is yielded and the
-- enumeration stops without calling 'Prelude.succ' again, so an upper bound
-- equal to the largest value of a bounded type does not raise the @succ@
-- overflow error.
-- NOTE(review): the stream variant @enumFromToS@ is defined in another module
-- and is not changed here; confirm it handles the same boundary.
--
-- Subject to fusion
--
-- Since 0.4.2
enumFromTo, enumFromToC :: (Enum a, Prelude.Ord a, Monad m)
                        => a
                        -> a
                        -> ConduitT i a m ()
enumFromToC x0 y =
    loop x0
  where
    loop x
        | x Prelude.> y = return ()
        -- Last element: emit it and stop, so that succ is never forced on
        -- the upper bound itself.
        | x == y = yield x
        | otherwise = yield x >> loop (Prelude.succ x)
{-# INLINE enumFromToC #-}
STREAMING(enumFromTo, enumFromToC, enumFromToS, x0 y)

-- | Produce an endless stream @x, f x, f (f x), ...@ by repeatedly applying
-- the function to the previous value.
--
-- Subject to fusion
iterate, iterateC :: Monad m => (a -> a) -> a -> ConduitT i a m ()
iterateC f =
    go
  where
    go a = yield a >> go (f a)
{-# INLINE iterateC #-}
STREAMING(iterate, iterateC, iterateS, f a)

-- | Yield the same value the given number of times. A count of zero or less
-- yields nothing.
--
-- Subject to fusion
--
-- Since 1.2.0
replicate, replicateC :: Monad m => Int -> a -> ConduitT i a m ()
replicateC cnt0 a =
    loop cnt0
  where
    loop i
        | i <= 0 = return ()
        | otherwise = yield a >> loop (i - 1)
{-# INLINE replicateC #-}
STREAMING(replicate, replicateC, replicateS, cnt0 a)

-- | Run the monadic action the given number of times, yielding each result.
-- A count of zero or less runs nothing.
--
-- Subject to fusion
--
-- Since 1.2.0
replicateM, replicateMC :: Monad m => Int -> m a -> ConduitT i a m ()
replicateMC cnt0 ma =
    loop cnt0
  where
    loop i
        | i <= 0 = return ()
        | otherwise = lift ma >>= yield >> loop (i - 1)
{-# INLINE replicateMC #-}
STREAMING(replicateM, replicateMC, replicateMS, cnt0 ma)

-- | A strict left fold over all incoming values; the accumulator is forced
-- on every step.
--
-- Subject to fusion
--
-- Since 0.3.0
fold, foldC :: Monad m
            => (b -> a -> b)
            -> b
            -> ConduitT a o m b
foldC f =
    loop
  where
    -- The bang keeps the accumulator evaluated, avoiding a chain of thunks.
    loop !accum = await >>= maybe (return accum) (loop . f accum)
{-# INLINE foldC #-}
STREAMING(fold, foldC, foldS, f accum)

-- | A monadic strict left fold.
--
-- Subject to fusion
--
-- Since 0.3.0
foldM, foldMC :: Monad m
              => (b -> a -> m b)
              -> b
              -> ConduitT a o m b
foldMC step =
    go
  where
    go acc =
        await >>= maybe (return acc) advance
      where
        -- Run the step in the base monad and force the new accumulator
        -- before looping.
        advance x = lift (step acc x) >>= \acc' -> acc' `seq` go acc'
{-# INLINE foldMC #-}
STREAMING(foldM, foldMC, foldMS, f accum)

-----------------------------------------------------------------
-- These are for cases where- for whatever reason- stream fusion cannot be
-- applied.

-- Drive a source to completion, folding its outputs strictly with a pure
-- step function. Used as the right-hand side of the rewrite rule below.
connectFold :: Monad m => ConduitT () a m () -> (b -> a -> b) -> b -> m b
connectFold (CI.ConduitT source) step =
    run (source CI.Done)
  where
    run (CI.HaveOutput next x) acc = run next Prelude.$! step acc x
    run (CI.PipeM action) acc = action >>= \next -> run next acc
    -- There is no upstream: take the "upstream finished" continuation.
    run (CI.NeedInput _ onClose) acc = run (onClose ()) acc
    run (CI.Leftover next ()) acc = run next acc
    run (CI.Done ()) acc = return acc
{-# INLINE connectFold #-}
{-# RULES "conduit: $$ fold" forall src f b. runConduit (src .| fold f b) = connectFold src f b #-}

-- Drive a source to completion, folding its outputs with a monadic step
-- function; each new accumulator is forced before continuing. Used as the
-- right-hand side of the rewrite rule below.
connectFoldM :: Monad m => ConduitT () a m () -> (b -> a -> m b) -> b -> m b
connectFoldM (CI.ConduitT source) step =
    run (source CI.Done)
  where
    run (CI.HaveOutput next x) acc = step acc x >>= \acc' -> acc' `seq` run next acc'
    run (CI.PipeM action) acc = action >>= \next -> run next acc
    -- There is no upstream: take the "upstream finished" continuation.
    run (CI.NeedInput _ onClose) acc = run (onClose ()) acc
    run (CI.Leftover next ()) acc = run next acc
    run (CI.Done ()) acc = return acc
{-# INLINE connectFoldM #-}
{-# RULES "conduit: $$ foldM" forall src f b. runConduit (src .| foldM f b) = connectFoldM src f b #-}
-----------------------------------------------------------------

-- | Map every incoming value into a 'Monoid' and combine the results with a
-- strict left fold, starting from 'mempty'.
--
-- Subject to fusion
--
-- Since 0.5.3
foldMap :: (Monad m, Monoid b)
        => (a -> b)
        -> ConduitT a o m b
INLINE_RULE(foldMap, f, let combiner accum = mappend accum . f in fold combiner mempty)

-- | A monoidal strict left fold in a Monad.
--
-- Since 1.0.8
foldMapM :: (Monad m, Monoid b)
        => (a -> m b)
        -> ConduitT a o m b
INLINE_RULE(foldMapM, f, let combiner accum = liftM (mappend accum) . f in foldM combiner mempty)

-- | Run the given action on every value in the stream, producing no output.
--
-- Subject to fusion
--
-- Since 0.3.0
mapM_, mapM_C :: Monad m
              => (a -> m ())
              -> ConduitT a o m ()
mapM_C f = awaitForever (\x -> lift (f x))
{-# INLINE mapM_C #-}
STREAMING(mapM_, mapM_C, mapM_S, f)

-- Drive a source to completion, running the action on each output. Used as
-- the right-hand side of the rewrite rule below.
srcMapM_ :: Monad m => ConduitT () a m () -> (a -> m ()) -> m ()
srcMapM_ (CI.ConduitT source) act =
    run (source CI.Done)
  where
    run (CI.HaveOutput next x) = act x >> run next
    run (CI.PipeM action) = action >>= run
    -- There is no upstream: take the "upstream finished" continuation.
    run (CI.NeedInput _ onClose) = run (onClose ())
    run (CI.Leftover next ()) = run next
    run (CI.Done ()) = return ()
{-# INLINE srcMapM_ #-}
{-# RULES "conduit: connect to mapM_" [2] forall f src. runConduit (src .| mapM_ f) = srcMapM_ src f #-}

-- | Discard up to the given number of values from the stream. Stops early
-- if the stream ends. Semantically equivalent to:
--
-- > drop i = take i >> return ()
--
-- but @drop@ never accumulates the discarded values.
--
-- Subject to fusion
--
-- Since 0.3.0
drop, dropC :: Monad m
            => Int
            -> ConduitT a o m ()
dropC =
    go
  where
    go remaining
        | remaining <= 0 = return ()
        | otherwise = do
            mx <- await
            case mx of
                Nothing -> return ()
                Just _ -> go (remaining - 1)
{-# INLINE dropC #-}
STREAMING(drop, dropC, dropS, i)

-- | Take some values from the stream and return as a list. If you want to
-- instead create a conduit that pipes data to another sink, see 'isolate'.
-- This function is semantically equivalent to:
--
-- > take i = isolate i .| consume
--
-- Subject to fusion
--
-- Since 0.3.0
take, takeC :: Monad m
            => Int
            -> ConduitT a o m [a]
takeC =
    go id
  where
    -- acc is a difference list of the values collected so far.
    go acc remaining
        | remaining <= 0 = finish
        | otherwise = await >>= maybe finish (\x -> go (acc . (x:)) (remaining - 1))
      where
        finish = return (acc [])
{-# INLINE takeC #-}
STREAMING(take, takeC, takeS, i)

-- | Consume and return the next value of the stream, or @Nothing@ if the
-- stream has ended.
--
-- Subject to fusion
--
-- Since 0.3.0
head, headC :: Monad m => ConduitT a o m (Maybe a)
headC = await
{-# INLINE headC #-}
STREAMING0(head, headC, headS)

-- | Return the next value of the stream, if any, without consuming it: the
-- value is pushed back as a leftover.
--
-- Since 0.3.0
peek :: Monad m => ConduitT a o m (Maybe a)
peek = do
    mx <- await
    case mx of
        Nothing -> return Nothing
        Just x -> leftover x >> return mx

-- | Apply a pure function to every value in the stream.
--
-- Subject to fusion
--
-- Since 0.3.0
map, mapC :: Monad m => (a -> b) -> ConduitT a b m ()
mapC f = awaitForever (\x -> yield (f x))
{-# INLINE mapC #-}
STREAMING(map, mapC, mapS, f)

-- Since a Source never has any leftovers, fusion rules on it are safe.
{-
{-# RULES "conduit: source/map fusion .|" forall f src. src .| map f = mapFuseRight src f #-}

mapFuseRight :: Monad m => Source m a -> (a -> b) -> Source m b
mapFuseRight src f = CIC.mapOutput f src
{-# INLINE mapFuseRight #-}
-}

{-

It might be nice to include these rewrite rules, but they may have subtle
differences based on leftovers.

{-# RULES "conduit: map-to-mapOutput pipeL" forall f src. pipeL src (map f) = mapOutput f src #-}
{-# RULES "conduit: map-to-mapOutput $=" forall f src. src $= (map f) = mapOutput f src #-}
{-# RULES "conduit: map-to-mapOutput pipe" forall f src. pipe src (map f) = mapOutput f src #-}
{-# RULES "conduit: map-to-mapOutput >+>" forall f src. src >+> (map f) = mapOutput f src #-}

{-# RULES "conduit: map-to-mapInput pipeL" forall f sink. pipeL (map f) sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
{-# RULES "conduit: map-to-mapInput =$" forall f sink. map f =$ sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
{-# RULES "conduit: map-to-mapInput pipe" forall f sink. pipe (map f) sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
{-# RULES "conduit: map-to-mapInput >+>" forall f sink. map f >+> sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}

{-# RULES "conduit: map-to-mapOutput .|" forall f con. con .| map f = mapOutput f con #-}
{-# RULES "conduit: map-to-mapInput .|" forall f con. map f .| con = mapInput f (Prelude.const Prelude.Nothing) con #-}

{-# INLINE [1] map #-}

-}

-- | Apply a monadic function to every value in the stream and yield the
-- results.
--
-- When only the side effects are wanted and the results are not, use
-- 'mapM_' instead.
--
-- Subject to fusion
--
-- Since 0.3.0
mapM, mapMC :: Monad m => (a -> m b) -> ConduitT a b m ()
mapMC f = awaitForever (yield <=< lift . f)
{-# INLINE mapMC #-}
STREAMING(mapM, mapMC, mapMS, f)

-- | Run a monadic action on every value for its side effect, then pass the
-- value through unchanged.
--
-- > iterM f = mapM (\a -> f a >>= \() -> return a)
--
-- Subject to fusion
--
-- Since 0.5.6
iterM, iterMC :: Monad m => (a -> m ()) -> ConduitT a a m ()
iterMC f = awaitForever $ \x -> do
    lift (f x)
    yield x
{-# INLINE iterMC #-}
STREAMING(iterM, iterMC, iterMS, f)

-- | Apply a transformation that may fail to all values in a stream, discarding
-- the failures.
--
-- Subject to fusion
--
-- Since 0.5.1
mapMaybe, mapMaybeC :: Monad m => (a -> Maybe b) -> ConduitT a b m ()
mapMaybeC f = awaitForever $ \x ->
    case f x of
        Nothing -> return ()
        Just y -> yield y
{-# INLINE mapMaybeC #-}
STREAMING(mapMaybe, mapMaybeC, mapMaybeS, f)

-- | Monadic variant of 'mapMaybe': values for which the action returns
-- @Nothing@ are dropped, the contents of @Just@ results are yielded.
--
-- Subject to fusion
--
-- Since 0.5.1
mapMaybeM, mapMaybeMC :: Monad m => (a -> m (Maybe b)) -> ConduitT a b m ()
mapMaybeMC f = awaitForever $ \x -> lift (f x) >>= maybe (return ()) yield
{-# INLINE mapMaybeMC #-}
STREAMING(mapMaybeM, mapMaybeMC, mapMaybeMS, f)

-- | Pass through the contents of @Just@ values and drop every @Nothing@.
--
-- Subject to fusion
--
-- Since 0.5.1
catMaybes, catMaybesC :: Monad m => ConduitT (Maybe a) a m ()
catMaybesC = awaitForever $ \mx ->
    case mx of
        Nothing -> return ()
        Just x -> yield x
{-# INLINE catMaybesC #-}
STREAMING0(catMaybes, catMaybesC, catMaybesS)

-- | Generalization of 'catMaybes': every element of each incoming
-- 'F.Foldable' container is yielded downstream.
--
-- Subject to fusion
--
-- Since 1.0.6
concat, concatC :: (Monad m, F.Foldable f) => ConduitT (f a) a m ()
concatC = awaitForever (F.mapM_ yield)
{-# INLINE concatC #-}
STREAMING0(concat, concatC, concatS)

-- | Map each value to a list and yield the elements of every resulting list.
--
-- Subject to fusion
--
-- Since 0.3.0
concatMap, concatMapC :: Monad m => (a -> [b]) -> ConduitT a b m ()
concatMapC f = awaitForever (\x -> sourceList (f x))
{-# INLINE concatMapC #-}
STREAMING(concatMap, concatMapC, concatMapS, f)

-- | Monadic variant of 'concatMap': the list for each value is produced by
-- an action in the base monad.
--
-- Subject to fusion
--
-- Since 0.3.0
concatMapM, concatMapMC :: Monad m => (a -> m [b]) -> ConduitT a b m ()
concatMapMC f = awaitForever $ \x -> lift (f x) >>= sourceList
{-# INLINE concatMapMC #-}
STREAMING(concatMapM, concatMapMC, concatMapMS, f)

-- | 'concatMap' with a strict accumulator threaded through the stream.
-- Implemented as 'mapAccum' (final accumulator discarded) piped into
-- 'concat'.
--
-- Subject to fusion
--
-- Since 0.3.0
concatMapAccum, concatMapAccumC :: Monad m => (a -> accum -> (accum, [b])) -> accum -> ConduitT a b m ()
concatMapAccumC f x0 =
    chunks .| concat
  where
    chunks = void (mapAccum f x0)
{-# INLINE concatMapAccumC #-}
STREAMING(concatMapAccum, concatMapAccumC, concatMapAccumS, f x0)

-- | Deprecated synonym for @mapAccum@ that discards the final accumulator.
--
-- Since 1.0.6
scanl :: Monad m => (a -> s -> (s, b)) -> s -> ConduitT a b m ()
scanl f s = void (mapAccum f s)
{-# DEPRECATED scanl "Use mapAccum instead" #-}

-- | Deprecated synonym for @mapAccumM@ that discards the final accumulator.
--
-- Since 1.0.6
scanlM :: Monad m => (a -> s -> m (s, b)) -> s -> ConduitT a b m ()
scanlM f s = void (mapAccumM f s)
{-# DEPRECATED scanlM "Use mapAccumM instead" #-}

-- | Analog of @mapAccumL@ for lists. Unlike @mapAccumL@, the function takes
-- the accumulator as its second argument rather than its first, and the
-- accumulator is kept strict. The final accumulator is the conduit's result.
--
-- Subject to fusion
--
-- Since 1.1.1
mapAccum, mapAccumC :: Monad m => (a -> s -> (s, b)) -> s -> ConduitT a b m s
mapAccumC step =
    go
  where
    go !acc = do
        mx <- await
        case mx of
            Nothing -> return acc
            -- The result pair is matched strictly before anything is
            -- yielded.
            Just x -> case step x acc of
                (acc', y) -> yield y >> go acc'
STREAMING(mapAccum, mapAccumC, mapAccumS, f s)

-- | Monadic `mapAccum`: the step runs in the base monad.
--
-- Subject to fusion
--
-- Since 1.1.1
mapAccumM, mapAccumMC :: Monad m => (a -> s -> m (s, b)) -> s -> ConduitT a b m s
mapAccumMC step =
    go
  where
    go !acc = do
        mx <- await
        case mx of
            Nothing -> return acc
            Just x -> lift (step x acc) >>= \(acc', y) -> yield y >> go acc'
{-# INLINE mapAccumMC #-}
STREAMING(mapAccumM, mapAccumMC, mapAccumMS, f s)

-- | Analog of 'Prelude.scanl' for lists.
--
-- Subject to fusion
--
-- Since 1.1.1
scan :: Monad m => (a -> b -> b) -> b -> ConduitT a b m b
INLINE_RULE(scan, f, mapAccum (\a b -> let r = f a b in (r, r)))

-- | Monadic @scanl@.
--
-- Subject to fusion
--
-- Since 1.1.1
scanM :: Monad m => (a -> b -> m b) -> b -> ConduitT a b m b
INLINE_RULE(scanM, f, mapAccumM (\a b -> f a b >>= \r -> return (r, r)))

-- | 'concatMapM' with a strict accumulator threaded through the stream.
-- Implemented as 'mapAccumM' (final accumulator discarded) piped into
-- 'concat'.
--
-- Subject to fusion
--
-- Since 0.3.0
concatMapAccumM, concatMapAccumMC :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> ConduitT a b m ()
concatMapAccumMC f x0 =
    chunks .| concat
  where
    chunks = void (mapAccumM f x0)
{-# INLINE concatMapAccumMC #-}
STREAMING(concatMapAccumM, concatMapAccumMC, concatMapAccumMS, f x0)

-- | Generalization of 'mapMaybe' and 'concatMap': the function is applied to
-- each value and every element of the resulting 'F.Foldable' is sent
-- downstream.
--
-- Subject to fusion
--
-- Since 1.0.6
mapFoldable, mapFoldableC :: (Monad m, F.Foldable f) => (a -> f b) -> ConduitT a b m ()
mapFoldableC f = awaitForever (\x -> F.mapM_ yield (f x))
{-# INLINE mapFoldableC #-}
STREAMING(mapFoldable, mapFoldableC, mapFoldableS, f)

-- | Monadic variant of 'mapFoldable'.
--
-- Subject to fusion
--
-- Since 1.0.6
mapFoldableM, mapFoldableMC :: (Monad m, F.Foldable f) => (a -> m (f b)) -> ConduitT a b m ()
mapFoldableMC f = awaitForever $ \x -> lift (f x) >>= F.mapM_ yield
{-# INLINE mapFoldableMC #-}
STREAMING(mapFoldableM, mapFoldableMC, mapFoldableMS, f)

-- | Collect every remaining value of the stream into a list. All values are
-- held in memory at once.
--
-- Subject to fusion
--
-- Since 0.3.0
consume, consumeC :: Monad m => ConduitT a o m [a]
consumeC =
    go id
  where
    -- acc is a difference list of the values seen so far.
    go acc = do
        mx <- await
        case mx of
            Nothing -> return (acc [])
            Just x -> go (acc . (x:))
{-# INLINE consumeC #-}
STREAMING0(consume, consumeC, consumeS)

-- | Group a stream into lists of the given size. The final list may hold
-- fewer than @n@ elements; an empty final list is never yielded. Calls
-- 'error' when @n@ is not positive.
--
-- No stream-fusion variant is attached to this function in this module.
--
-- Since 1.2.9
chunksOf :: Monad m => Int -> ConduitT a [a] m ()
chunksOf n
    | n > 0 = fill n id
    | otherwise = error $ "chunksOf size must be positive (given " ++ show n ++ ")"
  where
    -- A full chunk: emit it and start over.
    fill 0 acc = yield (acc []) >> fill n id
    fill remaining acc = do
        mx <- await
        case mx of
            Just x -> fill (remaining - 1) (acc . (x :))
            -- End of input: flush a partial chunk if there is one.
            Nothing -> case acc [] of
                [] -> return ()
                partial -> yield partial

-- | Group consecutive values using the given equality function. Each new
-- value is compared against the first value of the current group.
--
-- Subject to fusion
--
-- Since 0.3.0
groupBy, groupByC :: Monad m => (a -> a -> Bool) -> ConduitT a [a] m ()
groupByC sameGroup =
    begin
  where
    begin = await >>= maybe (return ()) (collect id)

    -- leader is the first value of the group, acc the rest (difference list).
    collect acc leader = do
        mnext <- await
        case mnext of
            Nothing -> emit
            Just next
                | sameGroup leader next -> collect (acc . (next:)) leader
                | otherwise -> emit >> collect id next
      where
        emit = yield (leader : acc [])
STREAMING(groupBy, groupByC, groupByS, f)

-- | 'groupOn1' groups consecutive values whose keys, as computed by the given
-- function, are equal to the key of the first value of the group.
--
-- Each group is returned as a pair of its first value and the remaining
-- values, so a group always has at least one item. This is designed to be
-- converted into a NonEmpty structure without depending on another package:
--
-- > import Data.List.NonEmpty
-- >
-- > groupOn1 :: (Monad m, Eq b) => (a -> b) -> ConduitT a (NonEmpty a) m ()
-- > groupOn1 f = CL.groupOn1 f .| CL.map (uncurry (:|))
--
-- Subject to fusion
--
-- Since 1.1.7
groupOn1, groupOn1C :: (Monad m, Eq b)
                     => (a -> b)
                     -> ConduitT a (a, [a]) m ()
groupOn1C key =
    begin
  where
    begin = await >>= maybe (return ()) (collect id)

    -- leader is the first value of the group, acc the rest (difference list).
    collect acc leader = do
        mnext <- await
        case mnext of
            Nothing -> emit
            Just next
                | key leader == key next -> collect (acc . (next:)) leader
                | otherwise -> emit >> collect id next
      where
        emit = yield (leader, acc [])
STREAMING(groupOn1, groupOn1C, groupOn1S, f)

-- | Ensure that the inner sink consumes no more than the given number of
-- values. Note that this does /not/ ensure that the sink consumes all of
-- those values. To get the latter behavior, combine with 'sinkNull', e.g.:
--
-- > runConduit $ src .| do
-- >     x <- isolate count .| do
-- >         x <- someSink
-- >         sinkNull
-- >         return x
-- >     someOtherSink
-- >     ...
--
-- Subject to fusion
--
-- Since 0.3.0
isolate, isolateC :: Monad m => Int -> ConduitT a a m ()
isolateC =
    go
  where
    go remaining
        | remaining <= 0 = return ()
        | otherwise = await >>= maybe (return ()) forward
      where
        forward x = yield x >> go (remaining - 1)
STREAMING(isolate, isolateC, isolateS, count)

-- | Pass through only the values for which the predicate returns 'True'.
--
-- Subject to fusion
--
-- Since 0.3.0
filter, filterC :: Monad m => (a -> Bool) -> ConduitT a a m ()
filterC keep = awaitForever $ \x -> if keep x then yield x else return ()
STREAMING(filter, filterC, filterS, f)

-- Rebuild the upstream conduit with every output that fails the predicate
-- removed. Used as the right-hand side of the rewrite rule below.
filterFuseRight
    :: Monad m
    => ConduitT i o m ()
    -> (o -> Bool)
    -> ConduitT i o m ()
filterFuseRight (CI.ConduitT upstream) keep = CI.ConduitT $ \k ->
    let walk (CI.HaveOutput next o)
            | keep o = CI.HaveOutput (walk next) o
            | otherwise = walk next
        walk (CI.NeedInput onInput onClose) = CI.NeedInput (walk . onInput) (walk . onClose)
        walk (CI.PipeM action) = CI.PipeM (liftM walk action)
        walk (CI.Leftover next i) = CI.Leftover (walk next) i
        walk (CI.Done ()) = k ()
    in walk (upstream CI.Done)
-- Intermediate finalizers are dropped, but this is acceptable: the next
-- yielded value would be demanded by downstream in any event, and that new
-- finalizer will always override the existing finalizer.
{-# RULES "conduit: source/filter fusion .|" forall f src. src .| filter f = filterFuseRight src f #-}
{-# INLINE filterFuseRight #-}

-- | Ignore the remainder of values in the source.
-- Particularly useful when
-- combined with 'isolate'.
--
-- Subject to fusion
--
-- Since 0.3.0
sinkNull, sinkNullC :: Monad m => ConduitT i o m ()
sinkNullC = awaitForever (Prelude.const (return ()))
{-# INLINE sinkNullC #-}
STREAMING0(sinkNull, sinkNullC, sinkNullS)

-- Drive a source to completion, discarding every output. Used as the
-- right-hand side of the rewrite rule below.
srcSinkNull :: Monad m => ConduitT () o m () -> m ()
srcSinkNull (CI.ConduitT source) =
    run (source CI.Done)
  where
    run (CI.HaveOutput next _) = run next
    run (CI.PipeM action) = action >>= run
    -- There is no upstream: take the "upstream finished" continuation.
    run (CI.NeedInput _ onClose) = run (onClose ())
    run (CI.Leftover next ()) = run next
    run (CI.Done ()) = return ()
{-# INLINE srcSinkNull #-}
{-# RULES "conduit: connect to sinkNull" forall src. runConduit (src .| sinkNull) = srcSinkNull src #-}

-- | A source that outputs no values. Note that this is just a type-restricted
-- synonym for 'mempty'.
--
-- Subject to fusion
--
-- Since 0.3.0
sourceNull, sourceNullC :: Monad m => ConduitT i o m ()
sourceNullC = return ()
{-# INLINE sourceNullC #-}
STREAMING0(sourceNull, sourceNullC, sourceNullS)

-- | Run the given @Pipe@ over and over, yielding its result downstream each
-- time. Before each run the next input value is awaited and pushed back as a
-- leftover, so the loop stops once upstream has no more input.
--
-- Since 0.5.0
sequence :: Monad m
         => ConduitT i o m o -- ^ @Pipe@ to run repeatedly
         -> ConduitT i o m ()
sequence sink =
    awaitForever $ \i -> do
        -- Put the probed value back so the sink sees it.
        leftover i
        result <- sink
        yield result