1{-# LANGUAGE RankNTypes #-}
2{-# LANGUAGE BangPatterns #-}
3{-# LANGUAGE CPP #-}
4{-# LANGUAGE Trustworthy #-}
5-- | /NOTE/ It is recommended to start using "Data.Conduit.Combinators" instead
6-- of this module.
7--
8-- Higher-level functions to interact with the elements of a stream. Most of
9-- these are based on list functions.
10--
11-- For many purposes, it's recommended to use the conduit-combinators library,
12-- which provides a more complete set of functions.
13--
14-- Note that these functions all deal with individual elements of a stream as a
15-- sort of \"black box\", where there is no introspection of the contained
16-- elements. Values such as @ByteString@ and @Text@ will likely need to be
17-- treated specially to deal with their contents properly (@Word8@ and @Char@,
18-- respectively). See the @Data.Conduit.Binary@ and @Data.Conduit.Text@
19-- modules in the @conduit-extra@ package.
20module Data.Conduit.List
21    ( -- * Sources
22      sourceList
23    , sourceNull
24    , unfold
25    , unfoldEither
26    , unfoldM
27    , unfoldEitherM
28    , enumFromTo
29    , iterate
30    , replicate
31    , replicateM
32      -- * Sinks
33      -- ** Pure
34    , fold
35    , foldMap
36    , uncons
37    , unconsEither
38    , take
39    , drop
40    , head
41    , peek
42    , consume
43    , sinkNull
44      -- ** Monadic
45    , foldMapM
46    , foldM
47    , unconsM
48    , unconsEitherM
49    , mapM_
50      -- * Conduits
51      -- ** Pure
52    , map
53    , mapMaybe
54    , mapFoldable
55    , catMaybes
56    , concat
57    , concatMap
58    , concatMapAccum
59    , scanl
60    , scan
61    , mapAccum
62    , chunksOf
63    , groupBy
64    , groupOn1
65    , isolate
66    , filter
67      -- ** Monadic
68    , mapM
69    , iterM
70    , scanlM
71    , scanM
72    , mapAccumM
73    , mapMaybeM
74    , mapFoldableM
75    , concatMapM
76    , concatMapAccumM
77      -- * Misc
78    , sequence
79    ) where
80
81import qualified Prelude
82import Prelude
83    ( ($), return, (==), (-), Int
84    , (.), id, Maybe (..), Monad
85    , Either (..)
86    , Bool (..)
87    , (>>)
88    , (>>=)
89    , seq
90    , otherwise
91    , Enum, Eq
92    , maybe
93    , (<=)
94    , (>)
95    , error
96    , (++)
97    , show
98    )
99import Data.Monoid (Monoid, mempty, mappend)
100import qualified Data.Foldable as F
101import Data.Conduit
102import Data.Conduit.Internal.Conduit (unconsM, unconsEitherM)
103import Data.Conduit.Internal.Fusion
104import Data.Conduit.Internal.List.Stream
105import qualified Data.Conduit.Internal as CI
106import Data.Functor.Identity (Identity (runIdentity))
107import Control.Monad (when, (<=<), liftM, void)
108import Control.Monad.Trans.Class (lift)
109
110-- Defines INLINE_RULE0, INLINE_RULE, STREAMING0, and STREAMING.
111#include "fusion-macros.h"
112
113-- | Generate a source from a seed value.
114--
115-- Subject to fusion
116--
117-- Since 0.4.2
118unfold, unfoldC :: Monad m
119                => (b -> Maybe (a, b))
120                -> b
121                -> ConduitT i a m ()
122unfoldC f =
123    go
124  where
125    go seed =
126        case f seed of
127            Just (a, seed') -> yield a >> go seed'
128            Nothing -> return ()
129{-# INLINE unfoldC #-}
130STREAMING(unfold, unfoldC, unfoldS, f x)
131
132-- | Generate a source from a seed value with a return value.
133--
134-- Subject to fusion
135--
136-- @since 1.2.11
137unfoldEither, unfoldEitherC :: Monad m
138                            => (b -> Either r (a, b))
139                            -> b
140                            -> ConduitT i a m r
141unfoldEitherC f =
142    go
143  where
144    go seed =
145        case f seed of
146            Right (a, seed') -> yield a >> go seed'
147            Left r -> return r
148{-# INLINE unfoldEitherC #-}
149STREAMING(unfoldEither, unfoldEitherC, unfoldEitherS, f x)
150
151-- | A monadic unfold.
152--
153-- Subject to fusion
154--
155-- Since 1.1.2
156unfoldM, unfoldMC :: Monad m
157                  => (b -> m (Maybe (a, b)))
158                  -> b
159                  -> ConduitT i a m ()
160unfoldMC f =
161    go
162  where
163    go seed = do
164        mres <- lift $ f seed
165        case mres of
166            Just (a, seed') -> yield a >> go seed'
167            Nothing -> return ()
168STREAMING(unfoldM, unfoldMC, unfoldMS, f seed)
169
170-- | A monadic unfoldEither.
171--
172-- Subject to fusion
173--
174-- @since 1.2.11
175unfoldEitherM, unfoldEitherMC :: Monad m
176                              => (b -> m (Either r (a, b)))
177                              -> b
178                              -> ConduitT i a m r
179unfoldEitherMC f =
180    go
181  where
182    go seed = do
183        mres <- lift $ f seed
184        case mres of
185            Right (a, seed') -> yield a >> go seed'
186            Left r -> return r
187STREAMING(unfoldEitherM, unfoldEitherMC, unfoldEitherMS, f seed)
188
189-- | Split a pure conduit into head and tail.
190-- This is equivalent to @runIdentity . unconsM@.
191--
192-- Note that you have to 'sealConduitT' it first.
193--
194-- Since 1.3.3
195uncons :: SealedConduitT () o Identity ()
196       -> Maybe (o, SealedConduitT () o Identity ())
197uncons = runIdentity . unconsM
198
199-- | Split a pure conduit into head and tail or return its result if it is done.
200-- This is equivalent to @runIdentity . unconsEitherM@.
201--
202-- Note that you have to 'sealConduitT' it first.
203--
204-- Since 1.3.3
205unconsEither :: SealedConduitT () o Identity r
206             -> Either r (o, SealedConduitT () o Identity r)
207unconsEither = runIdentity . unconsEitherM
208
209-- | Yield the values from the list.
210--
211-- Subject to fusion
212sourceList, sourceListC :: Monad m => [a] -> ConduitT i a m ()
213sourceListC = Prelude.mapM_ yield
214{-# INLINE sourceListC #-}
215STREAMING(sourceList, sourceListC, sourceListS, xs)
216
217-- | Enumerate from a value to a final value, inclusive, via 'succ'.
218--
219-- This is generally more efficient than using @Prelude@\'s @enumFromTo@ and
220-- combining with @sourceList@ since this avoids any intermediate data
221-- structures.
222--
223-- Subject to fusion
224--
225-- Since 0.4.2
226enumFromTo, enumFromToC :: (Enum a, Prelude.Ord a, Monad m)
227                        => a
228                        -> a
229                        -> ConduitT i a m ()
230enumFromToC x0 y =
231    loop x0
232  where
233    loop x
234        | x Prelude.> y = return ()
235        | otherwise = yield x >> loop (Prelude.succ x)
236{-# INLINE enumFromToC #-}
237STREAMING(enumFromTo, enumFromToC, enumFromToS, x0 y)
238
239-- | Produces an infinite stream of repeated applications of f to x.
240--
241-- Subject to fusion
242--
243iterate, iterateC :: Monad m => (a -> a) -> a -> ConduitT i a m ()
244iterateC f =
245    go
246  where
247    go a = yield a >> go (f a)
248{-# INLINE iterateC #-}
249STREAMING(iterate, iterateC, iterateS, f a)
250
251-- | Replicate a single value the given number of times.
252--
253-- Subject to fusion
254--
255-- Since 1.2.0
256replicate, replicateC :: Monad m => Int -> a -> ConduitT i a m ()
257replicateC cnt0 a =
258    loop cnt0
259  where
260    loop i
261        | i <= 0 = return ()
262        | otherwise = yield a >> loop (i - 1)
263{-# INLINE replicateC #-}
264STREAMING(replicate, replicateC, replicateS, cnt0 a)
265
266-- | Replicate a monadic value the given number of times.
267--
268-- Subject to fusion
269--
270-- Since 1.2.0
271replicateM, replicateMC :: Monad m => Int -> m a -> ConduitT i a m ()
272replicateMC cnt0 ma =
273    loop cnt0
274  where
275    loop i
276        | i <= 0 = return ()
277        | otherwise = lift ma >>= yield >> loop (i - 1)
278{-# INLINE replicateMC #-}
279STREAMING(replicateM, replicateMC, replicateMS, cnt0 ma)
280
281-- | A strict left fold.
282--
283-- Subject to fusion
284--
285-- Since 0.3.0
286fold, foldC :: Monad m
287            => (b -> a -> b)
288            -> b
289            -> ConduitT a o m b
290foldC f =
291    loop
292  where
293    loop !accum = await >>= maybe (return accum) (loop . f accum)
294{-# INLINE foldC #-}
295STREAMING(fold, foldC, foldS, f accum)
296
297-- | A monadic strict left fold.
298--
299-- Subject to fusion
300--
301-- Since 0.3.0
302foldM, foldMC :: Monad m
303              => (b -> a -> m b)
304              -> b
305              -> ConduitT a o m b
306foldMC f =
307    loop
308  where
309    loop accum = do
310        await >>= maybe (return accum) go
311      where
312        go a = do
313            accum' <- lift $ f accum a
314            accum' `seq` loop accum'
315{-# INLINE foldMC #-}
316STREAMING(foldM, foldMC, foldMS, f accum)
317
318-----------------------------------------------------------------
319-- These are for cases where- for whatever reason- stream fusion cannot be
320-- applied.
321connectFold :: Monad m => ConduitT () a m () -> (b -> a -> b) -> b -> m b
322connectFold (CI.ConduitT src0) f =
323    go (src0 CI.Done)
324  where
325    go (CI.Done ()) b = return b
326    go (CI.HaveOutput src a) b = go src Prelude.$! f b a
327    go (CI.NeedInput _ c) b = go (c ()) b
328    go (CI.Leftover src ()) b = go src b
329    go (CI.PipeM msrc) b = do
330        src <- msrc
331        go src b
332{-# INLINE connectFold #-}
333{-# RULES "conduit: $$ fold" forall src f b. runConduit (src .| fold f b) = connectFold src f b #-}
334
335connectFoldM :: Monad m => ConduitT () a m () -> (b -> a -> m b) -> b -> m b
336connectFoldM (CI.ConduitT src0) f =
337    go (src0 CI.Done)
338  where
339    go (CI.Done ()) b = return b
340    go (CI.HaveOutput src a) b = do
341        !b' <- f b a
342        go src b'
343    go (CI.NeedInput _ c) b = go (c ()) b
344    go (CI.Leftover src ()) b = go src b
345    go (CI.PipeM msrc) b = do
346        src <- msrc
347        go src b
348{-# INLINE connectFoldM #-}
349{-# RULES "conduit: $$ foldM" forall src f b. runConduit (src .| foldM f b) = connectFoldM src f b #-}
350-----------------------------------------------------------------
351
352-- | A monoidal strict left fold.
353--
354-- Subject to fusion
355--
356-- Since 0.5.3
357foldMap :: (Monad m, Monoid b)
358        => (a -> b)
359        -> ConduitT a o m b
360INLINE_RULE(foldMap, f, let combiner accum = mappend accum . f in fold combiner mempty)
361
362-- | A monoidal strict left fold in a Monad.
363--
364-- Since 1.0.8
365foldMapM :: (Monad m, Monoid b)
366        => (a -> m b)
367        -> ConduitT a o m b
368INLINE_RULE(foldMapM, f, let combiner accum = liftM (mappend accum) . f in foldM combiner mempty)
369
370-- | Apply the action to all values in the stream.
371--
372-- Subject to fusion
373--
374-- Since 0.3.0
375mapM_, mapM_C :: Monad m
376              => (a -> m ())
377              -> ConduitT a o m ()
378mapM_C f = awaitForever $ lift . f
379{-# INLINE mapM_C #-}
380STREAMING(mapM_, mapM_C, mapM_S, f)
381
382srcMapM_ :: Monad m => ConduitT () a m () -> (a -> m ()) -> m ()
383srcMapM_ (CI.ConduitT src) f =
384    go (src CI.Done)
385  where
386    go (CI.Done ()) = return ()
387    go (CI.PipeM mp) = mp >>= go
388    go (CI.Leftover p ()) = go p
389    go (CI.HaveOutput p o) = f o >> go p
390    go (CI.NeedInput _ c) = go (c ())
391{-# INLINE srcMapM_ #-}
392{-# RULES "conduit: connect to mapM_" [2] forall f src. runConduit (src .| mapM_ f) = srcMapM_ src f #-}
393
394-- | Ignore a certain number of values in the stream. This function is
395-- semantically equivalent to:
396--
397-- > drop i = take i >> return ()
398--
399-- However, @drop@ is more efficient as it does not need to hold values in
400-- memory.
401--
402-- Subject to fusion
403--
404-- Since 0.3.0
405drop, dropC :: Monad m
406            => Int
407            -> ConduitT a o m ()
408dropC =
409    loop
410  where
411    loop i | i <= 0 = return ()
412    loop count = await >>= maybe (return ()) (\_ -> loop (count - 1))
413{-# INLINE dropC #-}
414STREAMING(drop, dropC, dropS, i)
415
416-- | Take some values from the stream and return as a list. If you want to
417-- instead create a conduit that pipes data to another sink, see 'isolate'.
418-- This function is semantically equivalent to:
419--
420-- > take i = isolate i =$ consume
421--
422-- Subject to fusion
423--
424-- Since 0.3.0
425take, takeC :: Monad m
426            => Int
427            -> ConduitT a o m [a]
428takeC =
429    loop id
430  where
431    loop front count | count <= 0 = return $ front []
432    loop front count = await >>= maybe
433        (return $ front [])
434        (\x -> loop (front . (x:)) (count - 1))
435{-# INLINE takeC #-}
436STREAMING(take, takeC, takeS, i)
437
438-- | Take a single value from the stream, if available.
439--
440-- Subject to fusion
441--
442-- Since 0.3.0
443head, headC :: Monad m => ConduitT a o m (Maybe a)
444headC = await
445{-# INLINE headC #-}
446STREAMING0(head, headC, headS)
447
448-- | Look at the next value in the stream, if available. This function will not
449-- change the state of the stream.
450--
451-- Since 0.3.0
452peek :: Monad m => ConduitT a o m (Maybe a)
453peek = await >>= maybe (return Nothing) (\x -> leftover x >> return (Just x))
454
455-- | Apply a transformation to all values in a stream.
456--
457-- Subject to fusion
458--
459-- Since 0.3.0
460map, mapC :: Monad m => (a -> b) -> ConduitT a b m ()
461mapC f = awaitForever $ yield . f
462{-# INLINE mapC #-}
463STREAMING(map, mapC, mapS, f)
464
465-- Since a Source never has any leftovers, fusion rules on it are safe.
466{-
467{-# RULES "conduit: source/map fusion .|" forall f src. src .| map f = mapFuseRight src f #-}
468
469mapFuseRight :: Monad m => Source m a -> (a -> b) -> Source m b
470mapFuseRight src f = CIC.mapOutput f src
471{-# INLINE mapFuseRight #-}
472-}
473
474{-
475
476It might be nice to include these rewrite rules, but they may have subtle
477differences based on leftovers.
478
479{-# RULES "conduit: map-to-mapOutput pipeL" forall f src. pipeL src (map f) = mapOutput f src #-}
480{-# RULES "conduit: map-to-mapOutput $=" forall f src. src $= (map f) = mapOutput f src #-}
481{-# RULES "conduit: map-to-mapOutput pipe" forall f src. pipe src (map f) = mapOutput f src #-}
482{-# RULES "conduit: map-to-mapOutput >+>" forall f src. src >+> (map f) = mapOutput f src #-}
483
484{-# RULES "conduit: map-to-mapInput pipeL" forall f sink. pipeL (map f) sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
485{-# RULES "conduit: map-to-mapInput =$" forall f sink. map f =$ sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
486{-# RULES "conduit: map-to-mapInput pipe" forall f sink. pipe (map f) sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
487{-# RULES "conduit: map-to-mapInput >+>" forall f sink. map f >+> sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
488
489{-# RULES "conduit: map-to-mapOutput .|" forall f con. con .| map f = mapOutput f con #-}
490{-# RULES "conduit: map-to-mapInput .|" forall f con. map f .| con = mapInput f (Prelude.const Prelude.Nothing) con #-}
491
492{-# INLINE [1] map #-}
493
494-}
495
496-- | Apply a monadic transformation to all values in a stream.
497--
498-- If you do not need the transformed values, and instead just want the monadic
499-- side-effects of running the action, see 'mapM_'.
500--
501-- Subject to fusion
502--
503-- Since 0.3.0
504mapM, mapMC :: Monad m => (a -> m b) -> ConduitT a b m ()
505mapMC f = awaitForever $ \a -> lift (f a) >>= yield
506{-# INLINE mapMC #-}
507STREAMING(mapM, mapMC, mapMS, f)
508
509-- | Apply a monadic action on all values in a stream.
510--
511-- This @Conduit@ can be used to perform a monadic side-effect for every
512-- value, whilst passing the value through the @Conduit@ as-is.
513--
514-- > iterM f = mapM (\a -> f a >>= \() -> return a)
515--
516-- Subject to fusion
517--
518-- Since 0.5.6
519iterM, iterMC :: Monad m => (a -> m ()) -> ConduitT a a m ()
520iterMC f = awaitForever $ \a -> lift (f a) >> yield a
521{-# INLINE iterMC #-}
522STREAMING(iterM, iterMC, iterMS, f)
523
524-- | Apply a transformation that may fail to all values in a stream, discarding
525-- the failures.
526--
527-- Subject to fusion
528--
529-- Since 0.5.1
530mapMaybe, mapMaybeC :: Monad m => (a -> Maybe b) -> ConduitT a b m ()
531mapMaybeC f = awaitForever $ maybe (return ()) yield . f
532{-# INLINE mapMaybeC #-}
533STREAMING(mapMaybe, mapMaybeC, mapMaybeS, f)
534
535-- | Apply a monadic transformation that may fail to all values in a stream,
536-- discarding the failures.
537--
538-- Subject to fusion
539--
540-- Since 0.5.1
541mapMaybeM, mapMaybeMC :: Monad m => (a -> m (Maybe b)) -> ConduitT a b m ()
542mapMaybeMC f = awaitForever $ maybe (return ()) yield <=< lift . f
543{-# INLINE mapMaybeMC #-}
544STREAMING(mapMaybeM, mapMaybeMC, mapMaybeMS, f)
545
546-- | Filter the @Just@ values from a stream, discarding the @Nothing@  values.
547--
548-- Subject to fusion
549--
550-- Since 0.5.1
551catMaybes, catMaybesC :: Monad m => ConduitT (Maybe a) a m ()
552catMaybesC = awaitForever $ maybe (return ()) yield
553{-# INLINE catMaybesC #-}
554STREAMING0(catMaybes, catMaybesC, catMaybesS)
555
556-- | Generalization of 'catMaybes'. It puts all values from
557--   'F.Foldable' into stream.
558--
559-- Subject to fusion
560--
561-- Since 1.0.6
562concat, concatC :: (Monad m, F.Foldable f) => ConduitT (f a) a m ()
563concatC = awaitForever $ F.mapM_ yield
564{-# INLINE concatC #-}
565STREAMING0(concat, concatC, concatS)
566
567-- | Apply a transformation to all values in a stream, concatenating the output
568-- values.
569--
570-- Subject to fusion
571--
572-- Since 0.3.0
573concatMap, concatMapC :: Monad m => (a -> [b]) -> ConduitT a b m ()
574concatMapC f = awaitForever $ sourceList . f
575{-# INLINE concatMapC #-}
576STREAMING(concatMap, concatMapC, concatMapS, f)
577
578-- | Apply a monadic transformation to all values in a stream, concatenating
579-- the output values.
580--
581-- Subject to fusion
582--
583-- Since 0.3.0
584concatMapM, concatMapMC :: Monad m => (a -> m [b]) -> ConduitT a b m ()
585concatMapMC f = awaitForever $ sourceList <=< lift . f
586{-# INLINE concatMapMC #-}
587STREAMING(concatMapM, concatMapMC, concatMapMS, f)
588
589-- | 'concatMap' with a strict accumulator.
590--
591-- Subject to fusion
592--
593-- Since 0.3.0
594concatMapAccum, concatMapAccumC :: Monad m => (a -> accum -> (accum, [b])) -> accum -> ConduitT a b m ()
595concatMapAccumC f x0 = void (mapAccum f x0) .| concat
596{-# INLINE concatMapAccumC #-}
597STREAMING(concatMapAccum, concatMapAccumC, concatMapAccumS, f x0)
598
599-- | Deprecated synonym for @mapAccum@
600--
601-- Since 1.0.6
602scanl :: Monad m => (a -> s -> (s, b)) -> s -> ConduitT a b m ()
603scanl f s = void $ mapAccum f s
604{-# DEPRECATED scanl "Use mapAccum instead" #-}
605
606-- | Deprecated synonym for @mapAccumM@
607--
608-- Since 1.0.6
609scanlM :: Monad m => (a -> s -> m (s, b)) -> s -> ConduitT a b m ()
610scanlM f s = void $ mapAccumM f s
611{-# DEPRECATED scanlM "Use mapAccumM instead" #-}
612
613-- | Analog of @mapAccumL@ for lists. Note that in contrast to @mapAccumL@, the function argument
614--   takes the accumulator as its second argument, not its first argument, and the accumulated value
615--   is strict.
616--
617-- Subject to fusion
618--
619-- Since 1.1.1
620mapAccum, mapAccumC :: Monad m => (a -> s -> (s, b)) -> s -> ConduitT a b m s
621mapAccumC f =
622    loop
623  where
624    loop !s = await >>= maybe (return s) go
625      where
626        go a = case f a s of
627                 (s', b) -> yield b >> loop s'
628STREAMING(mapAccum, mapAccumC, mapAccumS, f s)
629
630-- | Monadic `mapAccum`.
631--
632-- Subject to fusion
633--
634-- Since 1.1.1
635mapAccumM, mapAccumMC :: Monad m => (a -> s -> m (s, b)) -> s -> ConduitT a b m s
636mapAccumMC f =
637    loop
638  where
639    loop !s = await >>= maybe (return s) go
640      where
641        go a = do (s', b) <- lift $ f a s
642                  yield b
643                  loop s'
644{-# INLINE mapAccumMC #-}
645STREAMING(mapAccumM, mapAccumMC, mapAccumMS, f s)
646
647-- | Analog of 'Prelude.scanl' for lists.
648--
649-- Subject to fusion
650--
651-- Since 1.1.1
652scan :: Monad m => (a -> b -> b) -> b -> ConduitT a b m b
653INLINE_RULE(scan, f, mapAccum (\a b -> let r = f a b in (r, r)))
654
655-- | Monadic @scanl@.
656--
657-- Subject to fusion
658--
659-- Since 1.1.1
660scanM :: Monad m => (a -> b -> m b) -> b -> ConduitT a b m b
661INLINE_RULE(scanM, f, mapAccumM (\a b -> f a b >>= \r -> return (r, r)))
662
663-- | 'concatMapM' with a strict accumulator.
664--
665-- Subject to fusion
666--
667-- Since 0.3.0
668concatMapAccumM, concatMapAccumMC :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> ConduitT a b m ()
669concatMapAccumMC f x0 = void (mapAccumM f x0) .| concat
670{-# INLINE concatMapAccumMC #-}
671STREAMING(concatMapAccumM, concatMapAccumMC, concatMapAccumMS, f x0)
672
673-- | Generalization of 'mapMaybe' and 'concatMap'. It applies function
674-- to all values in a stream and send values inside resulting
675-- 'Foldable' downstream.
676--
677-- Subject to fusion
678--
679-- Since 1.0.6
680mapFoldable, mapFoldableC :: (Monad m, F.Foldable f) => (a -> f b) -> ConduitT a b m ()
681mapFoldableC f = awaitForever $ F.mapM_ yield . f
682{-# INLINE mapFoldableC #-}
683STREAMING(mapFoldable, mapFoldableC, mapFoldableS, f)
684
685-- | Monadic variant of 'mapFoldable'.
686--
687-- Subject to fusion
688--
689-- Since 1.0.6
690mapFoldableM, mapFoldableMC :: (Monad m, F.Foldable f) => (a -> m (f b)) -> ConduitT a b m ()
691mapFoldableMC f = awaitForever $ F.mapM_ yield <=< lift . f
692{-# INLINE mapFoldableMC #-}
693STREAMING(mapFoldableM, mapFoldableMC, mapFoldableMS, f)
694
695-- | Consume all values from the stream and return as a list. Note that this
696-- will pull all values into memory.
697--
698-- Subject to fusion
699--
700-- Since 0.3.0
701consume, consumeC :: Monad m => ConduitT a o m [a]
702consumeC =
703    loop id
704  where
705    loop front = await >>= maybe (return $ front []) (\x -> loop $ front . (x:))
706{-# INLINE consumeC #-}
707STREAMING0(consume, consumeC, consumeS)
708
709-- | Group a stream into chunks of a given size. The last chunk may contain
710-- fewer than n elements.
711--
712-- Subject to fusion
713--
714-- Since 1.2.9
715chunksOf :: Monad m => Int -> ConduitT a [a] m ()
716chunksOf n = if n > 0 then loop n id else error $ "chunksOf size must be positive (given " ++ show n ++ ")"
717  where
718    loop 0 rest = yield (rest []) >> loop n id
719    loop count rest = await >>= \ma -> case ma of
720      Nothing -> case rest [] of
721        [] -> return ()
722        nonempty -> yield nonempty
723      Just a -> loop (count - 1) (rest . (a :))
724
725-- | Grouping input according to an equality function.
726--
727-- Subject to fusion
728--
729-- Since 0.3.0
730groupBy, groupByC :: Monad m => (a -> a -> Bool) -> ConduitT a [a] m ()
731groupByC f =
732    start
733  where
734    start = await >>= maybe (return ()) (loop id)
735
736    loop rest x =
737        await >>= maybe (yield (x : rest [])) go
738      where
739        go y
740            | f x y     = loop (rest . (y:)) x
741            | otherwise = yield (x : rest []) >> loop id y
742STREAMING(groupBy, groupByC, groupByS, f)
743
744-- | 'groupOn1' is similar to @groupBy id@
745--
746-- returns a pair, indicating there are always 1 or more items in the grouping.
747-- This is designed to be converted into a NonEmpty structure
748-- but it avoids a dependency on another package
749--
750-- > import Data.List.NonEmpty
751-- >
752-- > groupOn1 :: (Monad m, Eq b) => (a -> b) -> Conduit a m (NonEmpty a)
753-- > groupOn1 f = CL.groupOn1 f .| CL.map (uncurry (:|))
754--
755-- Subject to fusion
756--
757-- Since 1.1.7
758groupOn1, groupOn1C :: (Monad m, Eq b)
759                     => (a -> b)
760                     -> ConduitT a (a, [a]) m ()
761groupOn1C f =
762    start
763  where
764    start = await >>= maybe (return ()) (loop id)
765
766    loop rest x =
767        await >>= maybe (yield (x, rest [])) go
768      where
769        go y
770            | f x == f y = loop (rest . (y:)) x
771            | otherwise  = yield (x, rest []) >> loop id y
772STREAMING(groupOn1, groupOn1C, groupOn1S, f)
773
774-- | Ensure that the inner sink consumes no more than the given number of
775-- values. Note this this does /not/ ensure that the sink consumes all of those
776-- values. To get the latter behavior, combine with 'sinkNull', e.g.:
777--
778-- > src $$ do
779-- >     x <- isolate count =$ do
780-- >         x <- someSink
781-- >         sinkNull
782-- >         return x
783-- >     someOtherSink
784-- >     ...
785--
786-- Subject to fusion
787--
788-- Since 0.3.0
789isolate, isolateC :: Monad m => Int -> ConduitT a a m ()
790isolateC =
791    loop
792  where
793    loop count | count <= 0 = return ()
794    loop count = await >>= maybe (return ()) (\x -> yield x >> loop (count - 1))
795STREAMING(isolate, isolateC, isolateS, count)
796
797-- | Keep only values in the stream passing a given predicate.
798--
799-- Subject to fusion
800--
801-- Since 0.3.0
802filter, filterC :: Monad m => (a -> Bool) -> ConduitT a a m ()
803filterC f = awaitForever $ \i -> when (f i) (yield i)
804STREAMING(filter, filterC, filterS, f)
805
806filterFuseRight
807  :: Monad m
808  => ConduitT i o m ()
809  -> (o -> Bool)
810  -> ConduitT i o m ()
811filterFuseRight (CI.ConduitT src) f = CI.ConduitT $ \rest -> let
812    go (CI.Done ()) = rest ()
813    go (CI.PipeM mp) = CI.PipeM (liftM go mp)
814    go (CI.Leftover p i) = CI.Leftover (go p) i
815    go (CI.HaveOutput p o)
816        | f o = CI.HaveOutput (go p) o
817        | otherwise = go p
818    go (CI.NeedInput p c) = CI.NeedInput (go . p) (go . c)
819    in go (src CI.Done)
820-- Intermediate finalizers are dropped, but this is acceptable: the next
821-- yielded value would be demanded by downstream in any event, and that new
822-- finalizer will always override the existing finalizer.
823{-# RULES "conduit: source/filter fusion .|" forall f src. src .| filter f = filterFuseRight src f #-}
824{-# INLINE filterFuseRight #-}
825
826-- | Ignore the remainder of values in the source. Particularly useful when
827-- combined with 'isolate'.
828--
829-- Subject to fusion
830--
831-- Since 0.3.0
832sinkNull, sinkNullC :: Monad m => ConduitT i o m ()
833sinkNullC = awaitForever $ \_ -> return ()
834{-# INLINE sinkNullC #-}
835STREAMING0(sinkNull, sinkNullC, sinkNullS)
836
837srcSinkNull :: Monad m => ConduitT () o m () -> m ()
838srcSinkNull (CI.ConduitT src) =
839    go (src CI.Done)
840  where
841    go (CI.Done ()) = return ()
842    go (CI.PipeM mp) = mp >>= go
843    go (CI.Leftover p ()) = go p
844    go (CI.HaveOutput p _) = go p
845    go (CI.NeedInput _ c) = go (c ())
846{-# INLINE srcSinkNull #-}
847{-# RULES "conduit: connect to sinkNull" forall src. runConduit (src .| sinkNull) = srcSinkNull src #-}
848
849-- | A source that outputs no values. Note that this is just a type-restricted
850-- synonym for 'mempty'.
851--
852-- Subject to fusion
853--
854-- Since 0.3.0
855sourceNull, sourceNullC :: Monad m => ConduitT i o m ()
856sourceNullC = return ()
857{-# INLINE sourceNullC #-}
858STREAMING0(sourceNull, sourceNullC, sourceNullS)
859
860-- | Run a @Pipe@ repeatedly, and output its result value downstream. Stops
861-- when no more input is available from upstream.
862--
863-- Since 0.5.0
864sequence :: Monad m
865         => ConduitT i o m o -- ^ @Pipe@ to run repeatedly
866         -> ConduitT i o m ()
867sequence sink =
868    self
869  where
870    self = awaitForever $ \i -> leftover i >> sink >>= yield
871