{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE Trustworthy #-}
-- | /NOTE/ It is recommended to start using "Data.Conduit.Combinators" instead
-- of this module.
--
-- Higher-level functions to interact with the elements of a stream. Most of
-- these are based on list functions.
--
-- For many purposes, it's recommended to use the conduit-combinators library,
-- which provides a more complete set of functions.
--
-- Note that these functions all deal with individual elements of a stream as a
-- sort of \"black box\", where there is no introspection of the contained
-- elements. Values such as @ByteString@ and @Text@ will likely need to be
-- treated specially to deal with their contents properly (@Word8@ and @Char@,
-- respectively). See the "Data.Conduit.Binary" and "Data.Conduit.Text"
-- modules.
module Data.Conduit.List
    ( -- * Sources
      sourceList
    , sourceNull
    , unfold
    , unfoldEither
    , unfoldM
    , unfoldEitherM
    , enumFromTo
    , iterate
    , replicate
    , replicateM
      -- * Sinks
      -- ** Pure
    , fold
    , foldMap
    , take
    , drop
    , head
    , peek
    , consume
    , sinkNull
      -- ** Monadic
    , foldMapM
    , foldM
    , mapM_
      -- * Conduits
      -- ** Pure
    , map
    , mapMaybe
    , mapFoldable
    , catMaybes
    , concat
    , concatMap
    , concatMapAccum
    , scanl
    , scan
    , mapAccum
    , chunksOf
    , groupBy
    , groupOn1
    , isolate
    , filter
      -- ** Monadic
    , mapM
    , iterM
    , scanlM
    , scanM
    , mapAccumM
    , mapMaybeM
    , mapFoldableM
    , concatMapM
    , concatMapAccumM
      -- * Misc
    , sequence
    ) where

import qualified Prelude
import Prelude
    ( ($), return, (==), (-), Int
    , (.), id, Maybe (..), Monad
    , Either (..)
    , Bool (..)
    , (>>)
    , (>>=)
    , seq
    , otherwise
    , Enum, Eq
    , maybe
    , (<=)
    , (>)
    , error
    , (++)
    , show
    )
import Data.Monoid (Monoid, mempty, mappend)
import qualified Data.Foldable as F
import Data.Conduit
import Data.Conduit.Internal.Fusion
import Data.Conduit.Internal.List.Stream
import qualified Data.Conduit.Internal as CI
import Control.Monad (when, (<=<), liftM, void)
import Control.Monad.Trans.Class (lift)

-- Defines INLINE_RULE0, INLINE_RULE, STREAMING0, and STREAMING.
#include "fusion-macros.h"

-- | Generate a source from a seed value.
--
-- The step function is applied to the current seed: a @Just@ result yields
-- its first component and continues with the second component as the new
-- seed, while @Nothing@ ends the stream.
--
-- Subject to fusion
--
-- Since 0.4.2
unfold, unfoldC :: Monad m
       => (b -> Maybe (a, b))
       -> b
       -> ConduitT i a m ()
unfoldC f =
    go
  where
    go seed =
        case f seed of
            Just (a, seed') -> yield a >> go seed'
            Nothing -> return ()
{-# INLINE unfoldC #-}
STREAMING(unfold, unfoldC, unfoldS, f x)

-- | Generate a source from a seed value with a return value.
--
-- A @Right@ result yields a value and continues with the new seed; a @Left@
-- result ends the stream and becomes the result of the conduit.
--
-- Subject to fusion
--
-- @since 1.2.11
unfoldEither, unfoldEitherC :: Monad m
             => (b -> Either r (a, b))
             -> b
             -> ConduitT i a m r
unfoldEitherC f =
    go
  where
    go seed =
        case f seed of
            Right (a, seed') -> yield a >> go seed'
            Left r -> return r
{-# INLINE unfoldEitherC #-}
STREAMING(unfoldEither, unfoldEitherC, unfoldEitherS, f x)

-- | A monadic unfold.
--
-- Like 'unfold', but the step function runs in the base monad.
--
-- Subject to fusion
--
-- Since 1.1.2
unfoldM, unfoldMC :: Monad m
        => (b -> m (Maybe (a, b)))
        -> b
        -> ConduitT i a m ()
unfoldMC f =
    go
  where
    go seed = do
        mres <- lift $ f seed
        case mres of
            Just (a, seed') -> yield a >> go seed'
            Nothing -> return ()
STREAMING(unfoldM, unfoldMC, unfoldMS, f seed)

-- | A monadic unfoldEither.
--
-- Subject to fusion
--
-- @since 1.2.11
unfoldEitherM, unfoldEitherMC :: Monad m
              => (b -> m (Either r (a, b)))
              -> b
              -> ConduitT i a m r
unfoldEitherMC f =
    go
  where
    go seed = do
        mres <- lift $ f seed
        case mres of
            Right (a, seed') -> yield a >> go seed'
            Left r -> return r
STREAMING(unfoldEitherM, unfoldEitherMC, unfoldEitherMS, f seed)

-- | Yield the values from the list.
--
-- Subject to fusion
sourceList, sourceListC :: Monad m => [a] -> ConduitT i a m ()
sourceListC = Prelude.mapM_ yield
{-# INLINE sourceListC #-}
STREAMING(sourceList, sourceListC, sourceListS, xs)

-- | Enumerate from a value to a final value, inclusive, via 'succ'.
--
-- This is generally more efficient than using @Prelude@\'s @enumFromTo@ and
-- combining with @sourceList@ since this avoids any intermediate data
-- structures.
--
-- Nothing is yielded when the start value is greater than the final value.
--
-- Subject to fusion
--
-- Since 0.4.2
enumFromTo, enumFromToC :: (Enum a, Prelude.Ord a, Monad m)
           => a
           -> a
           -> ConduitT i a m ()
enumFromToC x0 y =
    loop x0
  where
    loop x
        | x Prelude.> y = return ()
        -- Stop right after emitting the upper bound instead of evaluating
        -- @succ y@, which is an error when @y@ is the last value of a
        -- bounded type (e.g. @maxBound@).
        | x == y = yield x
        | otherwise = yield x >> loop (Prelude.succ x)
{-# INLINE enumFromToC #-}
STREAMING(enumFromTo, enumFromToC, enumFromToS, x0 y)

-- | Produces an infinite stream of repeated applications of f to x.
--
-- The first value yielded is x itself.
--
-- Subject to fusion
--
iterate, iterateC :: Monad m => (a -> a) -> a -> ConduitT i a m ()
iterateC f =
    go
  where
    go a = yield a >> go (f a)
{-# INLINE iterateC #-}
STREAMING(iterate, iterateC, iterateS, f a)

-- | Replicate a single value the given number of times.
--
-- A count of zero or less yields nothing.
--
-- Subject to fusion
--
-- Since 1.2.0
replicate, replicateC :: Monad m => Int -> a -> ConduitT i a m ()
replicateC cnt0 a =
    loop cnt0
  where
    loop i
        | i <= 0 = return ()
        | otherwise = yield a >> loop (i - 1)
{-# INLINE replicateC #-}
STREAMING(replicate, replicateC, replicateS, cnt0 a)

-- | Replicate a monadic value the given number of times.
--
-- The action is run once for every value yielded.
--
-- Subject to fusion
--
-- Since 1.2.0
replicateM, replicateMC :: Monad m => Int -> m a -> ConduitT i a m ()
replicateMC cnt0 ma =
    loop cnt0
  where
    loop i
        | i <= 0 = return ()
        | otherwise = lift ma >>= yield >> loop (i - 1)
{-# INLINE replicateMC #-}
STREAMING(replicateM, replicateMC, replicateMS, cnt0 ma)

-- | A strict left fold.
--
-- Subject to fusion
--
-- Since 0.3.0
fold, foldC :: Monad m
     => (b -> a -> b)
     -> b
     -> ConduitT a o m b
foldC f =
    loop
  where
    -- The bang pattern forces the accumulator on every step.
    loop !accum = await >>= maybe (return accum) (loop . f accum)
{-# INLINE foldC #-}
STREAMING(fold, foldC, foldS, f accum)

-- | A monadic strict left fold.
--
-- Subject to fusion
--
-- Since 0.3.0
foldM, foldMC :: Monad m
      => (b -> a -> m b)
      -> b
      -> ConduitT a o m b
foldMC f =
    loop
  where
    loop accum = do
        await >>= maybe (return accum) go
      where
        go a = do
            accum' <- lift $ f accum a
            -- Force the new accumulator before continuing.
            accum' `seq` loop accum'
{-# INLINE foldMC #-}
STREAMING(foldM, foldMC, foldMS, f accum)

-----------------------------------------------------------------
-- These are for cases where- for whatever reason- stream fusion cannot be
-- applied.
connectFold :: Monad m => ConduitT () a m () -> (b -> a -> b) -> b -> m b
connectFold (CI.ConduitT src0) f =
    go (src0 CI.Done)
  where
    go (CI.Done ()) b = return b
    go (CI.HaveOutput src a) b = go src Prelude.$! f b a
    go (CI.NeedInput _ c) b = go (c ()) b
    go (CI.Leftover src ()) b = go src b
    go (CI.PipeM msrc) b = do
        src <- msrc
        go src b
{-# INLINE connectFold #-}
{-# RULES "conduit: $$ fold" forall src f b. runConduit (src .| fold f b) = connectFold src f b #-}

connectFoldM :: Monad m => ConduitT () a m () -> (b -> a -> m b) -> b -> m b
connectFoldM (CI.ConduitT src0) f =
    go (src0 CI.Done)
  where
    go (CI.Done ()) b = return b
    go (CI.HaveOutput src a) b = do
        !b' <- f b a
        go src b'
    go (CI.NeedInput _ c) b = go (c ()) b
    go (CI.Leftover src ()) b = go src b
    go (CI.PipeM msrc) b = do
        src <- msrc
        go src b
{-# INLINE connectFoldM #-}
{-# RULES "conduit: $$ foldM" forall src f b. runConduit (src .| foldM f b) = connectFoldM src f b #-}
-----------------------------------------------------------------

-- | A monoidal strict left fold.
--
-- Subject to fusion
--
-- Since 0.5.3
foldMap :: (Monad m, Monoid b)
        => (a -> b)
        -> ConduitT a o m b
INLINE_RULE(foldMap, f, let combiner accum = mappend accum . f in fold combiner mempty)

-- | A monoidal strict left fold in a Monad.
--
-- Since 1.0.8
foldMapM :: (Monad m, Monoid b)
        => (a -> m b)
        -> ConduitT a o m b
INLINE_RULE(foldMapM, f, let combiner accum = liftM (mappend accum) . f in foldM combiner mempty)

-- | Apply the action to all values in the stream.
--
-- Subject to fusion
--
-- Since 0.3.0
mapM_, mapM_C :: Monad m
      => (a -> m ())
      -> ConduitT a o m ()
mapM_C f = awaitForever $ lift . f
{-# INLINE mapM_C #-}
STREAMING(mapM_, mapM_C, mapM_S, f)

srcMapM_ :: Monad m => ConduitT () a m () -> (a -> m ()) -> m ()
srcMapM_ (CI.ConduitT src) f =
    go (src CI.Done)
  where
    go (CI.Done ()) = return ()
    go (CI.PipeM mp) = mp >>= go
    go (CI.Leftover p ()) = go p
    go (CI.HaveOutput p o) = f o >> go p
    go (CI.NeedInput _ c) = go (c ())
{-# INLINE srcMapM_ #-}
{-# RULES "conduit: connect to mapM_" [2] forall f src. runConduit (src .| mapM_ f) = srcMapM_ src f #-}

-- | Ignore a certain number of values in the stream. This function is
-- semantically equivalent to:
--
-- > drop i = take i >> return ()
--
-- However, @drop@ is more efficient as it does not need to hold values in
-- memory.
--
-- Subject to fusion
--
-- Since 0.3.0
drop, dropC :: Monad m
     => Int
     -> ConduitT a o m ()
dropC =
    loop
  where
    loop i | i <= 0 = return ()
    loop count = await >>= maybe (return ()) (\_ -> loop (count - 1))
{-# INLINE dropC #-}
STREAMING(drop, dropC, dropS, i)

-- | Take some values from the stream and return as a list. If you want to
-- instead create a conduit that pipes data to another sink, see 'isolate'.
-- This function is semantically equivalent to:
--
-- > take i = isolate i .| consume
--
-- Subject to fusion
--
-- Since 0.3.0
take, takeC :: Monad m
     => Int
     -> ConduitT a o m [a]
takeC =
    loop id
  where
    -- @front@ is a difference list holding the values taken so far.
    loop front count | count <= 0 = return $ front []
    loop front count = await >>= maybe
        (return $ front [])
        (\x -> loop (front . (x:)) (count - 1))
{-# INLINE takeC #-}
STREAMING(take, takeC, takeS, i)

-- | Take a single value from the stream, if available.
--
-- Subject to fusion
--
-- Since 0.3.0
head, headC :: Monad m => ConduitT a o m (Maybe a)
headC = await
{-# INLINE headC #-}
STREAMING0(head, headC, headS)

-- | Look at the next value in the stream, if available. This function will not
-- change the state of the stream.
--
-- Since 0.3.0
peek :: Monad m => ConduitT a o m (Maybe a)
peek = await >>= maybe (return Nothing) (\x -> leftover x >> return (Just x))

-- | Apply a transformation to all values in a stream.
--
-- Subject to fusion
--
-- Since 0.3.0
map, mapC :: Monad m => (a -> b) -> ConduitT a b m ()
mapC f = awaitForever $ yield . f
{-# INLINE mapC #-}
STREAMING(map, mapC, mapS, f)

-- Since a Source never has any leftovers, fusion rules on it are safe.
{-
{-# RULES "conduit: source/map fusion .|" forall f src. src .| map f = mapFuseRight src f #-}

mapFuseRight :: Monad m => Source m a -> (a -> b) -> Source m b
mapFuseRight src f = CIC.mapOutput f src
{-# INLINE mapFuseRight #-}
-}

{-

It might be nice to include these rewrite rules, but they may have subtle
differences based on leftovers.

{-# RULES "conduit: map-to-mapOutput pipeL" forall f src. pipeL src (map f) = mapOutput f src #-}
{-# RULES "conduit: map-to-mapOutput $=" forall f src. src $= (map f) = mapOutput f src #-}
{-# RULES "conduit: map-to-mapOutput pipe" forall f src. pipe src (map f) = mapOutput f src #-}
{-# RULES "conduit: map-to-mapOutput >+>" forall f src. src >+> (map f) = mapOutput f src #-}

{-# RULES "conduit: map-to-mapInput pipeL" forall f sink. pipeL (map f) sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
{-# RULES "conduit: map-to-mapInput =$" forall f sink. map f =$ sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
{-# RULES "conduit: map-to-mapInput pipe" forall f sink. pipe (map f) sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
{-# RULES "conduit: map-to-mapInput >+>" forall f sink. map f >+> sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}

{-# RULES "conduit: map-to-mapOutput .|" forall f con. con .| map f = mapOutput f con #-}
{-# RULES "conduit: map-to-mapInput .|" forall f con. map f .| con = mapInput f (Prelude.const Prelude.Nothing) con #-}

{-# INLINE [1] map #-}

-}

-- | Apply a monadic transformation to all values in a stream.
--
-- If you do not need the transformed values, and instead just want the monadic
-- side-effects of running the action, see 'mapM_'.
--
-- Subject to fusion
--
-- Since 0.3.0
mapM, mapMC :: Monad m => (a -> m b) -> ConduitT a b m ()
mapMC f = awaitForever $ \a -> lift (f a) >>= yield
{-# INLINE mapMC #-}
STREAMING(mapM, mapMC, mapMS, f)

-- | Apply a monadic action on all values in a stream.
--
-- This @Conduit@ can be used to perform a monadic side-effect for every
-- value, whilst passing the value through the @Conduit@ as-is.
--
-- > iterM f = mapM (\a -> f a >>= \() -> return a)
--
-- Subject to fusion
--
-- Since 0.5.6
iterM, iterMC :: Monad m => (a -> m ()) -> ConduitT a a m ()
iterMC f = awaitForever $ \a -> lift (f a) >> yield a
{-# INLINE iterMC #-}
STREAMING(iterM, iterMC, iterMS, f)

-- | Apply a transformation that may fail to all values in a stream, discarding
-- the failures.
--
-- Subject to fusion
--
-- Since 0.5.1
mapMaybe, mapMaybeC :: Monad m => (a -> Maybe b) -> ConduitT a b m ()
mapMaybeC f = awaitForever $ maybe (return ()) yield . f
{-# INLINE mapMaybeC #-}
STREAMING(mapMaybe, mapMaybeC, mapMaybeS, f)

-- | Apply a monadic transformation that may fail to all values in a stream,
-- discarding the failures.
--
-- Subject to fusion
--
-- Since 0.5.1
mapMaybeM, mapMaybeMC :: Monad m => (a -> m (Maybe b)) -> ConduitT a b m ()
mapMaybeMC f = awaitForever $ maybe (return ()) yield <=< lift . f
{-# INLINE mapMaybeMC #-}
STREAMING(mapMaybeM, mapMaybeMC, mapMaybeMS, f)

-- | Filter the @Just@ values from a stream, discarding the @Nothing@ values.
--
-- Subject to fusion
--
-- Since 0.5.1
catMaybes, catMaybesC :: Monad m => ConduitT (Maybe a) a m ()
catMaybesC = awaitForever $ maybe (return ()) yield
{-# INLINE catMaybesC #-}
STREAMING0(catMaybes, catMaybesC, catMaybesS)

-- | Generalization of 'catMaybes'. It puts all values from
-- 'F.Foldable' into stream.
--
-- Subject to fusion
--
-- Since 1.0.6
concat, concatC :: (Monad m, F.Foldable f) => ConduitT (f a) a m ()
concatC = awaitForever $ F.mapM_ yield
{-# INLINE concatC #-}
STREAMING0(concat, concatC, concatS)

-- | Apply a transformation to all values in a stream, concatenating the output
-- values.
--
-- Subject to fusion
--
-- Since 0.3.0
concatMap, concatMapC :: Monad m => (a -> [b]) -> ConduitT a b m ()
concatMapC f = awaitForever $ sourceList . f
{-# INLINE concatMapC #-}
STREAMING(concatMap, concatMapC, concatMapS, f)

-- | Apply a monadic transformation to all values in a stream, concatenating
-- the output values.
--
-- Subject to fusion
--
-- Since 0.3.0
concatMapM, concatMapMC :: Monad m => (a -> m [b]) -> ConduitT a b m ()
concatMapMC f = awaitForever $ sourceList <=< lift . f
{-# INLINE concatMapMC #-}
STREAMING(concatMapM, concatMapMC, concatMapMS, f)

-- | 'concatMap' with a strict accumulator.
--
-- Subject to fusion
--
-- Since 0.3.0
concatMapAccum, concatMapAccumC :: Monad m => (a -> accum -> (accum, [b])) -> accum -> ConduitT a b m ()
concatMapAccumC f x0 = void (mapAccum f x0) .| concat
{-# INLINE concatMapAccumC #-}
STREAMING(concatMapAccum, concatMapAccumC, concatMapAccumS, f x0)

-- | Deprecated synonym for @mapAccum@
--
-- Since 1.0.6
scanl :: Monad m => (a -> s -> (s, b)) -> s -> ConduitT a b m ()
scanl f s = void $ mapAccum f s
{-# DEPRECATED scanl "Use mapAccum instead" #-}

-- | Deprecated synonym for @mapAccumM@
--
-- Since 1.0.6
scanlM :: Monad m => (a -> s -> m (s, b)) -> s -> ConduitT a b m ()
scanlM f s = void $ mapAccumM f s
{-# DEPRECATED scanlM "Use mapAccumM instead" #-}

-- | Analog of @mapAccumL@ for lists. Note that in contrast to @mapAccumL@, the function argument
--   takes the accumulator as its second argument, not its first argument, and the accumulated value
--   is strict.
--
-- The final accumulator is returned as the result of the conduit.
--
-- Subject to fusion
--
-- Since 1.1.1
mapAccum, mapAccumC :: Monad m => (a -> s -> (s, b)) -> s -> ConduitT a b m s
mapAccumC f =
    loop
  where
    loop !s = await >>= maybe (return s) go
      where
        go a = case f a s of
                 (s', b) -> yield b >> loop s'
STREAMING(mapAccum, mapAccumC, mapAccumS, f s)

-- | Monadic `mapAccum`.
--
-- Subject to fusion
--
-- Since 1.1.1
mapAccumM, mapAccumMC :: Monad m => (a -> s -> m (s, b)) -> s -> ConduitT a b m s
mapAccumMC f =
    loop
  where
    loop !s = await >>= maybe (return s) go
      where
        go a = do (s', b) <- lift $ f a s
                  yield b
                  loop s'
{-# INLINE mapAccumMC #-}
STREAMING(mapAccumM, mapAccumMC, mapAccumMS, f s)

-- | Analog of 'Prelude.scanl' for lists.
--
-- Subject to fusion
--
-- Since 1.1.1
scan :: Monad m => (a -> b -> b) -> b -> ConduitT a b m b
INLINE_RULE(scan, f, mapAccum (\a b -> let r = f a b in (r, r)))

-- | Monadic @scanl@.
--
-- Subject to fusion
--
-- Since 1.1.1
scanM :: Monad m => (a -> b -> m b) -> b -> ConduitT a b m b
INLINE_RULE(scanM, f, mapAccumM (\a b -> f a b >>= \r -> return (r, r)))

-- | 'concatMapM' with a strict accumulator.
--
-- Subject to fusion
--
-- Since 0.3.0
concatMapAccumM, concatMapAccumMC :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> ConduitT a b m ()
concatMapAccumMC f x0 = void (mapAccumM f x0) .| concat
{-# INLINE concatMapAccumMC #-}
STREAMING(concatMapAccumM, concatMapAccumMC, concatMapAccumMS, f x0)

-- | Generalization of 'mapMaybe' and 'concatMap'. It applies function
-- to all values in a stream and send values inside resulting
-- 'Foldable' downstream.
--
-- Subject to fusion
--
-- Since 1.0.6
mapFoldable, mapFoldableC :: (Monad m, F.Foldable f) => (a -> f b) -> ConduitT a b m ()
mapFoldableC f = awaitForever $ F.mapM_ yield . f
{-# INLINE mapFoldableC #-}
STREAMING(mapFoldable, mapFoldableC, mapFoldableS, f)

-- | Monadic variant of 'mapFoldable'.
--
-- Subject to fusion
--
-- Since 1.0.6
mapFoldableM, mapFoldableMC :: (Monad m, F.Foldable f) => (a -> m (f b)) -> ConduitT a b m ()
mapFoldableMC f = awaitForever $ F.mapM_ yield <=< lift . f
{-# INLINE mapFoldableMC #-}
STREAMING(mapFoldableM, mapFoldableMC, mapFoldableMS, f)

-- | Consume all values from the stream and return as a list. Note that this
-- will pull all values into memory.
--
-- Subject to fusion
--
-- Since 0.3.0
consume, consumeC :: Monad m => ConduitT a o m [a]
consumeC =
    loop id
  where
    -- @front@ is a difference list holding the values consumed so far.
    loop front = await >>= maybe (return $ front []) (\x -> loop $ front . (x:))
{-# INLINE consumeC #-}
STREAMING0(consume, consumeC, consumeS)

-- | Group a stream into chunks of a given size. The last chunk may contain
-- fewer than n elements.
--
-- Calls 'error' if the requested chunk size is not positive.
--
-- Since 1.2.9
chunksOf :: Monad m => Int -> ConduitT a [a] m ()
chunksOf n = if n > 0 then loop n id else error $ "chunksOf size must be positive (given " ++ show n ++ ")"
  where
    -- A full chunk has been collected: emit it and start over.
    loop 0 rest = yield (rest []) >> loop n id
    loop count rest = await >>= \ma -> case ma of
        -- End of input: emit the partial chunk, unless it is empty.
        Nothing -> case rest [] of
            [] -> return ()
            nonempty -> yield nonempty
        Just a -> loop (count - 1) (rest . (a :))

-- | Grouping input according to an equality function.
--
-- Every incoming value is compared against the first element of the current
-- group.
--
-- Subject to fusion
--
-- Since 0.3.0
groupBy, groupByC :: Monad m => (a -> a -> Bool) -> ConduitT a [a] m ()
groupByC f =
    start
  where
    start = await >>= maybe (return ()) (loop id)

    loop rest x =
        await >>= maybe (yield (x : rest [])) go
      where
        go y
            | f x y     = loop (rest . (y:)) x
            | otherwise = yield (x : rest []) >> loop id y
STREAMING(groupBy, groupByC, groupByS, f)

-- | Group consecutive values whose images under the given function are equal.
-- 'groupOn1' is similar to 'groupBy', comparing the results of the function
-- rather than the values themselves.
--
-- returns a pair, indicating there are always 1 or more items in the grouping.
-- This is designed to be converted into a NonEmpty structure
-- but it avoids a dependency on another package
--
-- > import Data.List.NonEmpty
-- >
-- > groupOn1 :: (Monad m, Eq b) => (a -> b) -> ConduitT a (NonEmpty a) m ()
-- > groupOn1 f = CL.groupOn1 f .| CL.map (uncurry (:|))
--
-- Subject to fusion
--
-- Since 1.1.7
groupOn1, groupOn1C :: (Monad m, Eq b)
         => (a -> b)
         -> ConduitT a (a, [a]) m ()
groupOn1C f =
    start
  where
    start = await >>= maybe (return ()) (\x -> loop (f x) id x)

    -- @key@ is the image of the first element of the current group. It is
    -- carried along so that the function is applied once per element rather
    -- than twice per comparison.
    loop key rest x =
        await >>= maybe (yield (x, rest [])) go
      where
        go y
            | key == yKey = loop key (rest . (y:)) x
            | otherwise   = yield (x, rest []) >> loop yKey id y
          where
            yKey = f y
STREAMING(groupOn1, groupOn1C, groupOn1S, f)

-- | Ensure that the inner sink consumes no more than the given number of
-- values. Note that this does /not/ ensure that the sink consumes all of those
-- values. To get the latter behavior, combine with 'sinkNull', e.g.:
--
-- > runConduit $ src .| do
-- >     x <- isolate count .| do
-- >         x <- someSink
-- >         sinkNull
-- >         return x
-- >     someOtherSink
-- >     ...
--
-- Subject to fusion
--
-- Since 0.3.0
isolate, isolateC :: Monad m => Int -> ConduitT a a m ()
isolateC =
    loop
  where
    loop count | count <= 0 = return ()
    loop count = await >>= maybe (return ()) (\x -> yield x >> loop (count - 1))
STREAMING(isolate, isolateC, isolateS, count)

-- | Keep only values in the stream passing a given predicate.
--
-- Subject to fusion
--
-- Since 0.3.0
filter, filterC :: Monad m => (a -> Bool) -> ConduitT a a m ()
filterC f = awaitForever $ \i -> when (f i) (yield i)
STREAMING(filter, filterC, filterS, f)

filterFuseRight
  :: Monad m
  => ConduitT i o m ()
  -> (o -> Bool)
  -> ConduitT i o m ()
filterFuseRight (CI.ConduitT src) f = CI.ConduitT $ \rest -> let
    go (CI.Done ()) = rest ()
    go (CI.PipeM mp) = CI.PipeM (liftM go mp)
    go (CI.Leftover p i) = CI.Leftover (go p) i
    go (CI.HaveOutput p o)
        | f o = CI.HaveOutput (go p) o
        | otherwise = go p
    go (CI.NeedInput p c) = CI.NeedInput (go . p) (go . c)
    in go (src CI.Done)
-- Intermediate finalizers are dropped, but this is acceptable: the next
-- yielded value would be demanded by downstream in any event, and that new
-- finalizer will always override the existing finalizer.
{-# RULES "conduit: source/filter fusion .|" forall f src. src .| filter f = filterFuseRight src f #-}
{-# INLINE filterFuseRight #-}

-- | Ignore the remainder of values in the source.
-- Particularly useful when combined with 'isolate'.
--
-- Subject to fusion
--
-- Since 0.3.0
sinkNull, sinkNullC :: Monad m => ConduitT i o m ()
sinkNullC = awaitForever $ \_ -> return ()
{-# INLINE sinkNullC #-}
STREAMING0(sinkNull, sinkNullC, sinkNullS)

srcSinkNull :: Monad m => ConduitT () o m () -> m ()
srcSinkNull (CI.ConduitT src) =
    go (src CI.Done)
  where
    go (CI.Done ()) = return ()
    go (CI.PipeM mp) = mp >>= go
    go (CI.Leftover p ()) = go p
    go (CI.HaveOutput p _) = go p
    go (CI.NeedInput _ c) = go (c ())
{-# INLINE srcSinkNull #-}
{-# RULES "conduit: connect to sinkNull" forall src. runConduit (src .| sinkNull) = srcSinkNull src #-}

-- | A source that outputs no values. Note that this is just a type-restricted
-- synonym for 'mempty'.
--
-- Subject to fusion
--
-- Since 0.3.0
sourceNull, sourceNullC :: Monad m => ConduitT i o m ()
sourceNullC = return ()
{-# INLINE sourceNullC #-}
STREAMING0(sourceNull, sourceNullC, sourceNullS)

-- | Run a @Pipe@ repeatedly, and output its result value downstream. Stops
-- when no more input is available from upstream.
--
-- Before each run, one input value is awaited and immediately pushed back as
-- a leftover, so the inner @Pipe@ only starts when input is available. Note
-- that if the inner @Pipe@ never consumes any input, the same value is pushed
-- back and awaited again, so the loop does not terminate.
--
-- Since 0.5.0
sequence :: Monad m
         => ConduitT i o m o -- ^ @Pipe@ to run repeatedly
         -> ConduitT i o m ()
sequence sink =
    self
  where
    self = awaitForever $ \i -> leftover i >> sink >>= yield