1{-# OPTIONS_HADDOCK not-home #-}
2{-# LANGUAGE DeriveFunctor #-}
3{-# LANGUAGE FlexibleInstances #-}
4{-# LANGUAGE FlexibleContexts #-}
5{-# LANGUAGE CPP #-}
6{-# LANGUAGE MultiParamTypeClasses #-}
7{-# LANGUAGE UndecidableInstances #-}
8{-# LANGUAGE RankNTypes #-}
9{-# LANGUAGE TupleSections #-}
10{-# LANGUAGE Trustworthy #-}
11{-# LANGUAGE TypeFamilies #-}
12module Data.Conduit.Internal.Conduit
13    ( -- ** Types
14      ConduitT (..)
15    , ConduitM
16    , Source
17    , Producer
18    , Sink
19    , Consumer
20    , Conduit
21    , Flush (..)
22      -- *** Newtype wrappers
23    , ZipSource (..)
24    , ZipSink (..)
25    , ZipConduit (..)
26      -- ** Sealed
27    , SealedConduitT (..)
28    , sealConduitT
29    , unsealConduitT
30      -- ** Primitives
31    , await
32    , awaitForever
33    , yield
34    , yieldM
35    , leftover
36    , runConduit
37    , runConduitPure
38    , runConduitRes
39    , fuse
40    , connect
41    , unconsM
42    , unconsEitherM
43      -- ** Composition
44    , connectResume
45    , connectResumeConduit
46    , fuseLeftovers
47    , fuseReturnLeftovers
48    , ($$+)
49    , ($$++)
50    , ($$+-)
51    , ($=+)
52    , (=$$+)
53    , (=$$++)
54    , (=$$+-)
55    , ($$)
56    , ($=)
57    , (=$)
58    , (=$=)
59    , (.|)
60      -- ** Generalizing
61    , sourceToPipe
62    , sinkToPipe
63    , conduitToPipe
64    , toProducer
65    , toConsumer
66      -- ** Cleanup
67    , bracketP
68      -- ** Exceptions
69    , catchC
70    , handleC
71    , tryC
72      -- ** Utilities
73    , Data.Conduit.Internal.Conduit.transPipe
74    , Data.Conduit.Internal.Conduit.mapOutput
75    , Data.Conduit.Internal.Conduit.mapOutputMaybe
76    , Data.Conduit.Internal.Conduit.mapInput
77    , Data.Conduit.Internal.Conduit.mapInputM
78    , zipSinks
79    , zipSources
80    , zipSourcesApp
81    , zipConduitApp
82    , mergeSource
83    , passthroughSink
84    , sourceToList
85    , fuseBoth
86    , fuseBothMaybe
87    , fuseUpstream
88    , sequenceSources
89    , sequenceSinks
90    , sequenceConduits
91    ) where
92
93import Control.Applicative (Applicative (..))
94import Control.Exception (Exception)
95import qualified Control.Exception as E (catch)
96import Control.Monad (liftM, liftM2, ap)
97import Control.Monad.Fail(MonadFail(..))
98import Control.Monad.Error.Class(MonadError(..))
99import Control.Monad.Reader.Class(MonadReader(..))
100import Control.Monad.RWS.Class(MonadRWS())
101import Control.Monad.Writer.Class(MonadWriter(..), censor)
102import Control.Monad.State.Class(MonadState(..))
103import Control.Monad.Trans.Class (MonadTrans (lift))
104import Control.Monad.IO.Unlift (MonadIO (liftIO), MonadUnliftIO, withRunInIO)
105import Control.Monad.Primitive (PrimMonad, PrimState, primitive)
106import Data.Functor.Identity (Identity, runIdentity)
107import Data.Void (Void, absurd)
108import Data.Monoid (Monoid (mappend, mempty))
109import Data.Semigroup (Semigroup ((<>)))
110import Control.Monad.Trans.Resource
111import Data.Conduit.Internal.Pipe hiding (yield, mapOutput, leftover, yieldM, await, awaitForever, bracketP, unconsM, unconsEitherM)
112import qualified Data.Conduit.Internal.Pipe as CI
113import Control.Monad (forever)
114import Data.Traversable (Traversable (..))
115
-- | Core datatype of the conduit package. This type represents a general
-- component which can consume a stream of input values @i@, produce a stream
-- of output values @o@, perform actions in the @m@ monad, and produce a final
-- result @r@. The type synonyms provided here are simply wrappers around this
-- type.
--
-- The representation is continuation-passing: 'unConduitT' takes a function
-- from the final result @r@ to the remainder of the 'Pipe' and returns the
-- complete 'Pipe'.
--
-- Since 1.3.0
newtype ConduitT i o m r = ConduitT
    { unConduitT :: forall b.
                    (r -> Pipe i i o () m b) -> Pipe i i o () m b
    }
127
-- | In order to provide for efficient monadic composition, the
-- @ConduitT@ type is implemented internally using a technique known
-- as the codensity transform. This allows for cheap appending, but
-- makes one case much more expensive: partially running a @ConduitT@
-- and then capturing the new state.
--
-- This data type is the same as @ConduitT@, but does not use the
-- codensity transform technique: it wraps the underlying 'Pipe' directly.
--
-- @since 1.3.0
newtype SealedConduitT i o m r = SealedConduitT (Pipe i i o () m r)
139
-- | Same as 'ConduitT', kept as an alias for backwards compatibility.
type ConduitM = ConduitT
142
-- | Maps over the final result by composing the function in front of the
-- continuation; the underlying 'Pipe' is not traversed.
instance Functor (ConduitT i o m) where
    fmap f conduit = ConduitT (\k -> unConduitT conduit (k . f))
145
-- | 'pure' hands the value straight to the continuation; '<*>' is derived
-- from the 'Monad' instance.
instance Applicative (ConduitT i o m) where
    pure x = ConduitT (\k -> k x)
    {-# INLINE pure #-}
    (<*>) = ap
    {-# INLINE (<*>) #-}
151
-- | Bind threads the outer continuation through the second computation, so
-- sequencing never inspects the underlying 'Pipe'.
instance Monad (ConduitT i o m) where
    return = pure
    m >>= g = ConduitT (\k -> unConduitT m (\a -> unConduitT (g a) k))
155
-- | Failure is delegated to the base monad.
--
-- @since 1.3.1
instance MonadFail m => MonadFail (ConduitT i o m) where
    fail msg = lift (Control.Monad.Fail.fail msg)
159
-- | Exceptions are thrown in the base monad.
instance MonadThrow m => MonadThrow (ConduitT i o m) where
    throwM e = lift (throwM e)
162
-- | 'IO' actions are lifted through the base monad.
instance MonadIO m => MonadIO (ConduitT i o m) where
    liftIO = \io -> lift (liftIO io)
    {-# INLINE liftIO #-}
166
-- | 'ask' is lifted from the base monad. 'local' walks the materialised
-- 'Pipe' and applies the modified environment to every monadic step.
instance MonadReader r m => MonadReader r (ConduitT i o m) where
    ask = lift ask
    {-# INLINE ask #-}

    local f (ConduitT c0) = ConduitT $ \rest ->
        let go pipe = case pipe of
                HaveOutput next o    -> HaveOutput (go next) o
                NeedInput onIn onEnd -> NeedInput (go . onIn) (go . onEnd)
                Done x               -> rest x
                -- Only the base-monad actions run under the new environment.
                PipeM act            -> PipeM (liftM go (local f act))
                Leftover next i      -> Leftover (go next) i
         in go (c0 Done)
178
179#ifndef MIN_VERSION_mtl
180#define MIN_VERSION_mtl(x, y, z) 0
181#endif
182
-- | 'tell' (and 'writer', when available) are lifted from the base monad.
-- 'listen' and 'pass' walk the materialised 'Pipe', accumulating the output
-- written by each monadic step.
instance MonadWriter w m => MonadWriter w (ConduitT i o m) where
#if MIN_VERSION_mtl(2, 1, 0)
    writer = lift . writer
#endif
    tell w = lift (tell w)

    listen (ConduitT c0) = ConduitT $ \rest ->
        let go acc pipe = case pipe of
                HaveOutput next o    -> HaveOutput (go acc next) o
                NeedInput onIn onEnd -> NeedInput (go acc . onIn) (go acc . onEnd)
                -- Hand the accumulated output over together with the result.
                Done x               -> rest (x, acc)
                PipeM act            -> PipeM $ do
                    (next, w) <- listen act
                    return (go (acc `mappend` w) next)
                Leftover next i      -> Leftover (go acc next) i
         in go mempty (c0 Done)

    pass (ConduitT c0) = ConduitT $ \rest ->
        let go acc pipe = case pipe of
                HaveOutput next o    -> HaveOutput (go acc next) o
                NeedInput onIn onEnd -> NeedInput (go acc . onIn) (go acc . onEnd)
                -- Capture each step's output but suppress it for now; it is
                -- re-emitted, transformed, once the conduit is done.
                PipeM act            -> PipeM $ do
                    (next, w) <- censor (const mempty) (listen act)
                    return (go (acc `mappend` w) next)
                Done (x, f)          -> PipeM $ do
                    tell (f acc)
                    return (rest x)
                Leftover next i      -> Leftover (go acc next) i
         in go mempty (c0 Done)
210
-- | All state operations are lifted from the base monad.
instance MonadState s m => MonadState s (ConduitT i o m) where
    get = lift get
    put s = lift (put s)
#if MIN_VERSION_mtl(2, 1, 0)
    state f = lift (state f)
#endif
217
-- | The instance body is empty: no methods are defined here, so the behaviour
-- comes from the 'MonadReader', 'MonadWriter' and 'MonadState' instances for
-- 'ConduitT'.
instance MonadRWS r w s m => MonadRWS r w s (ConduitT i o m)
219
-- | 'throwError' is lifted from the base monad. 'catchError' walks the
-- materialised 'Pipe' and guards every monadic step; when a step throws, the
-- remainder of the conduit is replaced by the handler's conduit.
instance MonadError e m => MonadError e (ConduitT i o m) where
    throwError e = lift (throwError e)
    catchError (ConduitT c0) handler = ConduitT $ \rest ->
        let recover e = return (unConduitT (handler e) rest)
            go pipe = case pipe of
                HaveOutput next o    -> HaveOutput (go next) o
                NeedInput onIn onEnd -> NeedInput (go . onIn) (go . onEnd)
                Done x               -> rest x
                PipeM act            -> PipeM (catchError (liftM go act) recover)
                Leftover next i      -> Leftover (go next) i
         in go (c0 Done)
231
-- | Runs an action of the base monad as a single 'PipeM' step and passes its
-- result to the continuation.
instance MonadTrans (ConduitT i o) where
    lift act = ConduitT (\k -> PipeM (liftM k act))
    {-# INLINE [1] lift #-}
235
-- | Resource actions are lifted through the base monad.
instance MonadResource m => MonadResource (ConduitT i o m) where
    liftResourceT = \res -> lift (liftResourceT res)
    {-# INLINE liftResourceT #-}
239
-- | Combining two unit-returning conduits runs them one after the other.
instance Monad m => Semigroup (ConduitT i o m ()) where
    (<>) = \first second -> first >> second
    {-# INLINE (<>) #-}
243
-- | The identity element is the conduit that finishes immediately.
instance Monad m => Monoid (ConduitT i o m ()) where
    mempty = pure ()
    {-# INLINE mempty #-}
#if !(MIN_VERSION_base(4,11,0))
    mappend = (<>)
    {-# INLINE mappend #-}
#endif
251
-- | Primitive actions run in the base monad, whose state token is reused.
instance PrimMonad m => PrimMonad (ConduitT i o m) where
  type PrimState (ConduitT i o m) = PrimState m
  primitive f = lift (primitive f)
255
-- | Provides a stream of output values, without consuming any input or
-- producing a final result.
--
-- Deprecated: use 'ConduitT' directly.
--
-- Since 0.5.0
type Source m o = ConduitT () o m ()
{-# DEPRECATED Source "Use ConduitT directly" #-}
262
-- | A component which produces a stream of output values, regardless of the
-- input stream. A @Producer@ is a generalization of a @Source@, and can be
-- used as either a @Source@ or a @Conduit@.
--
-- Deprecated: use 'ConduitT' directly.
--
-- Since 1.0.0
type Producer m o = forall i. ConduitT i o m ()
{-# DEPRECATED Producer "Use ConduitT directly" #-}
270
-- | Consumes a stream of input values and produces a final result, without
-- producing any output.
--
-- The synonym is partially applied; fully applied it reads:
--
-- > type Sink i m r = ConduitT i Void m r
--
-- Deprecated: use 'ConduitT' directly.
--
-- Since 0.5.0
type Sink i = ConduitT i Void
{-# DEPRECATED Sink "Use ConduitT directly" #-}
279
-- | A component which consumes a stream of input values and produces a final
-- result, regardless of the output stream. A @Consumer@ is a generalization of
-- a @Sink@, and can be used as either a @Sink@ or a @Conduit@.
--
-- Deprecated: use 'ConduitT' directly.
--
-- Since 1.0.0
type Consumer i m r = forall o. ConduitT i o m r
{-# DEPRECATED Consumer "Use ConduitT directly" #-}
287
-- | Consumes a stream of input values and produces a stream of output values,
-- without producing a final result.
--
-- Deprecated: use 'ConduitT' directly.
--
-- Since 0.5.0
type Conduit i m o = ConduitT i o m ()
{-# DEPRECATED Conduit "Use ConduitT directly" #-}
294
-- | Materialise a 'ConduitT' as a 'SealedConduitT' by running its
-- continuation-passing representation with 'Done' as the final continuation.
sealConduitT :: ConduitT i o m r -> SealedConduitT i o m r
sealConduitT conduit = SealedConduitT (unConduitT conduit Done)
297
-- | Turn a 'SealedConduitT' back into a 'ConduitT': the wrapped 'Pipe' is
-- bound to whatever continuation the resulting conduit receives.
unsealConduitT :: Monad m => SealedConduitT i o m r -> ConduitT i o m r
unsealConduitT (SealedConduitT pipe) = ConduitT (\k -> pipe >>= k)
300
-- | Run a sealed source against a sink until the sink finishes. The result
-- pairs the remaining (still sealed) source with the sink's return value.
--
-- Leftovers reported by the sink are pushed back onto the front of the
-- source, so they are the first values produced when it is resumed.
--
-- Since 0.5.0
connectResume :: Monad m
              => SealedConduitT () a m ()
              -> ConduitT a Void m r
              -> m (SealedConduitT () a m (), r)
connectResume (SealedConduitT left0) (ConduitT right0) =
    driveSink left0 (right0 Done)
  where
    -- Advance the sink; the source is consulted only when input is requested.
    driveSink _   (HaveOutput _ o)       = absurd o
    driveSink src (NeedInput onIn onEnd) = driveSource onIn onEnd src
    driveSink src (Done r)               = return (SealedConduitT src, r)
    driveSink src (PipeM act)            = act >>= driveSink src
    driveSink src (Leftover sink i)      = driveSink (HaveOutput src i) sink

    -- Advance the source until it yields a value or is exhausted.
    driveSource onIn onEnd = pull
      where
        pull (HaveOutput src o) = driveSink src (onIn o)
        pull (NeedInput _ more) = pull (more ())
        pull (Done ())          = driveSink (Done ()) (onEnd ())
        pull (PipeM act)        = act >>= pull
        pull (Leftover src ())  = pull src
329
-- | Convert a 'Source' into a 'Pipe' with arbitrary leftover, input and
-- upstream-result types. Requests for input are answered with @()@ and unit
-- leftovers are dropped.
sourceToPipe :: Monad m => Source m o -> Pipe l i o u m ()
sourceToPipe src = convert (unConduitT src Done)
  where
    convert pipe = case pipe of
        HaveOutput next o -> HaveOutput (convert next) o
        NeedInput _ onEnd -> convert (onEnd ())
        Done ()           -> Done ()
        PipeM act         -> PipeM (liftM convert act)
        Leftover next ()  -> convert next
339
-- | Convert a 'Sink' into a 'Pipe' with arbitrary leftover, output and
-- upstream-result types. Leftovers are resolved with 'injectLeftovers'
-- first, and the upstream result is ignored when input ends.
sinkToPipe :: Monad m => Sink i m r -> Pipe l i o u m r
sinkToPipe sink = convert (injectLeftovers (unConduitT sink Done))
  where
    convert pipe = case pipe of
        HaveOutput _ o       -> absurd o
        NeedInput onIn onEnd ->
            NeedInput (\i -> convert (onIn i)) (const (convert (onEnd ())))
        Done r               -> Done r
        PipeM act            -> PipeM (liftM convert act)
        Leftover _ l         -> absurd l
349
-- | Convert a 'Conduit' into a 'Pipe' with arbitrary leftover and
-- upstream-result types. Leftovers are resolved with 'injectLeftovers'
-- first, and the upstream result is ignored when input ends.
conduitToPipe :: Monad m => Conduit i m o -> Pipe l i o u m ()
conduitToPipe conduit = convert (injectLeftovers (unConduitT conduit Done))
  where
    convert pipe = case pipe of
        HaveOutput next o    -> HaveOutput (convert next) o
        NeedInput onIn onEnd ->
            NeedInput (\i -> convert (onIn i)) (const (convert (onEnd ())))
        Done ()              -> Done ()
        PipeM act            -> PipeM (liftM convert act)
        Leftover _ l         -> absurd l
359
-- | Generalize a 'Source' to a 'Producer'. Requests for input are answered
-- with @()@ and unit leftovers are dropped, so the result works for any
-- input type.
--
-- Since 1.0.0
toProducer :: Monad m => Source m a -> Producer m a
toProducer (ConduitT c0) = ConduitT $ \rest ->
    let widen pipe = case pipe of
            HaveOutput next o -> HaveOutput (widen next) o
            NeedInput _ onEnd -> widen (onEnd ())
            Done r            -> rest r
            PipeM act         -> PipeM (liftM widen act)
            Leftover next ()  -> widen next
     in widen (c0 Done)
371
-- | Generalize a 'Sink' to a 'Consumer'. The sink cannot produce output
-- (its output type is 'Void'), so the result works for any output type.
--
-- Since 1.0.0
toConsumer :: Monad m => Sink a m b -> Consumer a m b
toConsumer (ConduitT c0) = ConduitT $ \rest ->
    let widen pipe = case pipe of
            HaveOutput _ o       -> absurd o
            NeedInput onIn onEnd ->
                NeedInput (\i -> widen (onIn i)) (\u -> widen (onEnd u))
            Done r               -> rest r
            PipeM act            -> PipeM (liftM widen act)
            Leftover next l      -> Leftover (widen next) l
     in widen (c0 Done)
383
-- | Catch all exceptions thrown by the current component of the pipeline.
--
-- Every monadic step of the conduit is wrapped in an exception handler; when
-- a step throws, the rest of the conduit is replaced by the handler's
-- conduit.
--
-- Note: this will /not/ catch exceptions thrown by other components! For
-- example, if an exception is thrown in a @Source@ feeding to a @Sink@, and
-- the @Sink@ uses @catchC@, the exception will /not/ be caught.
--
-- Due to this behavior (as well as lack of async exception safety), you
-- should not try to implement combinators such as @onException@ in terms of
-- this primitive function.
--
-- Note also that the exception handling will /not/ be applied to any
-- finalizers generated by this conduit.
--
-- Since 1.0.11
catchC :: (MonadUnliftIO m, Exception e)
       => ConduitT i o m r
       -> (e -> ConduitT i o m r)
       -> ConduitT i o m r
catchC (ConduitT p0) onErr = ConduitT $ \rest ->
    let -- Continue with the handler's conduit in place of the failed step.
        recover e = return (unConduitT (onErr e) rest)
        go pipe = case pipe of
            Done r               -> rest r
            PipeM act            -> PipeM $ withRunInIO $ \run ->
                E.catch (run (liftM go act)) recover
            Leftover next i      -> Leftover (go next) i
            NeedInput onIn onEnd -> NeedInput (go . onIn) (go . onEnd)
            HaveOutput next o    -> HaveOutput (go next) o
     in go (p0 Done)
{-# INLINE catchC #-}
411
-- | 'catchC' with its arguments swapped: the handler comes first.
--
-- Since 1.0.11
handleC :: (MonadUnliftIO m, Exception e)
        => (e -> ConduitT i o m r)
        -> ConduitT i o m r
        -> ConduitT i o m r
handleC = \onErr body -> catchC body onErr
{-# INLINE handleC #-}
421
-- | A version of @try@ for use within a pipeline: a normal result is wrapped
-- in 'Right', a caught exception in 'Left'. See the comments on 'catchC' for
-- the limitations that apply.
--
-- Since 1.0.11
tryC :: (MonadUnliftIO m, Exception e)
     => ConduitT i o m r
     -> ConduitT i o m (Either e r)
tryC body = catchC (fmap Right body) (\e -> return (Left e))
{-# INLINE tryC #-}
431
-- | Combines two sinks. Both receive every input value, and the new sink
-- completes when both input sinks have completed, returning both results.
--
-- Any leftovers are discarded.
--
-- Since 0.4.1
zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r')
zipSinks (ConduitT x0) (ConduitT y0) = ConduitT $ \rest -> let
    -- Clause order matters: the patterns below overlap.
    -- Leftovers and outputs are impossible here (their types are 'Void').
    both (Leftover _ i) _ = absurd i
    both _ (Leftover _ i) = absurd i
    both (HaveOutput _ o) _ = absurd o
    both _ (HaveOutput _ o) = absurd o

    -- Run pending monadic steps, left sink first.
    both (PipeM mx) y = PipeM (liftM (`both` y) mx)
    both x (PipeM my) = PipeM (liftM (both x) my)
    both (Done x) (Done y) = rest (x, y)
    -- Feed the same input to every sink that is still waiting.
    both (NeedInput px cx) (NeedInput py cy) =
        NeedInput (\i -> both (px i) (py i)) (\() -> both (cx ()) (cy ()))
    both (NeedInput px cx) y@Done{} =
        NeedInput (\i -> both (px i) y) (\u -> both (cx u) y)
    both x@Done{} (NeedInput py cy) =
        NeedInput (\i -> both x (py i)) (\u -> both x (cy u))
    in both (injectLeftovers (x0 Done)) (injectLeftovers (y0 Done))
452
-- | Combines two sources into a source of pairs. The new source will stop
-- producing once either source has been exhausted.
--
-- Since 1.0.13
zipSources :: Monad m => Source m a -> Source m b -> Source m (a, b)
zipSources (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
    -- Clause order matters: the patterns below overlap.
    pair (Leftover l ()) r = pair l r
    pair l (Leftover r ()) = pair l r
    -- As soon as one side is done the zip is over; a pending action or
    -- output on the other side is dropped.
    pair (Done ()) (Done ()) = rest ()
    pair (Done ()) (HaveOutput _ _) = rest ()
    pair (HaveOutput _ _) (Done ()) = rest ()
    pair (Done ()) (PipeM _) = rest ()
    pair (PipeM _) (Done ()) = rest ()
    pair (PipeM ml) (PipeM mr) = PipeM (liftM2 pair ml mr)
    pair (PipeM ml) r@HaveOutput{} = PipeM (liftM (`pair` r) ml)
    pair l@HaveOutput{} (PipeM mr) = PipeM (liftM (pair l) mr)
    pair (HaveOutput l a) (HaveOutput r b) = HaveOutput (pair l r) (a, b)
    -- Sources have no real upstream: answer requests for input with ().
    pair (NeedInput _ end) r = pair (end ()) r
    pair l (NeedInput _ end) = pair l (end ())
    in pair (left0 Done) (right0 Done)
473
-- | Combines a source of functions with a source of arguments: each function
-- from the first source is applied to the corresponding value from the
-- second. The new source will stop producing once either source has been
-- exhausted.
--
-- Since 1.0.13
zipSourcesApp :: Monad m => Source m (a -> b) -> Source m a -> Source m b
zipSourcesApp (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
    -- Clause order matters: the patterns below overlap.
    apply (Leftover l ()) r = apply l r
    apply l (Leftover r ()) = apply l r
    -- As soon as one side is done the zip is over; a pending action or
    -- output on the other side is dropped.
    apply (Done ()) (Done ()) = rest ()
    apply (Done ()) (HaveOutput _ _) = rest ()
    apply (HaveOutput _ _) (Done ()) = rest ()
    apply (Done ()) (PipeM _) = rest ()
    apply (PipeM _) (Done ()) = rest ()
    apply (PipeM ml) (PipeM mr) = PipeM (liftM2 apply ml mr)
    apply (PipeM ml) r@HaveOutput{} = PipeM (liftM (`apply` r) ml)
    apply l@HaveOutput{} (PipeM mr) = PipeM (liftM (apply l) mr)
    apply (HaveOutput l f) (HaveOutput r a) = HaveOutput (apply l r) (f a)
    -- Sources have no real upstream: answer requests for input with ().
    apply (NeedInput _ end) r = apply (end ()) r
    apply l (NeedInput _ end) = apply l (end ())
    in apply (left0 Done) (right0 Done)
494
-- | Run two conduits side by side on the same input stream. Each input value
-- is given to every conduit that is still waiting, the outputs of both are
-- emitted downstream, and the final result applies the left conduit's result
-- to the right conduit's result. Leftovers are resolved with
-- 'injectLeftovers' before the two are combined.
--
-- Since 1.0.17
zipConduitApp
    :: Monad m
    => ConduitT i o m (x -> y)
    -> ConduitT i o m x
    -> ConduitT i o m y
zipConduitApp (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
    -- Clause order matters: the patterns below overlap.
    both (Done f) (Done x) = rest (f x)
    both (PipeM ml) r = PipeM (liftM (`both` r) ml)
    both l (PipeM mr) = PipeM (liftM (both l) mr)
    both (HaveOutput l o) r = HaveOutput (both l r) o
    both l (HaveOutput r o) = HaveOutput (both l r) o
    -- Impossible once leftovers have been injected.
    both (Leftover _ i) _ = absurd i
    both _ (Leftover _ i) = absurd i
    both (NeedInput pl cl) (NeedInput pr cr) = NeedInput
        (\i -> both (pl i) (pr i))
        (\u -> both (cl u) (cr u))
    both (NeedInput pl cl) (Done x) = NeedInput
        (\i -> both (pl i) (Done x))
        (\u -> both (cl u) (Done x))
    both (Done f) (NeedInput pr cr) = NeedInput
        (\i -> both (Done f) (pr i))
        (\u -> both (Done f) (cr u))
  in both (injectLeftovers (left0 Done)) (injectLeftovers (right0 Done))
521
-- | Same as normal fusion (e.g. @=$=@), except instead of discarding leftovers
-- from the downstream component, return them alongside its result.
--
-- Since 1.0.17
fuseReturnLeftovers :: Monad m
                    => ConduitT a b m ()
                    -> ConduitT b c m r
                    -> ConduitT a c m (r, [b])
fuseReturnLeftovers (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
    -- Drive downstream. @pending@ holds its leftovers, most recent first;
    -- they are fed back to it before upstream is asked for more.
    runDown pending up down = case down of
        HaveOutput next o    -> HaveOutput (runDown pending up next) o
        NeedInput onIn onEnd -> case pending of
            []       -> runUp onIn onEnd up
            b : more -> runDown more up (onIn b)
        Done r               -> rest (r, pending)
        PipeM act            -> PipeM (liftM (runDown pending up) act)
        Leftover next b      -> runDown (b : pending) up next

    -- Drive upstream until it yields a value for downstream or finishes.
    runUp onIn onEnd up = case up of
        HaveOutput next o  -> runDown [] next (onIn o)
        NeedInput more end ->
            NeedInput (runUp onIn onEnd . more) (runUp onIn onEnd . end)
        Done r1            -> runDown [] (Done r1) (onEnd r1)
        PipeM act          -> PipeM (liftM (runUp onIn onEnd) act)
        Leftover next i    -> Leftover (runUp onIn onEnd next) i
    in runDown [] (left0 Done) (right0 Done)
554
-- | Similar to @fuseReturnLeftovers@, but use the provided function to convert
-- downstream leftovers to upstream leftovers. The converted values are pushed
-- back with 'leftover' in reverse list order.
--
-- Since 1.0.17
fuseLeftovers
    :: Monad m
    => ([b] -> [a])
    -> ConduitT a b m ()
    -> ConduitT b c m r
    -> ConduitT a c m r
fuseLeftovers f left right =
    fuseReturnLeftovers left right >>= \(r, bs) ->
        mapM_ leftover (reverse (f bs)) >> return r
569
-- | Connect a sealed 'Conduit' to a sink and return the output of the sink
-- together with the remaining 'Conduit', again sealed.
--
-- Leftovers reported by the sink are pushed back onto the front of the
-- conduit's output.
--
-- Since 1.0.17
connectResumeConduit
    :: Monad m
    => SealedConduitT i o m ()
    -> ConduitT o Void m r
    -> ConduitT i Void m (SealedConduitT i o m (), r)
connectResumeConduit (SealedConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
    -- Advance the sink; the conduit is consulted only when input is requested.
    driveSink up sink = case sink of
        HaveOutput _ o       -> absurd o
        NeedInput onIn onEnd -> driveUp onIn onEnd up
        Done r               -> rest (SealedConduitT up, r)
        PipeM act            -> PipeM (liftM (driveSink up) act)
        Leftover next i      -> driveSink (HaveOutput up i) next

    -- Advance the conduit until it yields a value for the sink or finishes;
    -- its own input requests and leftovers are passed through.
    driveUp onIn onEnd up = case up of
        HaveOutput next o  -> driveSink next (onIn o)
        NeedInput more end ->
            NeedInput (driveUp onIn onEnd . more) (driveUp onIn onEnd . end)
        Done ()            -> driveSink (Done ()) (onEnd ())
        PipeM act          -> PipeM (liftM (driveUp onIn onEnd) act)
        Leftover next i    -> Leftover (driveUp onIn onEnd next) i
    in driveSink left0 (right0 Done)
598
-- | Merge a @Source@ into a @Conduit@: every value received from upstream is
-- paired with the next value of the source.
-- The new conduit will stop processing once either source or upstream have
-- been exhausted.
mergeSource
  :: Monad m
  => Source m i
  -> Conduit a m (i, a)
mergeSource = loop . sealConduitT
  where
    loop :: Monad m => SealedConduitT () i m () -> ConduitT a (i, a) m ()
    loop sealed = await >>= maybe (return ()) pairWith
      where
        -- Pull one value from the source and resume with what is left of it.
        pairWith a =
          lift (sealed $$++ await) >>= \(sealed', mi) ->
            maybe (return ()) (\i -> yield (i, a) >> loop sealed') mi
615
616
-- | Turn a @Sink@ into a @Conduit@ in the following way:
--
-- * All input passed to the @Sink@ is yielded downstream.
--
-- * When the @Sink@ finishes processing, the result is passed to the
--   provided finalizer function.
--
-- Note that the @Sink@ will stop receiving input as soon as the downstream it
-- is connected to shuts down.
--
-- An example usage would be to write the result of a @Sink@ to some mutable
-- variable while allowing other processing to continue.
--
-- Since 1.1.0
passthroughSink :: Monad m
                => Sink i m r
                -> (r -> m ()) -- ^ finalizer
                -> Conduit i m i
passthroughSink (ConduitT sink0) final = ConduitT $ \rest -> let
    -- How the three arguments of @go@ work together:
    --
    -- The third argument is the current state of the inner sink that
    -- values are being passed to.
    --
    -- The second argument is the leftover buffer: values the sink itself
    -- has called leftover on, which must be given back to the sink the
    -- next time it awaits. These values must _not_ be yielded downstream
    -- again: they were already yielded once, and handling leftovers from
    -- downstream is the responsibility of whatever wraps passthroughSink.
    --
    -- The first argument is the subtle one; it addresses
    -- https://github.com/snoyberg/conduit/issues/304. Once a value
    -- arrives it has to be given to the inner sink _and_ yielded
    -- downstream. Yielding first and then recursing does not work in all
    -- cases: if the downstream component never awaits again, the yield
    -- never returns and the sink misses the last value. The result is
    -- confusing behavior where the sink and the downstream component
    -- receive a different number of values.
    --
    -- Solution: buffer the next value to yield, and only yield it in one
    -- of two situations: the sink asks for another value, or the sink is
    -- done. That way the inner sink and downstream always see exactly the
    -- same number of values.

    -- Yield the buffered value, if there is one.
    emit pending = maybe (return ()) CI.yield pending

    go pending _ (Done r) = do
        emit pending
        lift (final r)
        -- The sink is finished: pass everything else straight through.
        unConduitT (awaitForever yield) rest
    go pending stash (Leftover sink i) = go pending (i : stash) sink
    go _ _ (HaveOutput _ o) = absurd o
    go pending stash (PipeM act) = lift act >>= go pending stash
    go pending (i : stash) (NeedInput onIn _) = go pending stash (onIn i)
    go pending [] (NeedInput onIn onEnd) = do
        emit pending
        CI.await >>=
            maybe (go Nothing [] (onEnd ())) (\x -> go (Just x) [] (onIn x))
    in go Nothing [] (sink0 Done)
683
-- | Convert a @Source@ into a list. The basic functionality can be explained as:
--
-- > sourceToList src = src $$ Data.Conduit.List.consume
--
-- However, @sourceToList@ is able to produce its results lazily, which cannot
-- be done when running a conduit pipeline in general. Unlike the
-- @Data.Conduit.Lazy@ module (in conduit-extra), this function performs no
-- unsafe I\/O operations, and therefore can only be as lazy as the
-- underlying monad.
--
-- Since 1.2.6
sourceToList :: Monad m => Source m a -> m [a]
sourceToList src = collect (unConduitT src Done)
  where
    collect pipe = case pipe of
        Done _            -> return []
        HaveOutput next x -> liftM (x :) (collect next)
        PipeM act         -> act >>= collect
        NeedInput _ onEnd -> collect (onEnd ())
        Leftover next _   -> collect next
704
-- Fixity declarations for the operators exported by this module. The
-- connecting operators ('$$' and the @$$+@ family) are right-associative at
-- precedence 0, the right-associative fusing operators sit at precedence 2,
-- and the left-associative '$=' and '$=+' sit at precedence 1.
infixr 0 $$
infixl 1 $=
infixr 2 =$
infixr 2 =$=
infixr 0 $$+
infixr 0 $$++
infixr 0 $$+-
infixl 1 $=+
infixr 2 .|
715
-- | Run a source into a sink and return the sink's result. Equivalent to
-- using 'runConduit' and '.|' together.
--
-- Since 1.2.3
connect :: Monad m
        => ConduitT () a m ()
        -> ConduitT a Void m r
        -> m r
connect source sink = source $$ sink
724
-- | Split a conduit into head and tail: the first value it yields together
-- with the rest of the conduit, or 'Nothing' when it finishes without
-- yielding.
--
-- Note that you have to 'sealConduitT' it first.
--
-- Since 1.3.3
unconsM :: Monad m
        => SealedConduitT () o m ()
        -> m (Maybe (o, SealedConduitT () o m ()))
unconsM (SealedConduitT pipe0) = step pipe0
  where
    -- This function is the same as @Pipe.unconsM@ but it ignores leftovers.
    step pipe = case pipe of
        HaveOutput next o -> pure (Just (o, SealedConduitT next))
        NeedInput _ onEnd -> step (onEnd ())
        Done ()           -> pure Nothing
        PipeM act         -> act >>= step
        Leftover next ()  -> step next
741
-- | Split a conduit into head and tail ('Right'), or return its result if it
-- is done ('Left').
--
-- Note that you have to 'sealConduitT' it first.
--
-- Since 1.3.3
unconsEitherM :: Monad m
              => SealedConduitT () o m r
              -> m (Either r (o, SealedConduitT () o m r))
unconsEitherM (SealedConduitT pipe0) = step pipe0
  where
    -- This function is the same as @Pipe.unconsEitherM@ but it ignores leftovers.
    step pipe = case pipe of
        HaveOutput next o -> pure (Right (o, SealedConduitT next))
        NeedInput _ onEnd -> step (onEnd ())
        Done r            -> pure (Left r)
        PipeM act         -> act >>= step
        Leftover next ()  -> step next
758
-- | Named function synonym for '.|': feed the output of the first conduit
-- into the second.
--
-- Equivalent to '.|' and '=$='. However, the latter is
-- deprecated and will be removed in a future version.
--
-- Since 1.2.3
fuse :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r
fuse upstream downstream = upstream =$= downstream
767
-- | Combine two @Conduit@s together into a new @Conduit@ (aka 'fuse').
--
-- Output from the upstream (left) conduit will be fed into the
-- downstream (right) conduit. Processing will terminate when
-- downstream (right) returns.
-- Leftover data returned from the right @Conduit@ will be discarded.
--
-- Equivalent to 'fuse' and '=$=', however the latter is deprecated and will
-- be removed in a future version.
--
-- Note that, while this operator looks like categorical composition
-- (from "Control.Category"), there are a few reasons it's different:
--
-- * The position of the type parameters to 'ConduitT' do not
--   match. We would need to change @ConduitT i o m r@ to @ConduitT r
--   m i o@, which would preclude a 'Monad' or 'MonadTrans' instance.
--
-- * The result values of upstream and downstream are allowed to
--   differ. For categorical composition we would
--   need the type signature here to look like @ConduitT a b m r ->
--   ConduitT b c m r -> ConduitT a c m r@.
--
-- * Due to leftovers, we do not have a left identity in Conduit. This
--   can be achieved with the underlying @Pipe@ datatype, but this is
--   not generally recommended. See <https://stackoverflow.com/a/15263700>.
--
-- @since 1.2.8
(.|) :: Monad m
     => ConduitM a b m () -- ^ upstream
     -> ConduitM b c m r -- ^ downstream
     -> ConduitM a c m r
(.|) = fuse
{-# INLINE (.|) #-}
801
-- | The connect operator, which pulls data from a source and pushes to a sink.
-- If you would like to keep the @Source@ open to be used for other
-- operations, use the connect-and-resume operator '$$+'.
--
-- Since 0.4.0
($$) :: Monad m => Source m a -> Sink a m b -> m b
src $$ sink =
    (src $$+ sink) >>= \(rsrc, res) ->
    -- Connect the resumable source to a sink that returns immediately, then
    -- hand back the original sink's result.
    (rsrc $$+- return ()) >> return res
{-# INLINE [1] ($$) #-}
{-# DEPRECATED ($$) "Use runConduit and .|" #-}
814
-- | A synonym for '=$=' for backwards compatibility.
--
-- Deprecated: use '.|' instead.
--
-- Since 0.4.0
($=) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r
($=) = (=$=)
-- NOTE(review): inlining is restricted to phase 0 while the rewrite rule
-- below has no phase restriction, presumably so the rule gets to replace
-- '$=' with '=$=' first -- confirm before changing either pragma.
{-# INLINE [0] ($=) #-}
{-# RULES "conduit: $= is =$=" ($=) = (=$=) #-}
{-# DEPRECATED ($=) "Use .|" #-}
823
-- | A synonym for '=$=' for backwards compatibility.
--
-- Deprecated: use '.|' instead.
--
-- Since 0.4.0
(=$) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r
(=$) = (=$=)
-- NOTE(review): inlining is restricted to phase 0 while the rewrite rule
-- below has no phase restriction, presumably so the rule gets to replace
-- '=$' with '=$=' first -- confirm before changing either pragma.
{-# INLINE [0] (=$) #-}
{-# RULES "conduit: =$ is =$=" (=$) = (=$=) #-}
{-# DEPRECATED (=$) "Use .|" #-}
832
-- | Deprecated fusion operator: feeds the output of the upstream (left)
-- conduit into the downstream (right) conduit. Processing is driven by
-- downstream and ends when downstream returns. Leftovers from downstream are
-- pushed back onto the front of upstream's output.
--
-- Since 0.4.0
(=$=) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r
ConduitT left0 =$= ConduitT right0 = ConduitT $ \rest ->
    let -- Advance downstream; upstream is consulted only on a request for input.
        runDown up down = case down of
            HaveOutput next o    -> HaveOutput (runDown up next) o
            NeedInput onIn onEnd -> runUp onIn onEnd up
            Done r2              -> rest r2
            PipeM act            -> PipeM (liftM (runDown up) act)
            Leftover next i      -> runDown (HaveOutput up i) next

        -- Advance upstream until it yields a value for downstream or finishes.
        runUp onIn onEnd up = case up of
            HaveOutput next o  -> runDown next (onIn o)
            NeedInput more end ->
                NeedInput (runUp onIn onEnd . more) (runUp onIn onEnd . end)
            Done r1            -> runDown (Done r1) (onEnd r1)
            PipeM act          -> PipeM (liftM (runUp onIn onEnd) act)
            Leftover next i    -> Leftover (runUp onIn onEnd next) i
     in runDown (left0 Done) (right0 Done)
{-# INLINE [1] (=$=) #-}
{-# DEPRECATED (=$=) "Use .|" #-}
860
-- | Wait for a single input value from upstream. Returns @Just@ the value
-- when one arrives and @Nothing@ if no data is available. Once @await@
-- returns @Nothing@, subsequent calls will also return @Nothing@.
--
-- Since 0.5.0
await :: Monad m => Consumer i m (Maybe i)
await = ConduitT $ \k -> NeedInput (\i -> k (Just i)) (\_ -> k Nothing)
{-# INLINE [0] await #-}
869
await' :: Monad m
       => ConduitT i o m r
       -> (i -> ConduitT i o m r)
       -> ConduitT i o m r
await' onEnd onInput = ConduitT $ \rest ->
    let -- Continue with the handler applied to the received value.
        withInput i = unConduitT (onInput i) rest
        -- Upstream is done: continue with the fallback component.
        atEnd _     = unConduitT onEnd rest
     in NeedInput withInput atEnd
{-# INLINE await' #-}
{-# RULES "conduit: await >>= maybe" forall x y. await >>= maybe x y = await' x y #-}
879
-- | Send a value downstream for the next component to consume. If the
-- downstream component terminates, this call will never return control.
--
-- Since 0.5.0
yield :: Monad m
      => o -- ^ output value
      -> ConduitT i o m ()
yield o = ConduitT (\rest -> HaveOutput (rest ()) o)
{-# INLINE yield #-}
889
-- | Run an action in the base monad and send its result downstream for the
-- next component to consume.
--
-- @since 1.2.7
yieldM :: Monad m => m o -> ConduitT i o m ()
yieldM mo = do
    o <- lift mo
    yield o
{-# INLINE yieldM #-}
896
897  -- FIXME rule won't fire, see FIXME in .Pipe; "mapM_ yield" mapM_ yield = ConduitT . sourceList
898
-- | Provide a single piece of leftover input to be consumed by the next
-- component in the current monadic binding.
--
-- /Note/: it is highly encouraged to only return leftover values from input
-- already consumed from upstream.
--
-- @since 0.5.0
leftover :: i -> ConduitT i o m ()
leftover i = ConduitT (\rest -> Leftover (rest ()) i)
{-# INLINE leftover #-}
909
-- | Run a pipeline until processing completes, producing the pipeline's
-- result in the base monad.
--
-- Since 1.2.1
runConduit :: Monad m => ConduitT () Void m r -> m r
runConduit conduit = runPipe (injectLeftovers (unConduitT conduit Done))
{-# INLINE [0] runConduit #-}
916
-- | Bracket a conduit computation between allocation and release of a
-- resource. Two guarantees are given about resource finalization:
--
-- 1. It will be /prompt/. The finalization will be run as early as possible.
--
-- 2. It is exception safe. Due to usage of @resourcet@, the finalization will
-- be run in the event of any exceptions.
--
-- Since 0.5.0
bracketP :: MonadResource m
         => IO a
            -- ^ computation to run first (\"acquire resource\")
         -> (a -> IO ())
            -- ^ computation to run last (\"release resource\")
         -> (a -> ConduitT i o m r)
            -- ^ computation to run in-between
         -> ConduitT i o m r
            -- ^ returns the value from the in-between computation
bracketP alloc free inside = ConduitT $ \rest ->
    allocate alloc free >>= \(key, seed) ->
        unConduitT (inside seed) $ \res ->
            -- Release the resource as soon as the inner computation has
            -- produced its result, before continuing with the rest.
            release key >> rest res
941
-- | Wait for input forever, calling the given inner component for each piece
-- of new input. The result of each inner component is discarded.
--
-- This function is provided as a convenience for the common pattern of
-- @await@ing input, checking if it's @Just@ and then looping.
--
-- Since 0.5.0
awaitForever :: Monad m => (i -> ConduitT i o m r) -> ConduitT i o m ()
awaitForever inner = ConduitT $ \done ->
    let loop   = NeedInput step done
        -- Run the inner component on one value, then wait for the next.
        step i = unConduitT (inner i) (\_ -> loop)
     in loop
953
-- | Transform the monad that a @ConduitT@ lives in.
--
-- Note that the monad transforming function will be run multiple times,
-- resulting in unintuitive behavior in some cases. For a fuller treatment,
-- please see:
--
-- <https://github.com/snoyberg/conduit/wiki/Dealing-with-monad-transformers>
--
-- Since 0.4.0
transPipe :: Monad m => (forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r
transPipe nat (ConduitT c0) = ConduitT $ \rest ->
    let go pipe = case pipe of
            HaveOutput next o        -> HaveOutput (go next) o
            NeedInput onInput onDone -> NeedInput (go . onInput) (go . onDone)
            Done r                   -> rest r
            PipeM mnext              -> PipeM (nat (liftM go (collapse mnext)))
            Leftover next i          -> Leftover (go next) i

        -- Combine a series of monadic actions into a single action.  Since we
        -- throw away side effects between different actions, an arbitrary break
        -- between actions will lead to a violation of the monad transformer laws.
        -- Example available at:
        --
        -- http://hpaste.org/75520
        collapse mpipe = mpipe >>= \pipe' ->
            case pipe' of
                PipeM inner -> collapse inner
                _           -> return pipe'
     in go (c0 Done)
984
-- | Apply a function to all the output values of a @ConduitT@.
--
-- This mimics the behavior of `fmap` for a `Source` and `Conduit` in pre-0.4
-- days. It can also be simulated by fusing with the @map@ conduit from
-- "Data.Conduit.List".
--
-- Since 0.4.1
mapOutput :: Monad m => (o1 -> o2) -> ConduitT i o1 m r -> ConduitT i o2 m r
mapOutput f (ConduitT c0) = ConduitT $ \rest ->
    let go pipe = case pipe of
            HaveOutput next o        -> HaveOutput (go next) (f o)
            NeedInput onInput onDone -> NeedInput (go . onInput) (go . onDone)
            Done r                   -> rest r
            PipeM mnext              -> PipeM (liftM go mnext)
            Leftover next i          -> Leftover (go next) i
     in go (c0 Done)
1000
-- | Same as 'mapOutput', but use a function that returns @Maybe@ values.
-- Outputs mapped to @Nothing@ are dropped from the stream.
--
-- Since 0.5.0
mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> ConduitT i o1 m r -> ConduitT i o2 m r
mapOutputMaybe f (ConduitT c0) = ConduitT $ \rest ->
    let go pipe = case pipe of
            HaveOutput next o ->
                case f o of
                    Nothing -> go next
                    Just o' -> HaveOutput (go next) o'
            NeedInput onInput onDone -> NeedInput (go . onInput) (go . onDone)
            Done r                   -> rest r
            PipeM mnext              -> PipeM (liftM go mnext)
            Leftover next i          -> Leftover (go next) i
     in go (c0 Done)
1012
-- | Apply a function to all the input values of a @ConduitT@. Leftovers are
-- translated back with the second function; those it maps to @Nothing@ are
-- dropped.
--
-- Since 0.5.0
mapInput :: Monad m
         => (i1 -> i2) -- ^ map initial input to new input
         -> (i2 -> Maybe i1) -- ^ map new leftovers to initial leftovers
         -> ConduitT i2 o m r
         -> ConduitT i1 o m r
mapInput f f' (ConduitT c0) = ConduitT $ \rest ->
    let go pipe = case pipe of
            HaveOutput next o        -> HaveOutput (go next) o
            NeedInput onInput onDone -> NeedInput (go . onInput . f) (go . onDone)
            Done r                   -> rest r
            PipeM mnext              -> PipeM (liftM go mnext)
            Leftover next i ->
                case f' i of
                    Nothing -> go next
                    Just i' -> Leftover (go next) i'
     in go (c0 Done)
1028
-- | Apply a monadic action to all the input values of a @ConduitT@. Leftovers
-- are translated back with the second action; those it maps to @Nothing@ are
-- dropped.
--
-- Since 1.3.2
mapInputM :: Monad m
          => (i1 -> m i2) -- ^ map initial input to new input
          -> (i2 -> m (Maybe i1)) -- ^ map new leftovers to initial leftovers
          -> ConduitT i2 o m r
          -> ConduitT i1 o m r
mapInputM f f' (ConduitT c0) = ConduitT $ \rest ->
    let go pipe = case pipe of
            HaveOutput next o        -> HaveOutput (go next) o
            NeedInput onInput onDone ->
                NeedInput (\i -> PipeM (fmap (go . onInput) (f i))) (go . onDone)
            Done r                   -> rest r
            PipeM mnext              -> PipeM (fmap go mnext)
            Leftover next i          -> PipeM (fmap (restore (go next)) (f' i))

        -- Re-attach a translated leftover, or drop it when there is none.
        restore next Nothing   = next
        restore next (Just i') = Leftover next i'
     in go (c0 Done)
1044
-- | The connect-and-resume operator. This does not close the @Source@, but
-- instead returns it (in sealed form) to be used again. This allows a
-- @Source@ to be used incrementally in a large program, without forcing the
-- entire program to live in the @Sink@ monad.
--
-- Mnemonic: connect + do more.
--
-- Since 0.5.0
($$+) :: Monad m => Source m a -> Sink a m b -> m (SealedConduitT () a m (), b)
src $$+ sink = connectResume sealed sink
  where
    sealed = sealConduitT src
{-# INLINE ($$+) #-}
1056
-- | Continue processing after usage of @$$+@: connect the sealed source to a
-- further sink, returning the updated sealed source together with the sink's
-- result.
--
-- Since 0.5.0
($$++) :: Monad m => SealedConduitT () a m () -> Sink a m b -> m (SealedConduitT () a m (), b)
-- Plain alias for 'connectResume'.
($$++) = connectResume
{-# INLINE ($$++) #-}
1063
-- | Same as @$$++@ and @connectResume@, but doesn't include the
-- updated @SealedConduitT@; only the sink's result is returned.
--
-- /NOTE/ In previous versions, this would cause finalizers to
-- run. Since version 1.3.0, there are no finalizers in conduit.
--
-- Since 0.5.0
($$+-) :: Monad m => SealedConduitT () a m () -> Sink a m b -> m b
rsrc $$+- sink =
    connectResume rsrc sink >>= \(_, res) -> return res
{-# INLINE ($$+-) #-}
1076
-- | Left fusion for a sealed source: attach a conduit to the output of a
-- sealed source, giving a new sealed source.
--
-- Since 1.0.16
($=+) :: Monad m => SealedConduitT () a m () -> Conduit a m b -> SealedConduitT () b m ()
($=+) (SealedConduitT src) (ConduitT sink) = SealedConduitT (pipeL src (sink Done))
1082
-- | Provide for a stream of data that can be flushed.
--
-- A number of @Conduit@s (e.g., zlib compression) need the ability to flush
-- the stream at some point. This provides a single wrapper datatype to be used
-- in all such circumstances.
--
-- Since 0.3.0
data Flush a = Chunk a | Flush
    deriving (Show, Eq, Ord)
instance Functor Flush where
    -- Map over the payload of a 'Chunk'; 'Flush' carries no value.
    fmap f flush = case flush of
        Chunk a -> Chunk (f a)
        Flush   -> Flush
1095
-- | A wrapper for defining an 'Applicative' instance for 'Source's which allows
-- to combine sources together, generalizing 'zipSources'. A combined source
-- will take input yielded from each of its @Source@s until any of them stop
-- producing output.
--
-- Since 1.0.13
newtype ZipSource m o = ZipSource { getZipSource :: Source m o }

instance Monad m => Functor (ZipSource m) where
    fmap f (ZipSource src) = ZipSource (mapOutput f src)
instance Monad m => Applicative (ZipSource m) where
    -- An endless source of the same value.
    pure x = ZipSource (forever (yield x))
    ZipSource f <*> ZipSource x = ZipSource (zipSourcesApp f x)
1109
-- | Coalesce all values yielded by all of the @Source@s.
--
-- Implemented on top of @ZipSource@ and as such, it exhibits the same
-- short-circuiting behavior as @ZipSource@. See that data type for more
-- details. If you want to create a source that yields *all* values from
-- multiple sources, use `sequence_`.
--
-- Since 1.0.13
sequenceSources :: (Traversable f, Monad m) => f (Source m o) -> Source m (f o)
sequenceSources srcs = getZipSource (sequenceA (fmap ZipSource srcs))
1120
1121-- | A wrapper for defining an 'Applicative' instance for 'Sink's which allows
1122-- to combine sinks together, generalizing 'zipSinks'. A combined sink
1123-- distributes the input to all its participants and when all finish, produces
1124-- the result. This allows to define functions like
1125--
1126-- @
1127-- sequenceSinks :: (Monad m)
1128--           => [Sink i m r] -> Sink i m [r]
1129-- sequenceSinks = getZipSink . sequenceA . fmap ZipSink
1130-- @
1131--
1132-- Note that the standard 'Applicative' instance for conduits works
1133-- differently. It feeds one sink with input until it finishes, then switches
1134-- to another, etc., and at the end combines their results.
1135--
1136-- This newtype is in fact a type constrained version of 'ZipConduit', and has
1137-- the same behavior. It's presented as a separate type since (1) it
1138-- historically predates @ZipConduit@, and (2) the type constraining can make
1139-- your code clearer (and thereby make your error messages more easily
1140-- understood).
1141--
1142-- Since 1.0.13
newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r }

instance Monad m => Functor (ZipSink i m) where
    fmap f = ZipSink . liftM f . getZipSink
instance Monad m => Applicative (ZipSink i m) where
    pure r = ZipSink (return r)
    -- Run both sinks with 'zipSinks', then apply the function result to the
    -- argument result.
    ZipSink f <*> ZipSink x = ZipSink (liftM (uncurry ($)) (zipSinks f x))
1151
-- | Send incoming values to all of the provided @Sink@s, and ultimately
-- coalesce together all return values.
--
-- Implemented on top of @ZipSink@, see that data type for more details.
--
-- Since 1.0.13
sequenceSinks :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r)
sequenceSinks sinks = getZipSink (sequenceA (fmap ZipSink sinks))
1160
-- | The connect-and-resume operator. This does not close the @Conduit@, but
-- instead returns it (in sealed form) to be used again. This allows a
-- @Conduit@ to be used incrementally in a large program, without forcing the
-- entire program to live in the @Sink@ monad.
--
-- Leftover data returned from the @Sink@ will be discarded.
--
-- Mnemonic: connect + do more.
--
-- Since 1.0.17
(=$$+) :: Monad m
       => ConduitT a b m ()
       -> ConduitT b Void m r
       -> ConduitT a Void m (SealedConduitT a b m (), r)
(=$$+) conduit = connectResumeConduit sealed
  where
    sealed = sealConduitT conduit
{-# INLINE (=$$+) #-}
1177
-- | Continue processing after usage of '=$$+'. Connect a 'SealedConduitT' to
-- a sink and return the output of the sink together with a new
-- 'SealedConduitT' that can be connected again later.
--
-- Since 1.0.17
(=$$++) :: Monad m => SealedConduitT i o m () -> ConduitT o Void m r -> ConduitT i Void m (SealedConduitT i o m (), r)
-- Plain alias for 'connectResumeConduit'.
(=$$++) = connectResumeConduit
{-# INLINE (=$$++) #-}
1186
-- | Same as @=$$++@, but doesn't include the updated
-- @SealedConduitT@; only the sink's result is returned.
--
-- /NOTE/ In previous versions, this would cause finalizers to
-- run. Since version 1.3.0, there are no finalizers in conduit.
--
-- Since 1.0.17
(=$$+-) :: Monad m => SealedConduitT i o m () -> ConduitT o Void m r -> ConduitT i Void m r
rsrc =$$+- sink =
    connectResumeConduit rsrc sink >>= \(_, res) -> return res
{-# INLINE (=$$+-) #-}
1199
1200
infixr 0 =$$+, =$$++, =$$+-
1204
1205-- | Provides an alternative @Applicative@ instance for @ConduitT@. In this instance,
1206-- every incoming value is provided to all @ConduitT@s, and output is coalesced together.
1207-- Leftovers from individual @ConduitT@s will be used within that component, and then discarded
1208-- at the end of their computation. Output and finalizers will both be handled in a left-biased manner.
1209--
1210-- As an example, take the following program:
1211--
1212-- @
1213-- main :: IO ()
1214-- main = do
1215--     let src = mapM_ yield [1..3 :: Int]
1216--         conduit1 = CL.map (+1)
1217--         conduit2 = CL.concatMap (replicate 2)
1218--         conduit = getZipConduit $ ZipConduit conduit1 <* ZipConduit conduit2
1219--         sink = CL.mapM_ print
1220--     src $$ conduit =$ sink
1221-- @
1222--
1223-- It will produce the output: 2, 1, 1, 3, 2, 2, 4, 3, 3
1224--
1225-- Since 1.0.17
newtype ZipConduit i o m r = ZipConduit { getZipConduit :: ConduitT i o m r }
    deriving Functor
instance Monad m => Applicative (ZipConduit i o m) where
    -- 'pure' defers to the underlying 'ConduitT'; '<*>' is 'zipConduitApp'.
    pure r = ZipConduit (pure r)
    ZipConduit left <*> ZipConduit right = ZipConduit (left `zipConduitApp` right)
1231
-- | Provide identical input to all of the @Conduit@s and combine their outputs
-- into a single stream, collecting their return values.
--
-- Implemented on top of @ZipConduit@, see that data type for more details.
--
-- Since 1.0.17
sequenceConduits :: (Traversable f, Monad m) => f (ConduitT i o m r) -> ConduitT i o m (f r)
sequenceConduits conduits = getZipConduit (sequenceA (fmap ZipConduit conduits))
1240
-- | Fuse two @ConduitT@s together, and provide the return value of both. Note
-- that this will force the entire upstream @ConduitT@ to be run to produce the
-- result value, even if the downstream terminates early.
--
-- Since 1.1.5
fuseBoth :: Monad m => ConduitT a b m r1 -> ConduitT b c m r2 -> ConduitT a c m (r1, r2)
fuseBoth (ConduitT up) (ConduitT down) = ConduitT $ \rest -> fused >>= rest
  where
    fused = pipeL (up Done) (withUpstream (generalizeUpstream (down Done)))
{-# INLINE fuseBoth #-}
1250
1251-- | Like 'fuseBoth', but does not force consumption of the @Producer@.
1252-- In the case that the @Producer@ terminates, the result value is
1253-- provided as a @Just@ value. If it does not terminate, then a
1254-- @Nothing@ value is returned.
1255--
1256-- One thing to note here is that "termination" here only occurs if the
1257-- @Producer@ actually yields a @Nothing@ value. For example, with the
1258-- @Producer@ @mapM_ yield [1..5]@, if five values are requested, the
1259-- @Producer@ has not yet terminated. Termination only occurs when the
1260-- sixth value is awaited for and the @Producer@ signals termination.
1261--
1262-- Since 1.2.4
fuseBothMaybe
    :: Monad m
    => ConduitT a b m r1
    -> ConduitT b c m r2
    -> ConduitT a c m (Maybe r1, r2)
fuseBothMaybe (ConduitT up) (ConduitT down) =
    ConduitT $ \rest -> pipeL (up Done) (track Nothing (down Done)) >>= rest
  where
    -- Walk the downstream pipe, remembering the upstream result (if any) seen
    -- at the most recent request for input.
    track seen pipe = case pipe of
        Done r            -> Done (seen, r)
        PipeM mnext       -> PipeM (liftM (track seen) mnext)
        HaveOutput next o -> HaveOutput (track seen next) o
        NeedInput onInput onDone ->
            NeedInput
                (track Nothing . onInput)
                (\u -> track (Just u) (onDone ()))
        Leftover next i   -> Leftover (track seen next) i
{-# INLINABLE fuseBothMaybe #-}
1279
-- | Same as @fuseBoth@, but ignore the return value from the downstream
-- @Conduit@, keeping only the upstream result. Same caveats of forced
-- consumption apply.
--
-- Since 1.1.5
fuseUpstream :: Monad m => ConduitT a b m r -> Conduit b m c -> ConduitT a c m r
fuseUpstream up down = fst <$> fuseBoth up down
{-# INLINE fuseUpstream #-}
1287
1288-- Rewrite rules
1289
1290{- FIXME
1291{-# RULES "conduit: ConduitT: lift x >>= f" forall m f. lift m >>= f = ConduitT (PipeM (liftM (unConduitT . f) m)) #-}
1292{-# RULES "conduit: ConduitT: lift x >> f" forall m f. lift m >> f = ConduitT (PipeM (liftM (\_ -> unConduitT f) m)) #-}
1293
1294{-# RULES "conduit: ConduitT: liftIO x >>= f" forall m (f :: MonadIO m => a -> ConduitT i o m r). liftIO m >>= f = ConduitT (PipeM (liftM (unConduitT . f) (liftIO m))) #-}
1295{-# RULES "conduit: ConduitT: liftIO x >> f" forall m (f :: MonadIO m => ConduitT i o m r). liftIO m >> f = ConduitT (PipeM (liftM (\_ -> unConduitT f) (liftIO m))) #-}
1296
1297{-# RULES "conduit: ConduitT: liftBase x >>= f" forall m (f :: MonadBase b m => a -> ConduitT i o m r). liftBase m >>= f = ConduitT (PipeM (liftM (unConduitT . f) (liftBase m))) #-}
1298{-# RULES "conduit: ConduitT: liftBase x >> f" forall m (f :: MonadBase b m => ConduitT i o m r). liftBase m >> f = ConduitT (PipeM (liftM (\_ -> unConduitT f) (liftBase m))) #-}
1299
1300{-# RULES
1301    "yield o >> p" forall o (p :: ConduitT i o m r). yield o >> p = ConduitT (HaveOutput (unConduitT p) o)
1302  ; "when yield next" forall b o p. when b (yield o) >> p =
1303        if b then ConduitT (HaveOutput (unConduitT p) o) else p
1304  ; "unless yield next" forall b o p. unless b (yield o) >> p =
1305        if b then p else ConduitT (HaveOutput (unConduitT p) o)
1306  ; "lift m >>= yield" forall m. lift m >>= yield = yieldM m
1307   #-}
1308{-# RULES "conduit: leftover l >> p" forall l (p :: ConduitT i o m r). leftover l >> p =
1309    ConduitT (Leftover (unConduitT p) l) #-}
1310    -}
1311
-- | Run a pure pipeline until processing completes, i.e. a pipeline
-- with @Identity@ as the base monad. This is equivalent to
-- @runIdentity . runConduit@.
--
-- @since 1.2.8
runConduitPure :: ConduitT () Void Identity r -> r
runConduitPure = runIdentity . runConduit
{-# INLINE runConduitPure #-}
1320
-- | Run a pipeline which acquires resources with @ResourceT@, and
-- then run the @ResourceT@ transformer, giving the pipeline's result in the
-- underlying monad. This is equivalent to @runResourceT . runConduit@.
--
-- @since 1.2.8
runConduitRes :: MonadUnliftIO m
              => ConduitT () Void (ResourceT m) r
              -> m r
runConduitRes = runResourceT . runConduit
{-# INLINE runConduitRes #-}
1331