1{-# LANGUAGE RankNTypes #-}
2{-# LANGUAGE BangPatterns #-}
3{-# LANGUAGE CPP #-}
4{-# LANGUAGE Trustworthy #-}
5-- | /NOTE/ It is recommended to start using "Data.Conduit.Combinators" instead
6-- of this module.
7--
8-- Higher-level functions to interact with the elements of a stream. Most of
9-- these are based on list functions.
10--
11-- For many purposes, it's recommended to use the conduit-combinators library,
12-- which provides a more complete set of functions.
13--
14-- Note that these functions all deal with individual elements of a stream as a
15-- sort of \"black box\", where there is no introspection of the contained
16-- elements. Values such as @ByteString@ and @Text@ will likely need to be
17-- treated specially to deal with their contents properly (@Word8@ and @Char@,
18-- respectively). See the "Data.Conduit.Binary" and "Data.Conduit.Text"
19-- modules.
20module Data.Conduit.List
21    ( -- * Sources
22      sourceList
23    , sourceNull
24    , unfold
25    , unfoldEither
26    , unfoldM
27    , unfoldEitherM
28    , enumFromTo
29    , iterate
30    , replicate
31    , replicateM
32      -- * Sinks
33      -- ** Pure
34    , fold
35    , foldMap
36    , take
37    , drop
38    , head
39    , peek
40    , consume
41    , sinkNull
42      -- ** Monadic
43    , foldMapM
44    , foldM
45    , mapM_
46      -- * Conduits
47      -- ** Pure
48    , map
49    , mapMaybe
50    , mapFoldable
51    , catMaybes
52    , concat
53    , concatMap
54    , concatMapAccum
55    , scanl
56    , scan
57    , mapAccum
58    , chunksOf
59    , groupBy
60    , groupOn1
61    , isolate
62    , filter
63      -- ** Monadic
64    , mapM
65    , iterM
66    , scanlM
67    , scanM
68    , mapAccumM
69    , mapMaybeM
70    , mapFoldableM
71    , concatMapM
72    , concatMapAccumM
73      -- * Misc
74    , sequence
75    ) where
76
77import qualified Prelude
78import Prelude
79    ( ($), return, (==), (-), Int
80    , (.), id, Maybe (..), Monad
81    , Either (..)
82    , Bool (..)
83    , (>>)
84    , (>>=)
85    , seq
86    , otherwise
87    , Enum, Eq
88    , maybe
89    , (<=)
90    , (>)
91    , error
92    , (++)
93    , show
94    )
95import Data.Monoid (Monoid, mempty, mappend)
96import qualified Data.Foldable as F
97import Data.Conduit
98import Data.Conduit.Internal.Fusion
99import Data.Conduit.Internal.List.Stream
100import qualified Data.Conduit.Internal as CI
101import Control.Monad (when, (<=<), liftM, void)
102import Control.Monad.Trans.Class (lift)
103
104-- Defines INLINE_RULE0, INLINE_RULE, STREAMING0, and STREAMING.
105#include "fusion-macros.h"
106
107-- | Generate a source from a seed value.
108--
109-- Subject to fusion
110--
111-- Since 0.4.2
112unfold, unfoldC :: Monad m
113                => (b -> Maybe (a, b))
114                -> b
115                -> ConduitT i a m ()
116unfoldC f =
117    go
118  where
119    go seed =
120        case f seed of
121            Just (a, seed') -> yield a >> go seed'
122            Nothing -> return ()
123{-# INLINE unfoldC #-}
124STREAMING(unfold, unfoldC, unfoldS, f x)
125
126-- | Generate a source from a seed value with a return value.
127--
128-- Subject to fusion
129--
130-- @since 1.2.11
131unfoldEither, unfoldEitherC :: Monad m
132                            => (b -> Either r (a, b))
133                            -> b
134                            -> ConduitT i a m r
135unfoldEitherC f =
136    go
137  where
138    go seed =
139        case f seed of
140            Right (a, seed') -> yield a >> go seed'
141            Left r -> return r
142{-# INLINE unfoldEitherC #-}
143STREAMING(unfoldEither, unfoldEitherC, unfoldEitherS, f x)
144
145-- | A monadic unfold.
146--
147-- Subject to fusion
148--
149-- Since 1.1.2
150unfoldM, unfoldMC :: Monad m
151                  => (b -> m (Maybe (a, b)))
152                  -> b
153                  -> ConduitT i a m ()
154unfoldMC f =
155    go
156  where
157    go seed = do
158        mres <- lift $ f seed
159        case mres of
160            Just (a, seed') -> yield a >> go seed'
161            Nothing -> return ()
162STREAMING(unfoldM, unfoldMC, unfoldMS, f seed)
163
164-- | A monadic unfoldEither.
165--
166-- Subject to fusion
167--
168-- @since 1.2.11
169unfoldEitherM, unfoldEitherMC :: Monad m
170                              => (b -> m (Either r (a, b)))
171                              -> b
172                              -> ConduitT i a m r
173unfoldEitherMC f =
174    go
175  where
176    go seed = do
177        mres <- lift $ f seed
178        case mres of
179            Right (a, seed') -> yield a >> go seed'
180            Left r -> return r
181STREAMING(unfoldEitherM, unfoldEitherMC, unfoldEitherMS, f seed)
182
183-- | Yield the values from the list.
184--
185-- Subject to fusion
186sourceList, sourceListC :: Monad m => [a] -> ConduitT i a m ()
187sourceListC = Prelude.mapM_ yield
188{-# INLINE sourceListC #-}
189STREAMING(sourceList, sourceListC, sourceListS, xs)
190
191-- | Enumerate from a value to a final value, inclusive, via 'succ'.
192--
193-- This is generally more efficient than using @Prelude@\'s @enumFromTo@ and
194-- combining with @sourceList@ since this avoids any intermediate data
195-- structures.
196--
197-- Subject to fusion
198--
199-- Since 0.4.2
200enumFromTo, enumFromToC :: (Enum a, Prelude.Ord a, Monad m)
201                        => a
202                        -> a
203                        -> ConduitT i a m ()
204enumFromToC x0 y =
205    loop x0
206  where
207    loop x
208        | x Prelude.> y = return ()
209        | otherwise = yield x >> loop (Prelude.succ x)
210{-# INLINE enumFromToC #-}
211STREAMING(enumFromTo, enumFromToC, enumFromToS, x0 y)
212
213-- | Produces an infinite stream of repeated applications of f to x.
214--
215-- Subject to fusion
216--
217iterate, iterateC :: Monad m => (a -> a) -> a -> ConduitT i a m ()
218iterateC f =
219    go
220  where
221    go a = yield a >> go (f a)
222{-# INLINE iterateC #-}
223STREAMING(iterate, iterateC, iterateS, f a)
224
225-- | Replicate a single value the given number of times.
226--
227-- Subject to fusion
228--
229-- Since 1.2.0
230replicate, replicateC :: Monad m => Int -> a -> ConduitT i a m ()
231replicateC cnt0 a =
232    loop cnt0
233  where
234    loop i
235        | i <= 0 = return ()
236        | otherwise = yield a >> loop (i - 1)
237{-# INLINE replicateC #-}
238STREAMING(replicate, replicateC, replicateS, cnt0 a)
239
240-- | Replicate a monadic value the given number of times.
241--
242-- Subject to fusion
243--
244-- Since 1.2.0
245replicateM, replicateMC :: Monad m => Int -> m a -> ConduitT i a m ()
246replicateMC cnt0 ma =
247    loop cnt0
248  where
249    loop i
250        | i <= 0 = return ()
251        | otherwise = lift ma >>= yield >> loop (i - 1)
252{-# INLINE replicateMC #-}
253STREAMING(replicateM, replicateMC, replicateMS, cnt0 ma)
254
255-- | A strict left fold.
256--
257-- Subject to fusion
258--
259-- Since 0.3.0
260fold, foldC :: Monad m
261            => (b -> a -> b)
262            -> b
263            -> ConduitT a o m b
264foldC f =
265    loop
266  where
267    loop !accum = await >>= maybe (return accum) (loop . f accum)
268{-# INLINE foldC #-}
269STREAMING(fold, foldC, foldS, f accum)
270
271-- | A monadic strict left fold.
272--
273-- Subject to fusion
274--
275-- Since 0.3.0
276foldM, foldMC :: Monad m
277              => (b -> a -> m b)
278              -> b
279              -> ConduitT a o m b
280foldMC f =
281    loop
282  where
283    loop accum = do
284        await >>= maybe (return accum) go
285      where
286        go a = do
287            accum' <- lift $ f accum a
288            accum' `seq` loop accum'
289{-# INLINE foldMC #-}
290STREAMING(foldM, foldMC, foldMS, f accum)
291
292-----------------------------------------------------------------
293-- These are for cases where- for whatever reason- stream fusion cannot be
294-- applied.
295connectFold :: Monad m => ConduitT () a m () -> (b -> a -> b) -> b -> m b
296connectFold (CI.ConduitT src0) f =
297    go (src0 CI.Done)
298  where
299    go (CI.Done ()) b = return b
300    go (CI.HaveOutput src a) b = go src Prelude.$! f b a
301    go (CI.NeedInput _ c) b = go (c ()) b
302    go (CI.Leftover src ()) b = go src b
303    go (CI.PipeM msrc) b = do
304        src <- msrc
305        go src b
306{-# INLINE connectFold #-}
307{-# RULES "conduit: $$ fold" forall src f b. runConduit (src .| fold f b) = connectFold src f b #-}
308
309connectFoldM :: Monad m => ConduitT () a m () -> (b -> a -> m b) -> b -> m b
310connectFoldM (CI.ConduitT src0) f =
311    go (src0 CI.Done)
312  where
313    go (CI.Done ()) b = return b
314    go (CI.HaveOutput src a) b = do
315        !b' <- f b a
316        go src b'
317    go (CI.NeedInput _ c) b = go (c ()) b
318    go (CI.Leftover src ()) b = go src b
319    go (CI.PipeM msrc) b = do
320        src <- msrc
321        go src b
322{-# INLINE connectFoldM #-}
323{-# RULES "conduit: $$ foldM" forall src f b. runConduit (src .| foldM f b) = connectFoldM src f b #-}
324-----------------------------------------------------------------
325
326-- | A monoidal strict left fold.
327--
328-- Subject to fusion
329--
330-- Since 0.5.3
331foldMap :: (Monad m, Monoid b)
332        => (a -> b)
333        -> ConduitT a o m b
334INLINE_RULE(foldMap, f, let combiner accum = mappend accum . f in fold combiner mempty)
335
336-- | A monoidal strict left fold in a Monad.
337--
338-- Since 1.0.8
339foldMapM :: (Monad m, Monoid b)
340        => (a -> m b)
341        -> ConduitT a o m b
342INLINE_RULE(foldMapM, f, let combiner accum = liftM (mappend accum) . f in foldM combiner mempty)
343
344-- | Apply the action to all values in the stream.
345--
346-- Subject to fusion
347--
348-- Since 0.3.0
349mapM_, mapM_C :: Monad m
350              => (a -> m ())
351              -> ConduitT a o m ()
352mapM_C f = awaitForever $ lift . f
353{-# INLINE mapM_C #-}
354STREAMING(mapM_, mapM_C, mapM_S, f)
355
356srcMapM_ :: Monad m => ConduitT () a m () -> (a -> m ()) -> m ()
357srcMapM_ (CI.ConduitT src) f =
358    go (src CI.Done)
359  where
360    go (CI.Done ()) = return ()
361    go (CI.PipeM mp) = mp >>= go
362    go (CI.Leftover p ()) = go p
363    go (CI.HaveOutput p o) = f o >> go p
364    go (CI.NeedInput _ c) = go (c ())
365{-# INLINE srcMapM_ #-}
366{-# RULES "conduit: connect to mapM_" [2] forall f src. runConduit (src .| mapM_ f) = srcMapM_ src f #-}
367
368-- | Ignore a certain number of values in the stream. This function is
369-- semantically equivalent to:
370--
371-- > drop i = take i >> return ()
372--
373-- However, @drop@ is more efficient as it does not need to hold values in
374-- memory.
375--
376-- Subject to fusion
377--
378-- Since 0.3.0
379drop, dropC :: Monad m
380            => Int
381            -> ConduitT a o m ()
382dropC =
383    loop
384  where
385    loop i | i <= 0 = return ()
386    loop count = await >>= maybe (return ()) (\_ -> loop (count - 1))
387{-# INLINE dropC #-}
388STREAMING(drop, dropC, dropS, i)
389
390-- | Take some values from the stream and return as a list. If you want to
391-- instead create a conduit that pipes data to another sink, see 'isolate'.
392-- This function is semantically equivalent to:
393--
394-- > take i = isolate i =$ consume
395--
396-- Subject to fusion
397--
398-- Since 0.3.0
399take, takeC :: Monad m
400            => Int
401            -> ConduitT a o m [a]
402takeC =
403    loop id
404  where
405    loop front count | count <= 0 = return $ front []
406    loop front count = await >>= maybe
407        (return $ front [])
408        (\x -> loop (front . (x:)) (count - 1))
409{-# INLINE takeC #-}
410STREAMING(take, takeC, takeS, i)
411
412-- | Take a single value from the stream, if available.
413--
414-- Subject to fusion
415--
416-- Since 0.3.0
417head, headC :: Monad m => ConduitT a o m (Maybe a)
418headC = await
419{-# INLINE headC #-}
420STREAMING0(head, headC, headS)
421
422-- | Look at the next value in the stream, if available. This function will not
423-- change the state of the stream.
424--
425-- Since 0.3.0
426peek :: Monad m => ConduitT a o m (Maybe a)
427peek = await >>= maybe (return Nothing) (\x -> leftover x >> return (Just x))
428
429-- | Apply a transformation to all values in a stream.
430--
431-- Subject to fusion
432--
433-- Since 0.3.0
434map, mapC :: Monad m => (a -> b) -> ConduitT a b m ()
435mapC f = awaitForever $ yield . f
436{-# INLINE mapC #-}
437STREAMING(map, mapC, mapS, f)
438
439-- Since a Source never has any leftovers, fusion rules on it are safe.
440{-
441{-# RULES "conduit: source/map fusion .|" forall f src. src .| map f = mapFuseRight src f #-}
442
443mapFuseRight :: Monad m => Source m a -> (a -> b) -> Source m b
444mapFuseRight src f = CIC.mapOutput f src
445{-# INLINE mapFuseRight #-}
446-}
447
448{-
449
450It might be nice to include these rewrite rules, but they may have subtle
451differences based on leftovers.
452
453{-# RULES "conduit: map-to-mapOutput pipeL" forall f src. pipeL src (map f) = mapOutput f src #-}
454{-# RULES "conduit: map-to-mapOutput $=" forall f src. src $= (map f) = mapOutput f src #-}
455{-# RULES "conduit: map-to-mapOutput pipe" forall f src. pipe src (map f) = mapOutput f src #-}
456{-# RULES "conduit: map-to-mapOutput >+>" forall f src. src >+> (map f) = mapOutput f src #-}
457
458{-# RULES "conduit: map-to-mapInput pipeL" forall f sink. pipeL (map f) sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
459{-# RULES "conduit: map-to-mapInput =$" forall f sink. map f =$ sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
460{-# RULES "conduit: map-to-mapInput pipe" forall f sink. pipe (map f) sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
461{-# RULES "conduit: map-to-mapInput >+>" forall f sink. map f >+> sink = mapInput f (Prelude.const Prelude.Nothing) sink #-}
462
463{-# RULES "conduit: map-to-mapOutput .|" forall f con. con .| map f = mapOutput f con #-}
464{-# RULES "conduit: map-to-mapInput .|" forall f con. map f .| con = mapInput f (Prelude.const Prelude.Nothing) con #-}
465
466{-# INLINE [1] map #-}
467
468-}
469
470-- | Apply a monadic transformation to all values in a stream.
471--
472-- If you do not need the transformed values, and instead just want the monadic
473-- side-effects of running the action, see 'mapM_'.
474--
475-- Subject to fusion
476--
477-- Since 0.3.0
478mapM, mapMC :: Monad m => (a -> m b) -> ConduitT a b m ()
479mapMC f = awaitForever $ \a -> lift (f a) >>= yield
480{-# INLINE mapMC #-}
481STREAMING(mapM, mapMC, mapMS, f)
482
483-- | Apply a monadic action on all values in a stream.
484--
485-- This @Conduit@ can be used to perform a monadic side-effect for every
486-- value, whilst passing the value through the @Conduit@ as-is.
487--
488-- > iterM f = mapM (\a -> f a >>= \() -> return a)
489--
490-- Subject to fusion
491--
492-- Since 0.5.6
493iterM, iterMC :: Monad m => (a -> m ()) -> ConduitT a a m ()
494iterMC f = awaitForever $ \a -> lift (f a) >> yield a
495{-# INLINE iterMC #-}
496STREAMING(iterM, iterMC, iterMS, f)
497
498-- | Apply a transformation that may fail to all values in a stream, discarding
499-- the failures.
500--
501-- Subject to fusion
502--
503-- Since 0.5.1
504mapMaybe, mapMaybeC :: Monad m => (a -> Maybe b) -> ConduitT a b m ()
505mapMaybeC f = awaitForever $ maybe (return ()) yield . f
506{-# INLINE mapMaybeC #-}
507STREAMING(mapMaybe, mapMaybeC, mapMaybeS, f)
508
509-- | Apply a monadic transformation that may fail to all values in a stream,
510-- discarding the failures.
511--
512-- Subject to fusion
513--
514-- Since 0.5.1
515mapMaybeM, mapMaybeMC :: Monad m => (a -> m (Maybe b)) -> ConduitT a b m ()
516mapMaybeMC f = awaitForever $ maybe (return ()) yield <=< lift . f
517{-# INLINE mapMaybeMC #-}
518STREAMING(mapMaybeM, mapMaybeMC, mapMaybeMS, f)
519
520-- | Filter the @Just@ values from a stream, discarding the @Nothing@  values.
521--
522-- Subject to fusion
523--
524-- Since 0.5.1
525catMaybes, catMaybesC :: Monad m => ConduitT (Maybe a) a m ()
526catMaybesC = awaitForever $ maybe (return ()) yield
527{-# INLINE catMaybesC #-}
528STREAMING0(catMaybes, catMaybesC, catMaybesS)
529
530-- | Generalization of 'catMaybes'. It puts all values from
531--   'F.Foldable' into stream.
532--
533-- Subject to fusion
534--
535-- Since 1.0.6
536concat, concatC :: (Monad m, F.Foldable f) => ConduitT (f a) a m ()
537concatC = awaitForever $ F.mapM_ yield
538{-# INLINE concatC #-}
539STREAMING0(concat, concatC, concatS)
540
541-- | Apply a transformation to all values in a stream, concatenating the output
542-- values.
543--
544-- Subject to fusion
545--
546-- Since 0.3.0
547concatMap, concatMapC :: Monad m => (a -> [b]) -> ConduitT a b m ()
548concatMapC f = awaitForever $ sourceList . f
549{-# INLINE concatMapC #-}
550STREAMING(concatMap, concatMapC, concatMapS, f)
551
552-- | Apply a monadic transformation to all values in a stream, concatenating
553-- the output values.
554--
555-- Subject to fusion
556--
557-- Since 0.3.0
558concatMapM, concatMapMC :: Monad m => (a -> m [b]) -> ConduitT a b m ()
559concatMapMC f = awaitForever $ sourceList <=< lift . f
560{-# INLINE concatMapMC #-}
561STREAMING(concatMapM, concatMapMC, concatMapMS, f)
562
563-- | 'concatMap' with a strict accumulator.
564--
565-- Subject to fusion
566--
567-- Since 0.3.0
568concatMapAccum, concatMapAccumC :: Monad m => (a -> accum -> (accum, [b])) -> accum -> ConduitT a b m ()
569concatMapAccumC f x0 = void (mapAccum f x0) .| concat
570{-# INLINE concatMapAccumC #-}
571STREAMING(concatMapAccum, concatMapAccumC, concatMapAccumS, f x0)
572
573-- | Deprecated synonym for @mapAccum@
574--
575-- Since 1.0.6
576scanl :: Monad m => (a -> s -> (s, b)) -> s -> ConduitT a b m ()
577scanl f s = void $ mapAccum f s
578{-# DEPRECATED scanl "Use mapAccum instead" #-}
579
580-- | Deprecated synonym for @mapAccumM@
581--
582-- Since 1.0.6
583scanlM :: Monad m => (a -> s -> m (s, b)) -> s -> ConduitT a b m ()
584scanlM f s = void $ mapAccumM f s
585{-# DEPRECATED scanlM "Use mapAccumM instead" #-}
586
587-- | Analog of @mapAccumL@ for lists. Note that in contrast to @mapAccumL@, the function argument
588--   takes the accumulator as its second argument, not its first argument, and the accumulated value
589--   is strict.
590--
591-- Subject to fusion
592--
593-- Since 1.1.1
594mapAccum, mapAccumC :: Monad m => (a -> s -> (s, b)) -> s -> ConduitT a b m s
595mapAccumC f =
596    loop
597  where
598    loop !s = await >>= maybe (return s) go
599      where
600        go a = case f a s of
601                 (s', b) -> yield b >> loop s'
602STREAMING(mapAccum, mapAccumC, mapAccumS, f s)
603
604-- | Monadic `mapAccum`.
605--
606-- Subject to fusion
607--
608-- Since 1.1.1
609mapAccumM, mapAccumMC :: Monad m => (a -> s -> m (s, b)) -> s -> ConduitT a b m s
610mapAccumMC f =
611    loop
612  where
613    loop !s = await >>= maybe (return s) go
614      where
615        go a = do (s', b) <- lift $ f a s
616                  yield b
617                  loop s'
618{-# INLINE mapAccumMC #-}
619STREAMING(mapAccumM, mapAccumMC, mapAccumMS, f s)
620
621-- | Analog of 'Prelude.scanl' for lists.
622--
623-- Subject to fusion
624--
625-- Since 1.1.1
626scan :: Monad m => (a -> b -> b) -> b -> ConduitT a b m b
627INLINE_RULE(scan, f, mapAccum (\a b -> let r = f a b in (r, r)))
628
629-- | Monadic @scanl@.
630--
631-- Subject to fusion
632--
633-- Since 1.1.1
634scanM :: Monad m => (a -> b -> m b) -> b -> ConduitT a b m b
635INLINE_RULE(scanM, f, mapAccumM (\a b -> f a b >>= \r -> return (r, r)))
636
637-- | 'concatMapM' with a strict accumulator.
638--
639-- Subject to fusion
640--
641-- Since 0.3.0
642concatMapAccumM, concatMapAccumMC :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> ConduitT a b m ()
643concatMapAccumMC f x0 = void (mapAccumM f x0) .| concat
644{-# INLINE concatMapAccumMC #-}
645STREAMING(concatMapAccumM, concatMapAccumMC, concatMapAccumMS, f x0)
646
647-- | Generalization of 'mapMaybe' and 'concatMap'. It applies function
648-- to all values in a stream and send values inside resulting
649-- 'Foldable' downstream.
650--
651-- Subject to fusion
652--
653-- Since 1.0.6
654mapFoldable, mapFoldableC :: (Monad m, F.Foldable f) => (a -> f b) -> ConduitT a b m ()
655mapFoldableC f = awaitForever $ F.mapM_ yield . f
656{-# INLINE mapFoldableC #-}
657STREAMING(mapFoldable, mapFoldableC, mapFoldableS, f)
658
659-- | Monadic variant of 'mapFoldable'.
660--
661-- Subject to fusion
662--
663-- Since 1.0.6
664mapFoldableM, mapFoldableMC :: (Monad m, F.Foldable f) => (a -> m (f b)) -> ConduitT a b m ()
665mapFoldableMC f = awaitForever $ F.mapM_ yield <=< lift . f
666{-# INLINE mapFoldableMC #-}
667STREAMING(mapFoldableM, mapFoldableMC, mapFoldableMS, f)
668
669-- | Consume all values from the stream and return as a list. Note that this
670-- will pull all values into memory.
671--
672-- Subject to fusion
673--
674-- Since 0.3.0
675consume, consumeC :: Monad m => ConduitT a o m [a]
676consumeC =
677    loop id
678  where
679    loop front = await >>= maybe (return $ front []) (\x -> loop $ front . (x:))
680{-# INLINE consumeC #-}
681STREAMING0(consume, consumeC, consumeS)
682
683-- | Group a stream into chunks of a given size. The last chunk may contain
684-- fewer than n elements.
685--
686-- Subject to fusion
687--
688-- Since 1.2.9
689chunksOf :: Monad m => Int -> ConduitT a [a] m ()
690chunksOf n = if n > 0 then loop n id else error $ "chunksOf size must be positive (given " ++ show n ++ ")"
691  where
692    loop 0 rest = yield (rest []) >> loop n id
693    loop count rest = await >>= \ma -> case ma of
694      Nothing -> case rest [] of
695        [] -> return ()
696        nonempty -> yield nonempty
697      Just a -> loop (count - 1) (rest . (a :))
698
699-- | Grouping input according to an equality function.
700--
701-- Subject to fusion
702--
703-- Since 0.3.0
704groupBy, groupByC :: Monad m => (a -> a -> Bool) -> ConduitT a [a] m ()
705groupByC f =
706    start
707  where
708    start = await >>= maybe (return ()) (loop id)
709
710    loop rest x =
711        await >>= maybe (yield (x : rest [])) go
712      where
713        go y
714            | f x y     = loop (rest . (y:)) x
715            | otherwise = yield (x : rest []) >> loop id y
716STREAMING(groupBy, groupByC, groupByS, f)
717
718-- | 'groupOn1' is similar to @groupBy id@
719--
720-- returns a pair, indicating there are always 1 or more items in the grouping.
721-- This is designed to be converted into a NonEmpty structure
722-- but it avoids a dependency on another package
723--
724-- > import Data.List.NonEmpty
725-- >
726-- > groupOn1 :: (Monad m, Eq b) => (a -> b) -> Conduit a m (NonEmpty a)
727-- > groupOn1 f = CL.groupOn1 f .| CL.map (uncurry (:|))
728--
729-- Subject to fusion
730--
731-- Since 1.1.7
732groupOn1, groupOn1C :: (Monad m, Eq b)
733                     => (a -> b)
734                     -> ConduitT a (a, [a]) m ()
735groupOn1C f =
736    start
737  where
738    start = await >>= maybe (return ()) (loop id)
739
740    loop rest x =
741        await >>= maybe (yield (x, rest [])) go
742      where
743        go y
744            | f x == f y = loop (rest . (y:)) x
745            | otherwise  = yield (x, rest []) >> loop id y
746STREAMING(groupOn1, groupOn1C, groupOn1S, f)
747
748-- | Ensure that the inner sink consumes no more than the given number of
749-- values. Note this this does /not/ ensure that the sink consumes all of those
750-- values. To get the latter behavior, combine with 'sinkNull', e.g.:
751--
752-- > src $$ do
753-- >     x <- isolate count =$ do
754-- >         x <- someSink
755-- >         sinkNull
756-- >         return x
757-- >     someOtherSink
758-- >     ...
759--
760-- Subject to fusion
761--
762-- Since 0.3.0
763isolate, isolateC :: Monad m => Int -> ConduitT a a m ()
764isolateC =
765    loop
766  where
767    loop count | count <= 0 = return ()
768    loop count = await >>= maybe (return ()) (\x -> yield x >> loop (count - 1))
769STREAMING(isolate, isolateC, isolateS, count)
770
771-- | Keep only values in the stream passing a given predicate.
772--
773-- Subject to fusion
774--
775-- Since 0.3.0
776filter, filterC :: Monad m => (a -> Bool) -> ConduitT a a m ()
777filterC f = awaitForever $ \i -> when (f i) (yield i)
778STREAMING(filter, filterC, filterS, f)
779
780filterFuseRight
781  :: Monad m
782  => ConduitT i o m ()
783  -> (o -> Bool)
784  -> ConduitT i o m ()
785filterFuseRight (CI.ConduitT src) f = CI.ConduitT $ \rest -> let
786    go (CI.Done ()) = rest ()
787    go (CI.PipeM mp) = CI.PipeM (liftM go mp)
788    go (CI.Leftover p i) = CI.Leftover (go p) i
789    go (CI.HaveOutput p o)
790        | f o = CI.HaveOutput (go p) o
791        | otherwise = go p
792    go (CI.NeedInput p c) = CI.NeedInput (go . p) (go . c)
793    in go (src CI.Done)
794-- Intermediate finalizers are dropped, but this is acceptable: the next
795-- yielded value would be demanded by downstream in any event, and that new
796-- finalizer will always override the existing finalizer.
797{-# RULES "conduit: source/filter fusion .|" forall f src. src .| filter f = filterFuseRight src f #-}
798{-# INLINE filterFuseRight #-}
799
800-- | Ignore the remainder of values in the source. Particularly useful when
801-- combined with 'isolate'.
802--
803-- Subject to fusion
804--
805-- Since 0.3.0
806sinkNull, sinkNullC :: Monad m => ConduitT i o m ()
807sinkNullC = awaitForever $ \_ -> return ()
808{-# INLINE sinkNullC #-}
809STREAMING0(sinkNull, sinkNullC, sinkNullS)
810
811srcSinkNull :: Monad m => ConduitT () o m () -> m ()
812srcSinkNull (CI.ConduitT src) =
813    go (src CI.Done)
814  where
815    go (CI.Done ()) = return ()
816    go (CI.PipeM mp) = mp >>= go
817    go (CI.Leftover p ()) = go p
818    go (CI.HaveOutput p _) = go p
819    go (CI.NeedInput _ c) = go (c ())
820{-# INLINE srcSinkNull #-}
821{-# RULES "conduit: connect to sinkNull" forall src. runConduit (src .| sinkNull) = srcSinkNull src #-}
822
823-- | A source that outputs no values. Note that this is just a type-restricted
824-- synonym for 'mempty'.
825--
826-- Subject to fusion
827--
828-- Since 0.3.0
829sourceNull, sourceNullC :: Monad m => ConduitT i o m ()
830sourceNullC = return ()
831{-# INLINE sourceNullC #-}
832STREAMING0(sourceNull, sourceNullC, sourceNullS)
833
834-- | Run a @Pipe@ repeatedly, and output its result value downstream. Stops
835-- when no more input is available from upstream.
836--
837-- Since 0.5.0
838sequence :: Monad m
839         => ConduitT i o m o -- ^ @Pipe@ to run repeatedly
840         -> ConduitT i o m ()
841sequence sink =
842    self
843  where
844    self = awaitForever $ \i -> leftover i >> sink >>= yield
845