1{-# OPTIONS_HADDOCK not-home #-}
2{-# LANGUAGE DeriveFunctor #-}
3{-# LANGUAGE FlexibleInstances #-}
4{-# LANGUAGE FlexibleContexts #-}
5{-# LANGUAGE CPP #-}
6{-# LANGUAGE MultiParamTypeClasses #-}
7{-# LANGUAGE UndecidableInstances #-}
8{-# LANGUAGE RankNTypes #-}
9{-# LANGUAGE TupleSections #-}
10{-# LANGUAGE Trustworthy #-}
11{-# LANGUAGE TypeFamilies #-}
12module Data.Conduit.Internal.Conduit
13    ( -- ** Types
14      ConduitT (..)
15    , ConduitM
16    , Source
17    , Producer
18    , Sink
19    , Consumer
20    , Conduit
21    , Flush (..)
22      -- *** Newtype wrappers
23    , ZipSource (..)
24    , ZipSink (..)
25    , ZipConduit (..)
26      -- ** Sealed
27    , SealedConduitT (..)
28    , sealConduitT
29    , unsealConduitT
30      -- ** Primitives
31    , await
32    , awaitForever
33    , yield
34    , yieldM
35    , leftover
36    , runConduit
37    , runConduitPure
38    , runConduitRes
39    , fuse
40    , connect
41      -- ** Composition
42    , connectResume
43    , connectResumeConduit
44    , fuseLeftovers
45    , fuseReturnLeftovers
46    , ($$+)
47    , ($$++)
48    , ($$+-)
49    , ($=+)
50    , (=$$+)
51    , (=$$++)
52    , (=$$+-)
53    , ($$)
54    , ($=)
55    , (=$)
56    , (=$=)
57    , (.|)
58      -- ** Generalizing
59    , sourceToPipe
60    , sinkToPipe
61    , conduitToPipe
62    , toProducer
63    , toConsumer
64      -- ** Cleanup
65    , bracketP
66      -- ** Exceptions
67    , catchC
68    , handleC
69    , tryC
70      -- ** Utilities
71    , Data.Conduit.Internal.Conduit.transPipe
72    , Data.Conduit.Internal.Conduit.mapOutput
73    , Data.Conduit.Internal.Conduit.mapOutputMaybe
74    , Data.Conduit.Internal.Conduit.mapInput
75    , Data.Conduit.Internal.Conduit.mapInputM
76    , zipSinks
77    , zipSources
78    , zipSourcesApp
79    , zipConduitApp
80    , mergeSource
81    , passthroughSink
82    , sourceToList
83    , fuseBoth
84    , fuseBothMaybe
85    , fuseUpstream
86    , sequenceSources
87    , sequenceSinks
88    , sequenceConduits
89    ) where
90
91import Control.Applicative (Applicative (..))
92import Control.Exception (Exception)
93import qualified Control.Exception as E (catch)
94import Control.Monad (liftM, liftM2, ap)
95import Control.Monad.Fail(MonadFail(..))
96import Control.Monad.Error.Class(MonadError(..))
97import Control.Monad.Reader.Class(MonadReader(..))
98import Control.Monad.RWS.Class(MonadRWS())
99import Control.Monad.Writer.Class(MonadWriter(..), censor)
100import Control.Monad.State.Class(MonadState(..))
101import Control.Monad.Trans.Class (MonadTrans (lift))
102import Control.Monad.IO.Unlift (MonadIO (liftIO), MonadUnliftIO, withRunInIO)
103import Control.Monad.Primitive (PrimMonad, PrimState, primitive)
104import Data.Functor.Identity (Identity, runIdentity)
105import Data.Void (Void, absurd)
106import Data.Monoid (Monoid (mappend, mempty))
107import Data.Semigroup (Semigroup ((<>)))
108import Control.Monad.Trans.Resource
109import Data.Conduit.Internal.Pipe hiding (yield, mapOutput, leftover, yieldM, await, awaitForever, bracketP)
110import qualified Data.Conduit.Internal.Pipe as CI
111import Control.Monad (forever)
112import Data.Traversable (Traversable (..))
113
114-- | Core datatype of the conduit package. This type represents a general
115-- component which can consume a stream of input values @i@, produce a stream
116-- of output values @o@, perform actions in the @m@ monad, and produce a final
117-- result @r@. The type synonyms provided here are simply wrappers around this
118-- type.
119--
120-- Since 1.3.0
121newtype ConduitT i o m r = ConduitT
122    { unConduitT :: forall b.
123                    (r -> Pipe i i o () m b) -> Pipe i i o () m b
124    }
125
126-- | In order to provide for efficient monadic composition, the
127-- @ConduitT@ type is implemented internally using a technique known
128-- as the codensity transform. This allows for cheap appending, but
129-- makes one case much more expensive: partially running a @ConduitT@
130-- and that capturing the new state.
131--
132-- This data type is the same as @ConduitT@, but does not use the
133-- codensity transform technique.
134--
135-- @since 1.3.0
136newtype SealedConduitT i o m r = SealedConduitT (Pipe i i o () m r)
137
138-- | Same as 'ConduitT', for backwards compat
139type ConduitM = ConduitT
140
141instance Functor (ConduitT i o m) where
142    fmap f (ConduitT c) = ConduitT $ \rest -> c (rest . f)
143
144instance Applicative (ConduitT i o m) where
145    pure x = ConduitT ($ x)
146    {-# INLINE pure #-}
147    (<*>) = ap
148    {-# INLINE (<*>) #-}
149
150instance Monad (ConduitT i o m) where
151    return = pure
152    ConduitT f >>= g = ConduitT $ \h -> f $ \a -> unConduitT (g a) h
153
154-- | @since 1.3.1
155instance MonadFail m => MonadFail (ConduitT i o m) where
156    fail = lift . Control.Monad.Fail.fail
157
158instance MonadThrow m => MonadThrow (ConduitT i o m) where
159    throwM = lift . throwM
160
161instance MonadIO m => MonadIO (ConduitT i o m) where
162    liftIO = lift . liftIO
163    {-# INLINE liftIO #-}
164
165instance MonadReader r m => MonadReader r (ConduitT i o m) where
166    ask = lift ask
167    {-# INLINE ask #-}
168
169    local f (ConduitT c0) = ConduitT $ \rest ->
170        let go (HaveOutput p o) = HaveOutput (go p) o
171            go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u))
172            go (Done x) = rest x
173            go (PipeM mp) = PipeM (liftM go $ local f mp)
174            go (Leftover p i) = Leftover (go p) i
175         in go (c0 Done)
176
177#ifndef MIN_VERSION_mtl
178#define MIN_VERSION_mtl(x, y, z) 0
179#endif
180
181instance MonadWriter w m => MonadWriter w (ConduitT i o m) where
182#if MIN_VERSION_mtl(2, 1, 0)
183    writer = lift . writer
184#endif
185    tell = lift . tell
186
187    listen (ConduitT c0) = ConduitT $ \rest ->
188        let go front (HaveOutput p o) = HaveOutput (go front p) o
189            go front (NeedInput p c) = NeedInput (\i -> go front (p i)) (\u -> go front (c u))
190            go front (Done x) = rest (x, front)
191            go front (PipeM mp) = PipeM $ do
192                (p,w) <- listen mp
193                return $ go (front `mappend` w) p
194            go front (Leftover p i) = Leftover (go front p) i
195         in go mempty (c0 Done)
196
197    pass (ConduitT c0) = ConduitT $ \rest ->
198        let go front (HaveOutput p o) = HaveOutput (go front p) o
199            go front (NeedInput p c) = NeedInput (\i -> go front (p i)) (\u -> go front (c u))
200            go front (PipeM mp) = PipeM $ do
201                (p,w) <- censor (const mempty) (listen mp)
202                return $ go (front `mappend` w) p
203            go front (Done (x,f)) = PipeM $ do
204                tell (f front)
205                return $ rest x
206            go front (Leftover p i) = Leftover (go front p) i
207         in go mempty (c0 Done)
208
209instance MonadState s m => MonadState s (ConduitT i o m) where
210    get = lift get
211    put = lift . put
212#if MIN_VERSION_mtl(2, 1, 0)
213    state = lift . state
214#endif
215
216instance MonadRWS r w s m => MonadRWS r w s (ConduitT i o m)
217
218instance MonadError e m => MonadError e (ConduitT i o m) where
219    throwError = lift . throwError
220    catchError (ConduitT c0) f = ConduitT $ \rest ->
221        let go (HaveOutput p o) = HaveOutput (go p) o
222            go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u))
223            go (Done x) = rest x
224            go (PipeM mp) =
225              PipeM $ catchError (liftM go mp) $ \e -> do
226                return $ unConduitT (f e) rest
227            go (Leftover p i) = Leftover (go p) i
228         in go (c0 Done)
229
230instance MonadTrans (ConduitT i o) where
231    lift mr = ConduitT $ \rest -> PipeM (liftM rest mr)
232    {-# INLINE [1] lift #-}
233
234instance MonadResource m => MonadResource (ConduitT i o m) where
235    liftResourceT = lift . liftResourceT
236    {-# INLINE liftResourceT #-}
237
238instance Monad m => Semigroup (ConduitT i o m ()) where
239    (<>) = (>>)
240    {-# INLINE (<>) #-}
241
242instance Monad m => Monoid (ConduitT i o m ()) where
243    mempty = return ()
244    {-# INLINE mempty #-}
245#if !(MIN_VERSION_base(4,11,0))
246    mappend = (<>)
247    {-# INLINE mappend #-}
248#endif
249
250instance PrimMonad m => PrimMonad (ConduitT i o m) where
251  type PrimState (ConduitT i o m) = PrimState m
252  primitive = lift . primitive
253
254-- | Provides a stream of output values, without consuming any input or
255-- producing a final result.
256--
257-- Since 0.5.0
258type Source m o = ConduitT () o m ()
259{-# DEPRECATED Source "Use ConduitT directly" #-}
260
261-- | A component which produces a stream of output values, regardless of the
262-- input stream. A @Producer@ is a generalization of a @Source@, and can be
263-- used as either a @Source@ or a @Conduit@.
264--
265-- Since 1.0.0
266type Producer m o = forall i. ConduitT i o m ()
267{-# DEPRECATED Producer "Use ConduitT directly" #-}
268
269-- | Consumes a stream of input values and produces a final result, without
270-- producing any output.
271--
272-- > type Sink i m r = ConduitT i Void m r
273--
274-- Since 0.5.0
275type Sink i = ConduitT i Void
276{-# DEPRECATED Sink "Use ConduitT directly" #-}
277
278-- | A component which consumes a stream of input values and produces a final
279-- result, regardless of the output stream. A @Consumer@ is a generalization of
280-- a @Sink@, and can be used as either a @Sink@ or a @Conduit@.
281--
282-- Since 1.0.0
283type Consumer i m r = forall o. ConduitT i o m r
284{-# DEPRECATED Consumer "Use ConduitT directly" #-}
285
286-- | Consumes a stream of input values and produces a stream of output values,
287-- without producing a final result.
288--
289-- Since 0.5.0
290type Conduit i m o = ConduitT i o m ()
291{-# DEPRECATED Conduit "Use ConduitT directly" #-}
292
293sealConduitT :: ConduitT i o m r -> SealedConduitT i o m r
294sealConduitT (ConduitT f) = SealedConduitT (f Done)
295
296unsealConduitT :: Monad m => SealedConduitT i o m r -> ConduitT i o m r
297unsealConduitT (SealedConduitT f) = ConduitT (f >>=)
298
299-- | Connect a @Source@ to a @Sink@ until the latter closes. Returns both the
300-- most recent state of the @Source@ and the result of the @Sink@.
301--
302-- Since 0.5.0
303connectResume :: Monad m
304              => SealedConduitT () a m ()
305              -> ConduitT a Void m r
306              -> m (SealedConduitT () a m (), r)
307connectResume (SealedConduitT left0) (ConduitT right0) =
308    goRight left0 (right0 Done)
309  where
310    goRight left right =
311        case right of
312            HaveOutput _ o   -> absurd o
313            NeedInput rp rc  -> goLeft rp rc left
314            Done r2          -> return (SealedConduitT left, r2)
315            PipeM mp         -> mp >>= goRight left
316            Leftover p i     -> goRight (HaveOutput left i) p
317
318    goLeft rp rc left =
319        case left of
320            HaveOutput left' o            -> goRight left' (rp o)
321            NeedInput _ lc                -> recurse (lc ())
322            Done ()                       -> goRight (Done ()) (rc ())
323            PipeM mp                      -> mp >>= recurse
324            Leftover p ()                 -> recurse p
325      where
326        recurse = goLeft rp rc
327
328sourceToPipe :: Monad m => Source m o -> Pipe l i o u m ()
329sourceToPipe =
330    go . flip unConduitT Done
331  where
332    go (HaveOutput p o) = HaveOutput (go p) o
333    go (NeedInput _ c) = go $ c ()
334    go (Done ()) = Done ()
335    go (PipeM mp) = PipeM (liftM go mp)
336    go (Leftover p ()) = go p
337
338sinkToPipe :: Monad m => Sink i m r -> Pipe l i o u m r
339sinkToPipe =
340    go . injectLeftovers . flip unConduitT Done
341  where
342    go (HaveOutput _ o) = absurd o
343    go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ())
344    go (Done r) = Done r
345    go (PipeM mp) = PipeM (liftM go mp)
346    go (Leftover _ l) = absurd l
347
348conduitToPipe :: Monad m => Conduit i m o -> Pipe l i o u m ()
349conduitToPipe =
350    go . injectLeftovers . flip unConduitT Done
351  where
352    go (HaveOutput p o) = HaveOutput (go p) o
353    go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ())
354    go (Done ()) = Done ()
355    go (PipeM mp) = PipeM (liftM go mp)
356    go (Leftover _ l) = absurd l
357
358-- | Generalize a 'Source' to a 'Producer'.
359--
360-- Since 1.0.0
361toProducer :: Monad m => Source m a -> Producer m a
362toProducer (ConduitT c0) = ConduitT $ \rest -> let
363    go (HaveOutput p o) = HaveOutput (go p) o
364    go (NeedInput _ c) = go (c ())
365    go (Done r) = rest r
366    go (PipeM mp) = PipeM (liftM go mp)
367    go (Leftover p ()) = go p
368    in go (c0 Done)
369
370-- | Generalize a 'Sink' to a 'Consumer'.
371--
372-- Since 1.0.0
373toConsumer :: Monad m => Sink a m b -> Consumer a m b
374toConsumer (ConduitT c0) = ConduitT $ \rest -> let
375    go (HaveOutput _ o) = absurd o
376    go (NeedInput p c) = NeedInput (go . p) (go . c)
377    go (Done r) = rest r
378    go (PipeM mp) = PipeM (liftM go mp)
379    go (Leftover p l) = Leftover (go p) l
380    in go (c0 Done)
381
382-- | Catch all exceptions thrown by the current component of the pipeline.
383--
384-- Note: this will /not/ catch exceptions thrown by other components! For
385-- example, if an exception is thrown in a @Source@ feeding to a @Sink@, and
386-- the @Sink@ uses @catchC@, the exception will /not/ be caught.
387--
388-- Due to this behavior (as well as lack of async exception safety), you
389-- should not try to implement combinators such as @onException@ in terms of this
390-- primitive function.
391--
392-- Note also that the exception handling will /not/ be applied to any
393-- finalizers generated by this conduit.
394--
395-- Since 1.0.11
396catchC :: (MonadUnliftIO m, Exception e)
397       => ConduitT i o m r
398       -> (e -> ConduitT i o m r)
399       -> ConduitT i o m r
400catchC (ConduitT p0) onErr = ConduitT $ \rest -> let
401    go (Done r) = rest r
402    go (PipeM mp) = PipeM $ withRunInIO $ \run -> E.catch (run (liftM go mp))
403        (return . flip unConduitT rest . onErr)
404    go (Leftover p i) = Leftover (go p) i
405    go (NeedInput x y) = NeedInput (go . x) (go . y)
406    go (HaveOutput p o) = HaveOutput (go p) o
407    in go (p0 Done)
408{-# INLINE catchC #-}
409
410-- | The same as @flip catchC@.
411--
412-- Since 1.0.11
413handleC :: (MonadUnliftIO m, Exception e)
414        => (e -> ConduitT i o m r)
415        -> ConduitT i o m r
416        -> ConduitT i o m r
417handleC = flip catchC
418{-# INLINE handleC #-}
419
420-- | A version of @try@ for use within a pipeline. See the comments in @catchC@
421-- for more details.
422--
423-- Since 1.0.11
424tryC :: (MonadUnliftIO m, Exception e)
425     => ConduitT i o m r
426     -> ConduitT i o m (Either e r)
427tryC c = fmap Right c `catchC` (return . Left)
428{-# INLINE tryC #-}
429
430-- | Combines two sinks. The new sink will complete when both input sinks have
431--   completed.
432--
433-- Any leftovers are discarded.
434--
435-- Since 0.4.1
436zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r')
437zipSinks (ConduitT x0) (ConduitT y0) = ConduitT $ \rest -> let
438    Leftover _  i    >< _                = absurd i
439    _                >< Leftover _  i    = absurd i
440    HaveOutput _ o   >< _                = absurd o
441    _                >< HaveOutput _ o   = absurd o
442
443    PipeM mx         >< y                = PipeM (liftM (>< y) mx)
444    x                >< PipeM my         = PipeM (liftM (x ><) my)
445    Done x           >< Done y           = rest (x, y)
446    NeedInput px cx  >< NeedInput py cy  = NeedInput (\i -> px i >< py i) (\() -> cx () >< cy ())
447    NeedInput px cx  >< y@Done{}         = NeedInput (\i -> px i >< y)    (\u -> cx u >< y)
448    x@Done{}         >< NeedInput py cy  = NeedInput (\i -> x >< py i)    (\u -> x >< cy u)
449    in injectLeftovers (x0 Done) >< injectLeftovers (y0 Done)
450
451-- | Combines two sources. The new source will stop producing once either
452--   source has been exhausted.
453--
454-- Since 1.0.13
455zipSources :: Monad m => Source m a -> Source m b -> Source m (a, b)
456zipSources (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
457    go (Leftover left ()) right = go left right
458    go left (Leftover right ())  = go left right
459    go (Done ()) (Done ()) = rest ()
460    go (Done ()) (HaveOutput _ _) = rest ()
461    go (HaveOutput _ _) (Done ()) = rest ()
462    go (Done ()) (PipeM _) = rest ()
463    go (PipeM _) (Done ()) = rest ()
464    go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my)
465    go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx)
466    go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my)
467    go (HaveOutput srcx x) (HaveOutput srcy y) = HaveOutput (go srcx srcy) (x, y)
468    go (NeedInput _ c) right = go (c ()) right
469    go left (NeedInput _ c) = go left (c ())
470    in go (left0 Done) (right0 Done)
471
472-- | Combines two sources. The new source will stop producing once either
473--   source has been exhausted.
474--
475-- Since 1.0.13
476zipSourcesApp :: Monad m => Source m (a -> b) -> Source m a -> Source m b
477zipSourcesApp (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
478    go (Leftover left ()) right = go left right
479    go left (Leftover right ())  = go left right
480    go (Done ()) (Done ()) = rest ()
481    go (Done ()) (HaveOutput _ _) = rest ()
482    go (HaveOutput _ _) (Done ()) = rest ()
483    go (Done ()) (PipeM _) = rest ()
484    go (PipeM _) (Done ()) = rest ()
485    go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my)
486    go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx)
487    go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my)
488    go (HaveOutput srcx x) (HaveOutput srcy y) = HaveOutput (go srcx srcy) (x y)
489    go (NeedInput _ c) right = go (c ()) right
490    go left (NeedInput _ c) = go left (c ())
491    in go (left0 Done) (right0 Done)
492
493-- |
494--
495-- Since 1.0.17
496zipConduitApp
497    :: Monad m
498    => ConduitT i o m (x -> y)
499    -> ConduitT i o m x
500    -> ConduitT i o m y
501zipConduitApp (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
502    go (Done f) (Done x) = rest (f x)
503    go (PipeM mx) y = PipeM (flip go y `liftM` mx)
504    go x (PipeM my) = PipeM (go x `liftM` my)
505    go (HaveOutput x o) y = HaveOutput (go x y) o
506    go x (HaveOutput y o) = HaveOutput (go x y) o
507    go (Leftover _ i) _ = absurd i
508    go _ (Leftover _ i) = absurd i
509    go (NeedInput px cx) (NeedInput py cy) = NeedInput
510        (\i -> go (px i) (py i))
511        (\u -> go (cx u) (cy u))
512    go (NeedInput px cx) (Done y) = NeedInput
513        (\i -> go (px i) (Done y))
514        (\u -> go (cx u) (Done y))
515    go (Done x) (NeedInput py cy) = NeedInput
516        (\i -> go (Done x) (py i))
517        (\u -> go (Done x) (cy u))
518  in go (injectLeftovers $ left0 Done) (injectLeftovers $ right0 Done)
519
520-- | Same as normal fusion (e.g. @=$=@), except instead of discarding leftovers
521-- from the downstream component, return them.
522--
523-- Since 1.0.17
524fuseReturnLeftovers :: Monad m
525                    => ConduitT a b m ()
526                    -> ConduitT b c m r
527                    -> ConduitT a c m (r, [b])
528fuseReturnLeftovers (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
529    goRight bs left right =
530        case right of
531            HaveOutput p o -> HaveOutput (recurse p) o
532            NeedInput rp rc  ->
533                case bs of
534                    [] -> goLeft rp rc left
535                    b:bs' -> goRight bs' left (rp b)
536            Done r2          -> rest (r2, bs)
537            PipeM mp         -> PipeM (liftM recurse mp)
538            Leftover p b     -> goRight (b:bs) left p
539      where
540        recurse = goRight bs left
541
542    goLeft rp rc left =
543        case left of
544            HaveOutput left' o        -> goRight [] left' (rp o)
545            NeedInput left' lc        -> NeedInput (recurse . left') (recurse . lc)
546            Done r1                   -> goRight [] (Done r1) (rc r1)
547            PipeM mp                  -> PipeM (liftM recurse mp)
548            Leftover left' i          -> Leftover (recurse left') i
549      where
550        recurse = goLeft rp rc
551    in goRight [] (left0 Done) (right0 Done)
552
553-- | Similar to @fuseReturnLeftovers@, but use the provided function to convert
554-- downstream leftovers to upstream leftovers.
555--
556-- Since 1.0.17
557fuseLeftovers
558    :: Monad m
559    => ([b] -> [a])
560    -> ConduitT a b m ()
561    -> ConduitT b c m r
562    -> ConduitT a c m r
563fuseLeftovers f left right = do
564    (r, bs) <- fuseReturnLeftovers left right
565    mapM_ leftover $ reverse $ f bs
566    return r
567
568-- | Connect a 'Conduit' to a sink and return the output of the sink
569-- together with a new 'Conduit'.
570--
571-- Since 1.0.17
572connectResumeConduit
573    :: Monad m
574    => SealedConduitT i o m ()
575    -> ConduitT o Void m r
576    -> ConduitT i Void m (SealedConduitT i o m (), r)
577connectResumeConduit (SealedConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
578    goRight left right =
579        case right of
580            HaveOutput _ o -> absurd o
581            NeedInput rp rc -> goLeft rp rc left
582            Done r2 -> rest (SealedConduitT left, r2)
583            PipeM mp -> PipeM (liftM (goRight left) mp)
584            Leftover p i -> goRight (HaveOutput left i) p
585
586    goLeft rp rc left =
587        case left of
588            HaveOutput left' o -> goRight left' (rp o)
589            NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc)
590            Done () -> goRight (Done ()) (rc ())
591            PipeM mp -> PipeM (liftM recurse mp)
592            Leftover left' i -> Leftover (recurse left') i -- recurse p
593      where
594        recurse = goLeft rp rc
595    in goRight left0 (right0 Done)
596
597-- | Merge a @Source@ into a @Conduit@.
598-- The new conduit will stop processing once either source or upstream have been exhausted.
599mergeSource
600  :: Monad m
601  => Source m i
602  -> Conduit a m (i, a)
603mergeSource = loop . sealConduitT
604  where
605    loop :: Monad m => SealedConduitT () i m () -> Conduit a m (i, a)
606    loop src0 = await >>= maybe (return ()) go
607      where
608        go a = do
609          (src1, mi) <- lift $ src0 $$++ await
610          case mi of
611            Nothing -> return ()
612            Just i  -> yield (i, a) >> loop src1
613
614
615-- | Turn a @Sink@ into a @Conduit@ in the following way:
616--
617-- * All input passed to the @Sink@ is yielded downstream.
618--
619-- * When the @Sink@ finishes processing, the result is passed to the provided to the finalizer function.
620--
621-- Note that the @Sink@ will stop receiving input as soon as the downstream it
622-- is connected to shuts down.
623--
624-- An example usage would be to write the result of a @Sink@ to some mutable
625-- variable while allowing other processing to continue.
626--
627-- Since 1.1.0
628passthroughSink :: Monad m
629                => Sink i m r
630                -> (r -> m ()) -- ^ finalizer
631                -> Conduit i m i
632passthroughSink (ConduitT sink0) final = ConduitT $ \rest -> let
633    -- A bit of explanation is in order, this function is
634    -- non-obvious. The purpose of go is to keep track of the sink
635    -- we're passing values to, and then yield values downstream. The
636    -- third argument to go is the current state of that sink. That's
637    -- relatively straightforward.
638    --
639    -- The second value is the leftover buffer. These are values that
640    -- the sink itself has called leftover on, and must be provided
641    -- back to the sink the next time it awaits. _However_, these
642    -- values should _not_ be reyielded downstream: we have already
643    -- yielded them downstream ourself, and it is the responsibility
644    -- of the functions wrapping around passthroughSink to handle the
645    -- leftovers from downstream.
646    --
647    -- The trickiest bit is the first argument, which is a solution to
648    -- bug https://github.com/snoyberg/conduit/issues/304. The issue
649    -- is that, once we get a value, we need to provide it to both the
650    -- inner sink _and_ yield it downstream. The obvious thing to do
651    -- is yield first and then recursively call go. Unfortunately,
652    -- this doesn't work in all cases: if the downstream component
653    -- never calls await again, our yield call will never return, and
654    -- our sink will not get the last value. This results is confusing
655    -- behavior where the sink and downstream component receive a
656    -- different number of values.
657    --
658    -- Solution: keep a buffer of the next value to yield downstream,
659    -- and only yield it downstream in one of two cases: our sink is
660    -- asking for another value, or our sink is done. This way, we
661    -- ensure that, in all cases, we pass exactly the same number of
662    -- values to the inner sink as to downstream.
663
664    go mbuf _ (Done r) = do
665        maybe (return ()) CI.yield mbuf
666        lift $ final r
667        unConduitT (awaitForever yield) rest
668    go mbuf is (Leftover sink i) = go mbuf (i:is) sink
669    go _ _ (HaveOutput _ o) = absurd o
670    go mbuf is (PipeM mx) = do
671        x <- lift mx
672        go mbuf is x
673    go mbuf (i:is) (NeedInput next _) = go mbuf is (next i)
674    go mbuf [] (NeedInput next done) = do
675        maybe (return ()) CI.yield mbuf
676        mx <- CI.await
677        case mx of
678            Nothing -> go Nothing [] (done ())
679            Just x -> go (Just x) [] (next x)
680    in go Nothing [] (sink0 Done)
681
682-- | Convert a @Source@ into a list. The basic functionality can be explained as:
683--
684-- > sourceToList src = src $$ Data.Conduit.List.consume
685--
686-- However, @sourceToList@ is able to produce its results lazily, which cannot
687-- be done when running a conduit pipeline in general. Unlike the
688-- @Data.Conduit.Lazy@ module (in conduit-extra), this function performs no
689-- unsafe I\/O operations, and therefore can only be as lazily as the
690-- underlying monad.
691--
692-- Since 1.2.6
693sourceToList :: Monad m => Source m a -> m [a]
694sourceToList =
695    go . flip unConduitT Done
696  where
697    go (Done _) = return []
698    go (HaveOutput src x) = liftM (x:) (go src)
699    go (PipeM msrc) = msrc >>= go
700    go (NeedInput _ c) = go (c ())
701    go (Leftover p _) = go p
702
703-- Define fixity of all our operators
704infixr 0 $$
705infixl 1 $=
706infixr 2 =$
707infixr 2 =$=
708infixr 0 $$+
709infixr 0 $$++
710infixr 0 $$+-
711infixl 1 $=+
712infixr 2 .|
713
714-- | Equivalent to using 'runConduit' and '.|' together.
715--
716-- Since 1.2.3
717connect :: Monad m
718        => ConduitT () a m ()
719        -> ConduitT a Void m r
720        -> m r
721connect = ($$)
722
723-- | Named function synonym for '.|'
724--
725-- Equivalent to '.|' and '=$='. However, the latter is
726-- deprecated and will be removed in a future version.
727--
728-- Since 1.2.3
729fuse :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r
730fuse = (=$=)
731
732-- | Combine two @Conduit@s together into a new @Conduit@ (aka 'fuse').
733--
734-- Output from the upstream (left) conduit will be fed into the
735-- downstream (right) conduit. Processing will terminate when
736-- downstream (right) returns.
737-- Leftover data returned from the right @Conduit@ will be discarded.
738--
739-- Equivalent to 'fuse' and '=$=', however the latter is deprecated and will
740-- be removed in a future version.
741--
742-- Note that, while this operator looks like categorical composition
743-- (from "Control.Category"), there are a few reasons it's different:
744--
745-- * The position of the type parameters to 'ConduitT' do not
746--   match. We would need to change @ConduitT i o m r@ to @ConduitT r
747--   m i o@, which would preclude a 'Monad' or 'MonadTrans' instance.
748--
749-- * The result value from upstream and downstream are allowed to
750--   differ between upstream and downstream. In other words, we would
751--   need the type signature here to look like @ConduitT a b m r ->
752--   ConduitT b c m r -> ConduitT a c m r@.
753--
754-- * Due to leftovers, we do not have a left identity in Conduit. This
755--   can be achieved with the underlying @Pipe@ datatype, but this is
756--   not generally recommended. See <https://stackoverflow.com/a/15263700>.
757--
758-- @since 1.2.8
759(.|) :: Monad m
760     => ConduitM a b m () -- ^ upstream
761     -> ConduitM b c m r -- ^ downstream
762     -> ConduitM a c m r
763(.|) = fuse
764{-# INLINE (.|) #-}
765
766-- | The connect operator, which pulls data from a source and pushes to a sink.
767-- If you would like to keep the @Source@ open to be used for other
768-- operations, use the connect-and-resume operator '$$+'.
769--
770-- Since 0.4.0
771($$) :: Monad m => Source m a -> Sink a m b -> m b
772src $$ sink = do
773    (rsrc, res) <- src $$+ sink
774    rsrc $$+- return ()
775    return res
776{-# INLINE [1] ($$) #-}
777{-# DEPRECATED ($$) "Use runConduit and .|" #-}
778
779-- | A synonym for '=$=' for backwards compatibility.
780--
781-- Since 0.4.0
782($=) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r
783($=) = (=$=)
784{-# INLINE [0] ($=) #-}
785{-# RULES "conduit: $= is =$=" ($=) = (=$=) #-}
786{-# DEPRECATED ($=) "Use .|" #-}
787
788-- | A synonym for '=$=' for backwards compatibility.
789--
790-- Since 0.4.0
791(=$) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r
792(=$) = (=$=)
793{-# INLINE [0] (=$) #-}
794{-# RULES "conduit: =$ is =$=" (=$) = (=$=) #-}
795{-# DEPRECATED (=$) "Use .|" #-}
796
797-- | Deprecated fusion operator.
798--
799-- Since 0.4.0
800(=$=) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r
801ConduitT left0 =$= ConduitT right0 = ConduitT $ \rest ->
802    let goRight left right =
803            case right of
804                HaveOutput p o    -> HaveOutput (recurse p) o
805                NeedInput rp rc   -> goLeft rp rc left
806                Done r2           -> rest r2
807                PipeM mp          -> PipeM (liftM recurse mp)
808                Leftover right' i -> goRight (HaveOutput left i) right'
809          where
810            recurse = goRight left
811
812        goLeft rp rc left =
813            case left of
814                HaveOutput left' o        -> goRight left' (rp o)
815                NeedInput left' lc        -> NeedInput (recurse . left') (recurse . lc)
816                Done r1                   -> goRight (Done r1) (rc r1)
817                PipeM mp                  -> PipeM (liftM recurse mp)
818                Leftover left' i          -> Leftover (recurse left') i
819          where
820            recurse = goLeft rp rc
821     in goRight (left0 Done) (right0 Done)
822{-# INLINE [1] (=$=) #-}
823{-# DEPRECATED (=$=) "Use .|" #-}
824
825-- | Wait for a single input value from upstream. If no data is available,
826-- returns @Nothing@. Once @await@ returns @Nothing@, subsequent calls will
827-- also return @Nothing@.
828--
829-- Since 0.5.0
830await :: Monad m => Consumer i m (Maybe i)
831await = ConduitT $ \f -> NeedInput (f . Just) (const $ f Nothing)
832{-# INLINE [0] await #-}
833
834await' :: Monad m
835       => ConduitT i o m r
836       -> (i -> ConduitT i o m r)
837       -> ConduitT i o m r
838await' f g = ConduitT $ \rest -> NeedInput
839    (\i -> unConduitT (g i) rest)
840    (const $ unConduitT f rest)
841{-# INLINE await' #-}
842{-# RULES "conduit: await >>= maybe" forall x y. await >>= maybe x y = await' x y #-}
843
844-- | Send a value downstream to the next component to consume. If the
845-- downstream component terminates, this call will never return control.
846--
847-- Since 0.5.0
848yield :: Monad m
849      => o -- ^ output value
850      -> ConduitT i o m ()
851yield o = ConduitT $ \rest -> HaveOutput (rest ()) o
852{-# INLINE yield #-}
853
854-- | Send a monadic value downstream for the next component to consume.
855--
856-- @since 1.2.7
857yieldM :: Monad m => m o -> ConduitT i o m ()
858yieldM mo = lift mo >>= yield
859{-# INLINE yieldM #-}
860
861  -- FIXME rule won't fire, see FIXME in .Pipe; "mapM_ yield" mapM_ yield = ConduitT . sourceList
862
863-- | Provide a single piece of leftover input to be consumed by the next
864-- component in the current monadic binding.
865--
866-- /Note/: it is highly encouraged to only return leftover values from input
867-- already consumed from upstream.
868--
869-- @since 0.5.0
870leftover :: i -> ConduitT i o m ()
871leftover i = ConduitT $ \rest -> Leftover (rest ()) i
872{-# INLINE leftover #-}
873
874-- | Run a pipeline until processing completes.
875--
876-- Since 1.2.1
877runConduit :: Monad m => ConduitT () Void m r -> m r
878runConduit (ConduitT p) = runPipe $ injectLeftovers $ p Done
879{-# INLINE [0] runConduit #-}
880
881-- | Bracket a conduit computation between allocation and release of a
882-- resource. Two guarantees are given about resource finalization:
883--
884-- 1. It will be /prompt/. The finalization will be run as early as possible.
885--
886-- 2. It is exception safe. Due to usage of @resourcet@, the finalization will
887-- be run in the event of any exceptions.
888--
889-- Since 0.5.0
890bracketP :: MonadResource m
891
892         => IO a
893            -- ^ computation to run first (\"acquire resource\")
894         -> (a -> IO ())
895            -- ^ computation to run last (\"release resource\")
896         -> (a -> ConduitT i o m r)
897            -- ^ computation to run in-between
898         -> ConduitT i o m r
899            -- returns the value from the in-between computation
900bracketP alloc free inside = ConduitT $ \rest -> do
901  (key, seed) <- allocate alloc free
902  unConduitT (inside seed) $ \res -> do
903    release key
904    rest res
905
906-- | Wait for input forever, calling the given inner component for each piece of
907-- new input.
908--
909-- This function is provided as a convenience for the common pattern of
910-- @await@ing input, checking if it's @Just@ and then looping.
911--
912-- Since 0.5.0
913awaitForever :: Monad m => (i -> ConduitT i o m r) -> ConduitT i o m ()
914awaitForever f = ConduitT $ \rest ->
915    let go = NeedInput (\i -> unConduitT (f i) (const go)) rest
916     in go
917
918-- | Transform the monad that a @ConduitT@ lives in.
919--
920-- Note that the monad transforming function will be run multiple times,
921-- resulting in unintuitive behavior in some cases. For a fuller treatment,
922-- please see:
923--
924-- <https://github.com/snoyberg/conduit/wiki/Dealing-with-monad-transformers>
925--
926-- Since 0.4.0
927transPipe :: Monad m => (forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r
928transPipe f (ConduitT c0) = ConduitT $ \rest -> let
929        go (HaveOutput p o) = HaveOutput (go p) o
930        go (NeedInput p c) = NeedInput (go . p) (go . c)
931        go (Done r) = rest r
932        go (PipeM mp) =
933            PipeM (f $ liftM go $ collapse mp)
934          where
935            -- Combine a series of monadic actions into a single action.  Since we
936            -- throw away side effects between different actions, an arbitrary break
937            -- between actions will lead to a violation of the monad transformer laws.
938            -- Example available at:
939            --
940            -- http://hpaste.org/75520
941            collapse mpipe = do
942                pipe' <- mpipe
943                case pipe' of
944                    PipeM mpipe' -> collapse mpipe'
945                    _ -> return pipe'
946        go (Leftover p i) = Leftover (go p) i
947        in go (c0 Done)
948
949-- | Apply a function to all the output values of a @ConduitT@.
950--
951-- This mimics the behavior of `fmap` for a `Source` and `Conduit` in pre-0.4
952-- days. It can also be simulated by fusing with the @map@ conduit from
953-- "Data.Conduit.List".
954--
955-- Since 0.4.1
956mapOutput :: Monad m => (o1 -> o2) -> ConduitT i o1 m r -> ConduitT i o2 m r
957mapOutput f (ConduitT c0) = ConduitT $ \rest -> let
958    go (HaveOutput p o) = HaveOutput (go p) (f o)
959    go (NeedInput p c) = NeedInput (go . p) (go . c)
960    go (Done r) = rest r
961    go (PipeM mp) = PipeM (liftM (go) mp)
962    go (Leftover p i) = Leftover (go p) i
963    in go (c0 Done)
964
965-- | Same as 'mapOutput', but use a function that returns @Maybe@ values.
966--
967-- Since 0.5.0
968mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> ConduitT i o1 m r -> ConduitT i o2 m r
969mapOutputMaybe f (ConduitT c0) = ConduitT $ \rest -> let
970    go (HaveOutput p o) = maybe id (\o' p' -> HaveOutput p' o') (f o) (go p)
971    go (NeedInput p c) = NeedInput (go . p) (go . c)
972    go (Done r) = rest r
973    go (PipeM mp) = PipeM (liftM (go) mp)
974    go (Leftover p i) = Leftover (go p) i
975    in go (c0 Done)
976
977-- | Apply a function to all the input values of a @ConduitT@.
978--
979-- Since 0.5.0
980mapInput :: Monad m
981         => (i1 -> i2) -- ^ map initial input to new input
982         -> (i2 -> Maybe i1) -- ^ map new leftovers to initial leftovers
983         -> ConduitT i2 o m r
984         -> ConduitT i1 o m r
985mapInput f f' (ConduitT c0) = ConduitT $ \rest -> let
986    go (HaveOutput p o) = HaveOutput (go p) o
987    go (NeedInput p c) = NeedInput (go . p . f) (go . c)
988    go (Done r) = rest r
989    go (PipeM mp) = PipeM $ liftM go mp
990    go (Leftover p i) = maybe id (flip Leftover) (f' i) (go p)
991    in go (c0 Done)
992
993-- | Apply a monadic action to all the input values of a @ConduitT@.
994--
995-- Since 1.3.2
996mapInputM :: Monad m
997          => (i1 -> m i2) -- ^ map initial input to new input
998          -> (i2 -> m (Maybe i1)) -- ^ map new leftovers to initial leftovers
999          -> ConduitT i2 o m r
1000          -> ConduitT i1 o m r
1001mapInputM f f' (ConduitT c0) = ConduitT $ \rest -> let
1002    go (HaveOutput p o) = HaveOutput (go p) o
1003    go (NeedInput p c)  = NeedInput (\i -> PipeM $ go . p <$> f i) (go . c)
1004    go (Done r)         = rest r
1005    go (PipeM mp)       = PipeM $ fmap go mp
1006    go (Leftover p i)   = PipeM $ (\x -> maybe id (flip Leftover) x (go p)) <$> f' i
1007    in go (c0 Done)
1008
1009-- | The connect-and-resume operator. This does not close the @Source@, but
1010-- instead returns it to be used again. This allows a @Source@ to be used
1011-- incrementally in a large program, without forcing the entire program to live
1012-- in the @Sink@ monad.
1013--
1014-- Mnemonic: connect + do more.
1015--
1016-- Since 0.5.0
1017($$+) :: Monad m => Source m a -> Sink a m b -> m (SealedConduitT () a m (), b)
1018src $$+ sink = connectResume (sealConduitT src) sink
1019{-# INLINE ($$+) #-}
1020
1021-- | Continue processing after usage of @$$+@.
1022--
1023-- Since 0.5.0
1024($$++) :: Monad m => SealedConduitT () a m () -> Sink a m b -> m (SealedConduitT () a m (), b)
1025($$++) = connectResume
1026{-# INLINE ($$++) #-}
1027
1028-- | Same as @$$++@ and @connectResume@, but doesn't include the
1029-- updated @SealedConduitT@.
1030--
1031-- /NOTE/ In previous versions, this would cause finalizers to
1032-- run. Since version 1.3.0, there are no finalizers in conduit.
1033--
1034-- Since 0.5.0
1035($$+-) :: Monad m => SealedConduitT () a m () -> Sink a m b -> m b
1036rsrc $$+- sink = do
1037    (_, res) <- connectResume rsrc sink
1038    return res
1039{-# INLINE ($$+-) #-}
1040
1041-- | Left fusion for a sealed source.
1042--
1043-- Since 1.0.16
1044($=+) :: Monad m => SealedConduitT () a m () -> Conduit a m b -> SealedConduitT () b m ()
1045SealedConduitT src $=+ ConduitT sink = SealedConduitT (src `pipeL` sink Done)
1046
1047-- | Provide for a stream of data that can be flushed.
1048--
1049-- A number of @Conduit@s (e.g., zlib compression) need the ability to flush
1050-- the stream at some point. This provides a single wrapper datatype to be used
1051-- in all such circumstances.
1052--
1053-- Since 0.3.0
1054data Flush a = Chunk a | Flush
1055    deriving (Show, Eq, Ord)
1056instance Functor Flush where
1057    fmap _ Flush = Flush
1058    fmap f (Chunk a) = Chunk (f a)
1059
1060-- | A wrapper for defining an 'Applicative' instance for 'Source's which allows
1061-- to combine sources together, generalizing 'zipSources'. A combined source
1062-- will take input yielded from each of its @Source@s until any of them stop
1063-- producing output.
1064--
1065-- Since 1.0.13
1066newtype ZipSource m o = ZipSource { getZipSource :: Source m o }
1067
1068instance Monad m => Functor (ZipSource m) where
1069    fmap f = ZipSource . mapOutput f . getZipSource
1070instance Monad m => Applicative (ZipSource m) where
1071    pure  = ZipSource . forever . yield
1072    (ZipSource f) <*> (ZipSource x) = ZipSource $ zipSourcesApp f x
1073
1074-- | Coalesce all values yielded by all of the @Source@s.
1075--
1076-- Implemented on top of @ZipSource@ and as such, it exhibits the same
1077-- short-circuiting behavior as @ZipSource@. See that data type for more
1078-- details. If you want to create a source that yields *all* values from
1079-- multiple sources, use `sequence_`.
1080--
1081-- Since 1.0.13
1082sequenceSources :: (Traversable f, Monad m) => f (Source m o) -> Source m (f o)
1083sequenceSources = getZipSource . sequenceA . fmap ZipSource
1084
1085-- | A wrapper for defining an 'Applicative' instance for 'Sink's which allows
1086-- to combine sinks together, generalizing 'zipSinks'. A combined sink
1087-- distributes the input to all its participants and when all finish, produces
1088-- the result. This allows to define functions like
1089--
1090-- @
1091-- sequenceSinks :: (Monad m)
1092--           => [Sink i m r] -> Sink i m [r]
1093-- sequenceSinks = getZipSink . sequenceA . fmap ZipSink
1094-- @
1095--
1096-- Note that the standard 'Applicative' instance for conduits works
1097-- differently. It feeds one sink with input until it finishes, then switches
1098-- to another, etc., and at the end combines their results.
1099--
1100-- This newtype is in fact a type constrained version of 'ZipConduit', and has
1101-- the same behavior. It's presented as a separate type since (1) it
1102-- historically predates @ZipConduit@, and (2) the type constraining can make
1103-- your code clearer (and thereby make your error messages more easily
1104-- understood).
1105--
1106-- Since 1.0.13
1107newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r }
1108
1109instance Monad m => Functor (ZipSink i m) where
1110    fmap f (ZipSink x) = ZipSink (liftM f x)
1111instance Monad m => Applicative (ZipSink i m) where
1112    pure  = ZipSink . return
1113    (ZipSink f) <*> (ZipSink x) =
1114         ZipSink $ liftM (uncurry ($)) $ zipSinks f x
1115
1116-- | Send incoming values to all of the @Sink@ providing, and ultimately
1117-- coalesce together all return values.
1118--
1119-- Implemented on top of @ZipSink@, see that data type for more details.
1120--
1121-- Since 1.0.13
1122sequenceSinks :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r)
1123sequenceSinks = getZipSink . sequenceA . fmap ZipSink
1124
1125-- | The connect-and-resume operator. This does not close the @Conduit@, but
1126-- instead returns it to be used again. This allows a @Conduit@ to be used
1127-- incrementally in a large program, without forcing the entire program to live
1128-- in the @Sink@ monad.
1129--
1130-- Leftover data returned from the @Sink@ will be discarded.
1131--
1132-- Mnemonic: connect + do more.
1133--
1134-- Since 1.0.17
1135(=$$+) :: Monad m
1136       => ConduitT a b m ()
1137       -> ConduitT b Void m r
1138       -> ConduitT a Void m (SealedConduitT a b m (), r)
1139(=$$+) conduit = connectResumeConduit (sealConduitT conduit)
1140{-# INLINE (=$$+) #-}
1141
1142-- | Continue processing after usage of '=$$+'. Connect a 'SealedConduitT' to
1143-- a sink and return the output of the sink together with a new
1144-- 'SealedConduitT'.
1145--
1146-- Since 1.0.17
1147(=$$++) :: Monad m => SealedConduitT i o m () -> ConduitT o Void m r -> ConduitT i Void m (SealedConduitT i o m (), r)
1148(=$$++) = connectResumeConduit
1149{-# INLINE (=$$++) #-}
1150
1151-- | Same as @=$$++@, but doesn't include the updated
1152-- @SealedConduitT@.
1153--
1154-- /NOTE/ In previous versions, this would cause finalizers to
1155-- run. Since version 1.3.0, there are no finalizers in conduit.
1156--
1157-- Since 1.0.17
1158(=$$+-) :: Monad m => SealedConduitT i o m () -> ConduitT o Void m r -> ConduitT i Void m r
1159rsrc =$$+- sink = do
1160    (_, res) <- connectResumeConduit rsrc sink
1161    return res
1162{-# INLINE (=$$+-) #-}
1163
1164
1165infixr 0 =$$+
1166infixr 0 =$$++
1167infixr 0 =$$+-
1168
1169-- | Provides an alternative @Applicative@ instance for @ConduitT@. In this instance,
1170-- every incoming value is provided to all @ConduitT@s, and output is coalesced together.
1171-- Leftovers from individual @ConduitT@s will be used within that component, and then discarded
1172-- at the end of their computation. Output and finalizers will both be handled in a left-biased manner.
1173--
1174-- As an example, take the following program:
1175--
1176-- @
1177-- main :: IO ()
1178-- main = do
1179--     let src = mapM_ yield [1..3 :: Int]
1180--         conduit1 = CL.map (+1)
1181--         conduit2 = CL.concatMap (replicate 2)
1182--         conduit = getZipConduit $ ZipConduit conduit1 <* ZipConduit conduit2
1183--         sink = CL.mapM_ print
1184--     src $$ conduit =$ sink
1185-- @
1186--
1187-- It will produce the output: 2, 1, 1, 3, 2, 2, 4, 3, 3
1188--
1189-- Since 1.0.17
1190newtype ZipConduit i o m r = ZipConduit { getZipConduit :: ConduitT i o m r }
1191    deriving Functor
1192instance Monad m => Applicative (ZipConduit i o m) where
1193    pure = ZipConduit . pure
1194    ZipConduit left <*> ZipConduit right = ZipConduit (zipConduitApp left right)
1195
1196-- | Provide identical input to all of the @Conduit@s and combine their outputs
1197-- into a single stream.
1198--
1199-- Implemented on top of @ZipConduit@, see that data type for more details.
1200--
1201-- Since 1.0.17
1202sequenceConduits :: (Traversable f, Monad m) => f (ConduitT i o m r) -> ConduitT i o m (f r)
1203sequenceConduits = getZipConduit . sequenceA . fmap ZipConduit
1204
1205-- | Fuse two @ConduitT@s together, and provide the return value of both. Note
1206-- that this will force the entire upstream @ConduitT@ to be run to produce the
1207-- result value, even if the downstream terminates early.
1208--
1209-- Since 1.1.5
1210fuseBoth :: Monad m => ConduitT a b m r1 -> ConduitT b c m r2 -> ConduitT a c m (r1, r2)
1211fuseBoth (ConduitT up) (ConduitT down) =
1212    ConduitT (pipeL (up Done) (withUpstream $ generalizeUpstream $ down Done) >>=)
1213{-# INLINE fuseBoth #-}
1214
1215-- | Like 'fuseBoth', but does not force consumption of the @Producer@.
1216-- In the case that the @Producer@ terminates, the result value is
1217-- provided as a @Just@ value. If it does not terminate, then a
1218-- @Nothing@ value is returned.
1219--
1220-- One thing to note here is that "termination" here only occurs if the
1221-- @Producer@ actually yields a @Nothing@ value. For example, with the
1222-- @Producer@ @mapM_ yield [1..5]@, if five values are requested, the
1223-- @Producer@ has not yet terminated. Termination only occurs when the
1224-- sixth value is awaited for and the @Producer@ signals termination.
1225--
1226-- Since 1.2.4
1227fuseBothMaybe
1228    :: Monad m
1229    => ConduitT a b m r1
1230    -> ConduitT b c m r2
1231    -> ConduitT a c m (Maybe r1, r2)
1232fuseBothMaybe (ConduitT up) (ConduitT down) =
1233    ConduitT (pipeL (up Done) (go Nothing $ down Done) >>=)
1234  where
1235    go mup (Done r) = Done (mup, r)
1236    go mup (PipeM mp) = PipeM $ liftM (go mup) mp
1237    go mup (HaveOutput p o) = HaveOutput (go mup p) o
1238    go _ (NeedInput p c) = NeedInput
1239        (\i -> go Nothing (p i))
1240        (\u -> go (Just u) (c ()))
1241    go mup (Leftover p i) = Leftover (go mup p) i
1242{-# INLINABLE fuseBothMaybe #-}
1243
1244-- | Same as @fuseBoth@, but ignore the return value from the downstream
1245-- @Conduit@. Same caveats of forced consumption apply.
1246--
1247-- Since 1.1.5
1248fuseUpstream :: Monad m => ConduitT a b m r -> Conduit b m c -> ConduitT a c m r
1249fuseUpstream up down = fmap fst (fuseBoth up down)
1250{-# INLINE fuseUpstream #-}
1251
1252-- Rewrite rules
1253
1254{- FIXME
1255{-# RULES "conduit: ConduitT: lift x >>= f" forall m f. lift m >>= f = ConduitT (PipeM (liftM (unConduitT . f) m)) #-}
1256{-# RULES "conduit: ConduitT: lift x >> f" forall m f. lift m >> f = ConduitT (PipeM (liftM (\_ -> unConduitT f) m)) #-}
1257
1258{-# RULES "conduit: ConduitT: liftIO x >>= f" forall m (f :: MonadIO m => a -> ConduitT i o m r). liftIO m >>= f = ConduitT (PipeM (liftM (unConduitT . f) (liftIO m))) #-}
1259{-# RULES "conduit: ConduitT: liftIO x >> f" forall m (f :: MonadIO m => ConduitT i o m r). liftIO m >> f = ConduitT (PipeM (liftM (\_ -> unConduitT f) (liftIO m))) #-}
1260
1261{-# RULES "conduit: ConduitT: liftBase x >>= f" forall m (f :: MonadBase b m => a -> ConduitT i o m r). liftBase m >>= f = ConduitT (PipeM (liftM (unConduitT . f) (liftBase m))) #-}
1262{-# RULES "conduit: ConduitT: liftBase x >> f" forall m (f :: MonadBase b m => ConduitT i o m r). liftBase m >> f = ConduitT (PipeM (liftM (\_ -> unConduitT f) (liftBase m))) #-}
1263
1264{-# RULES
1265    "yield o >> p" forall o (p :: ConduitT i o m r). yield o >> p = ConduitT (HaveOutput (unConduitT p) o)
1266  ; "when yield next" forall b o p. when b (yield o) >> p =
1267        if b then ConduitT (HaveOutput (unConduitT p) o) else p
1268  ; "unless yield next" forall b o p. unless b (yield o) >> p =
1269        if b then p else ConduitT (HaveOutput (unConduitT p) o)
1270  ; "lift m >>= yield" forall m. lift m >>= yield = yieldM m
1271   #-}
1272{-# RULES "conduit: leftover l >> p" forall l (p :: ConduitT i o m r). leftover l >> p =
1273    ConduitT (Leftover (unConduitT p) l) #-}
1274    -}
1275
1276-- | Run a pure pipeline until processing completes, i.e. a pipeline
1277-- with @Identity@ as the base monad. This is equivalient to
1278-- @runIdentity . runConduit@.
1279--
1280-- @since 1.2.8
1281runConduitPure :: ConduitT () Void Identity r -> r
1282runConduitPure = runIdentity . runConduit
1283{-# INLINE runConduitPure #-}
1284
1285-- | Run a pipeline which acquires resources with @ResourceT@, and
1286-- then run the @ResourceT@ transformer. This is equivalent to
1287-- @runResourceT . runConduit@.
1288--
1289-- @since 1.2.8
1290runConduitRes :: MonadUnliftIO m
1291              => ConduitT () Void (ResourceT m) r
1292              -> m r
1293runConduitRes = runResourceT . runConduit
1294{-# INLINE runConduitRes #-}
1295