1{-# OPTIONS_HADDOCK not-home #-}
2{-# LANGUAGE FlexibleInstances #-}
3{-# LANGUAGE FlexibleContexts #-}
4{-# LANGUAGE CPP #-}
5{-# LANGUAGE MultiParamTypeClasses #-}
6{-# LANGUAGE UndecidableInstances #-}
7{-# LANGUAGE RankNTypes #-}
8{-# LANGUAGE TupleSections #-}
9{-# LANGUAGE Trustworthy #-}
10{-# LANGUAGE TypeFamilies #-}
11module Data.Conduit.Internal.Pipe
12    ( -- ** Types
13      Pipe (..)
14      -- ** Primitives
15    , await
16    , awaitE
17    , awaitForever
18    , yield
19    , yieldM
20    , leftover
21      -- ** Finalization
22    , bracketP
23      -- ** Composition
24    , idP
25    , pipe
26    , pipeL
27    , runPipe
28    , injectLeftovers
29    , (>+>)
30    , (<+<)
31      -- ** Exceptions
32    , catchP
33    , handleP
34    , tryP
35      -- ** Utilities
36    , transPipe
37    , mapOutput
38    , mapOutputMaybe
39    , mapInput
40    , sourceList
41    , withUpstream
42    , Data.Conduit.Internal.Pipe.enumFromTo
43    , generalizeUpstream
44    ) where
45
46import Control.Applicative (Applicative (..))
47import Control.Monad ((>=>), liftM, ap)
48import Control.Monad.Error.Class(MonadError(..))
49import Control.Monad.Reader.Class(MonadReader(..))
50import Control.Monad.RWS.Class(MonadRWS())
51import Control.Monad.Writer.Class(MonadWriter(..))
52import Control.Monad.State.Class(MonadState(..))
53import Control.Monad.Trans.Class (MonadTrans (lift))
54import Control.Monad.IO.Unlift (MonadIO (liftIO), MonadUnliftIO, withRunInIO)
55import Control.Monad.Primitive (PrimMonad, PrimState, primitive)
56import Data.Void (Void, absurd)
57import Data.Monoid (Monoid (mappend, mempty))
58import Data.Semigroup (Semigroup ((<>)))
59import Control.Monad.Trans.Resource
60import qualified GHC.Exts
61import qualified Control.Exception as E
62
63-- | The underlying datatype for all the types in this package.  In has six
64-- type parameters:
65--
66-- * /l/ is the type of values that may be left over from this @Pipe@. A @Pipe@
67-- with no leftovers would use @Void@ here, and one with leftovers would use
68-- the same type as the /i/ parameter. Leftovers are automatically provided to
69-- the next @Pipe@ in the monadic chain.
70--
71-- * /i/ is the type of values for this @Pipe@'s input stream.
72--
73-- * /o/ is the type of values for this @Pipe@'s output stream.
74--
75-- * /u/ is the result type from the upstream @Pipe@.
76--
77-- * /m/ is the underlying monad.
78--
79-- * /r/ is the result type.
80--
81-- A basic intuition is that every @Pipe@ produces a stream of output values
82-- (/o/), and eventually indicates that this stream is terminated by sending a
83-- result (/r/). On the receiving end of a @Pipe@, these become the /i/ and /u/
84-- parameters.
85--
86-- Since 0.5.0
87data Pipe l i o u m r =
88    -- | Provide new output to be sent downstream. This constructor has two
89    -- fields: the next @Pipe@ to be used and the output value.
90    HaveOutput (Pipe l i o u m r) o
91    -- | Request more input from upstream. The first field takes a new input
92    -- value and provides a new @Pipe@. The second takes an upstream result
93    -- value, which indicates that upstream is producing no more results.
94  | NeedInput (i -> Pipe l i o u m r) (u -> Pipe l i o u m r)
95    -- | Processing with this @Pipe@ is complete, providing the final result.
96  | Done r
97    -- | Require running of a monadic action to get the next @Pipe@.
98  | PipeM (m (Pipe l i o u m r))
99    -- | Return leftover input, which should be provided to future operations.
100  | Leftover (Pipe l i o u m r) l
101
102instance Monad m => Functor (Pipe l i o u m) where
103    fmap = liftM
104    {-# INLINE fmap #-}
105
106instance Monad m => Applicative (Pipe l i o u m) where
107    pure = Done
108    {-# INLINE pure #-}
109    (<*>) = ap
110    {-# INLINE (<*>) #-}
111
112instance Monad m => Monad (Pipe l i o u m) where
113    return = pure
114    {-# INLINE return #-}
115
116    HaveOutput p o   >>= fp = HaveOutput (p >>= fp)            o
117    NeedInput p c    >>= fp = NeedInput  (p >=> fp)            (c >=> fp)
118    Done x           >>= fp = fp x
119    PipeM mp         >>= fp = PipeM      ((>>= fp) `liftM` mp)
120    Leftover p i     >>= fp = Leftover   (p >>= fp)            i
121
122instance MonadTrans (Pipe l i o u) where
123    lift mr = PipeM (Done `liftM` mr)
124    {-# INLINE [1] lift #-}
125
126instance MonadIO m => MonadIO (Pipe l i o u m) where
127    liftIO = lift . liftIO
128    {-# INLINE liftIO #-}
129
130instance MonadThrow m => MonadThrow (Pipe l i o u m) where
131    throwM = lift . throwM
132    {-# INLINE throwM #-}
133
134
135instance Monad m => Semigroup (Pipe l i o u m ()) where
136    (<>) = (>>)
137    {-# INLINE (<>) #-}
138
139instance Monad m => Monoid (Pipe l i o u m ()) where
140    mempty = return ()
141    {-# INLINE mempty #-}
142#if !(MIN_VERSION_base(4,11,0))
143    mappend = (<>)
144    {-# INLINE mappend #-}
145#endif
146
147instance PrimMonad m => PrimMonad (Pipe l i o u m) where
148  type PrimState (Pipe l i o u m) = PrimState m
149  primitive = lift . primitive
150
151instance MonadResource m => MonadResource (Pipe l i o u m) where
152    liftResourceT = lift . liftResourceT
153    {-# INLINE liftResourceT #-}
154
155instance MonadReader r m => MonadReader r (Pipe l i o u m) where
156    ask = lift ask
157    {-# INLINE ask #-}
158    local f (HaveOutput p o) = HaveOutput (local f p) o
159    local f (NeedInput p c) = NeedInput (\i -> local f (p i)) (\u -> local f (c u))
160    local _ (Done x) = Done x
161    local f (PipeM mp) = PipeM (liftM (local f) $ local f mp)
162    local f (Leftover p i) = Leftover (local f p) i
163
164-- Provided for doctest
165#ifndef MIN_VERSION_mtl
166#define MIN_VERSION_mtl(x, y, z) 0
167#endif
168
169instance MonadWriter w m => MonadWriter w (Pipe l i o u m) where
170#if MIN_VERSION_mtl(2, 1, 0)
171    writer = lift . writer
172#endif
173
174    tell = lift . tell
175
176    listen (HaveOutput p o) = HaveOutput (listen p) o
177    listen (NeedInput p c) = NeedInput (\i -> listen (p i)) (\u -> listen (c u))
178    listen (Done x) = Done (x,mempty)
179    listen (PipeM mp) =
180      PipeM $
181      do (p,w) <- listen mp
182         return $ do (x,w') <- listen p
183                     return (x, w `mappend` w')
184    listen (Leftover p i) = Leftover (listen p) i
185
186    pass (HaveOutput p o) = HaveOutput (pass p) o
187    pass (NeedInput p c) = NeedInput (\i -> pass (p i)) (\u -> pass (c u))
188    pass (PipeM mp) = PipeM $ mp >>= (return . pass)
189    pass (Done (x,_)) = Done x
190    pass (Leftover p i) = Leftover (pass p) i
191
192instance MonadState s m => MonadState s (Pipe l i o u m) where
193    get = lift get
194    put = lift . put
195#if MIN_VERSION_mtl(2, 1, 0)
196    state = lift . state
197#endif
198
199instance MonadRWS r w s m => MonadRWS r w s (Pipe l i o u m)
200
201instance MonadError e m => MonadError e (Pipe l i o u m) where
202    throwError = lift . throwError
203    catchError (HaveOutput p o) f = HaveOutput (catchError p f) o
204    catchError (NeedInput p c) f = NeedInput (\i -> catchError (p i) f) (\u -> catchError (c u) f)
205    catchError (Done x) _ = Done x
206    catchError (PipeM mp) f =
207      PipeM $ catchError (liftM (flip catchError f) mp) (\e -> return (f e))
208    catchError (Leftover p i) f = Leftover (catchError p f) i
209
210-- | Wait for a single input value from upstream.
211--
212-- Since 0.5.0
213await :: Pipe l i o u m (Maybe i)
214await = NeedInput (Done . Just) (\_ -> Done Nothing)
215{-# RULES "conduit: CI.await >>= maybe" forall x y. await >>= maybe x y = NeedInput y (const x) #-}
216{-# INLINE [1] await #-}
217
218-- | This is similar to @await@, but will return the upstream result value as
219-- @Left@ if available.
220--
221-- Since 0.5.0
222awaitE :: Pipe l i o u m (Either u i)
223awaitE = NeedInput (Done . Right) (Done . Left)
224{-# RULES "conduit: awaitE >>= either" forall x y. awaitE >>= either x y = NeedInput y x #-}
225{-# INLINE [1] awaitE #-}
226
227-- | Wait for input forever, calling the given inner @Pipe@ for each piece of
228-- new input. Returns the upstream result type.
229--
230-- Since 0.5.0
231awaitForever :: Monad m => (i -> Pipe l i o r m r') -> Pipe l i o r m r
232awaitForever inner =
233    self
234  where
235    self = awaitE >>= either return (\i -> inner i >> self)
236{-# INLINE [1] awaitForever #-}
237
238-- | Send a single output value downstream. If the downstream @Pipe@
239-- terminates, this @Pipe@ will terminate as well.
240--
241-- Since 0.5.0
242yield :: Monad m
243      => o -- ^ output value
244      -> Pipe l i o u m ()
245yield = HaveOutput (Done ())
246{-# INLINE [1] yield #-}
247
248yieldM :: Monad m => m o -> Pipe l i o u m ()
249yieldM = PipeM . liftM (HaveOutput (Done ()))
250{-# INLINE [1] yieldM #-}
251
252{-# RULES
253    "CI.yield o >> p" forall o (p :: Pipe l i o u m r). yield o >> p = HaveOutput p o
254  #-}
255
256  -- Rule does not fire due to inlining of lift
257  -- ; "lift m >>= CI.yield" forall m. lift m >>= yield = yieldM m
258
259  -- FIXME: Too much inlining on mapM_, can't enforce; "mapM_ CI.yield" mapM_ yield = sourceList
260  -- Maybe we can get a rewrite rule on foldr instead? Need a benchmark to back this up.
261
262-- | Provide a single piece of leftover input to be consumed by the next pipe
263-- in the current monadic binding.
264--
265-- /Note/: it is highly encouraged to only return leftover values from input
266-- already consumed from upstream.
267--
268-- Since 0.5.0
269leftover :: l -> Pipe l i o u m ()
270leftover = Leftover (Done ())
271{-# INLINE [1] leftover #-}
272{-# RULES "conduit: leftover l >> p" forall l (p :: Pipe l i o u m r). leftover l >> p = Leftover p l #-}
273
274-- | Bracket a pipe computation between allocation and release of a resource.
275-- We guarantee, via the @MonadResource@ context, that the resource
276-- finalization is exception safe. However, it will not necessarily be
277-- /prompt/, in that running a finalizer may wait until the @ResourceT@ block
278-- exits.
279--
280-- Since 0.5.0
281bracketP :: MonadResource m
282         => IO a
283            -- ^ computation to run first (\"acquire resource\")
284         -> (a -> IO ())
285            -- ^ computation to run last (\"release resource\")
286         -> (a -> Pipe l i o u m r)
287            -- ^ computation to run in-between
288         -> Pipe l i o u m r
289            -- returns the value from the in-between computation
290bracketP alloc free inside = do
291  (key, seed) <- allocate alloc free
292  res <- inside seed
293  release key
294  return res
295
296-- | The identity @Pipe@.
297--
298-- Since 0.5.0
299idP :: Monad m => Pipe l a a r m r
300idP = NeedInput (HaveOutput idP) Done
301
302-- | Compose a left and right pipe together into a complete pipe.
303--
304-- Since 0.5.0
305pipe :: Monad m => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2
306pipe =
307    goRight
308  where
309    goRight left right =
310        case right of
311            HaveOutput p o   -> HaveOutput (recurse p) o
312            NeedInput rp rc  -> goLeft rp rc left
313            Done r2          -> Done r2
314            PipeM mp         -> PipeM (liftM recurse mp)
315            Leftover _ i     -> absurd i
316      where
317        recurse = goRight left
318
319    goLeft rp rc left =
320        case left of
321            HaveOutput left' o        -> goRight left' (rp o)
322            NeedInput left' lc        -> NeedInput (recurse . left') (recurse . lc)
323            Done r1                   -> goRight (Done r1) (rc r1)
324            PipeM mp                  -> PipeM (liftM recurse mp)
325            Leftover left' i          -> Leftover (recurse left') i
326      where
327        recurse = goLeft rp rc
328
329-- | Same as 'pipe', but automatically applies 'injectLeftovers' to the right @Pipe@.
330--
331-- Since 0.5.0
332pipeL :: Monad m => Pipe l a b r0 m r1 -> Pipe b b c r1 m r2 -> Pipe l a c r0 m r2
333-- Note: The following should be equivalent to the simpler:
334--
335--     pipeL l r = l `pipe` injectLeftovers r
336--
337-- However, this version tested as being significantly more efficient.
338pipeL =
339    goRight
340  where
341    goRight left right =
342        case right of
343            HaveOutput p o    -> HaveOutput (recurse p) o
344            NeedInput rp rc   -> goLeft rp rc left
345            Done r2           -> Done r2
346            PipeM mp          -> PipeM (liftM recurse mp)
347            Leftover right' i -> goRight (HaveOutput left i) right'
348      where
349        recurse = goRight left
350
351    goLeft rp rc left =
352        case left of
353            HaveOutput left' o        -> goRight left' (rp o)
354            NeedInput left' lc        -> NeedInput (recurse . left') (recurse . lc)
355            Done r1                   -> goRight (Done r1) (rc r1)
356            PipeM mp                  -> PipeM (liftM recurse mp)
357            Leftover left' i          -> Leftover (recurse left') i
358      where
359        recurse = goLeft rp rc
360
361-- | Run a pipeline until processing completes.
362--
363-- Since 0.5.0
364runPipe :: Monad m => Pipe Void () Void () m r -> m r
365runPipe (HaveOutput _ o) = absurd o
366runPipe (NeedInput _ c) = runPipe (c ())
367runPipe (Done r) = return r
368runPipe (PipeM mp) = mp >>= runPipe
369runPipe (Leftover _ i) = absurd i
370
371-- | Transforms a @Pipe@ that provides leftovers to one which does not,
372-- allowing it to be composed.
373--
374-- This function will provide any leftover values within this @Pipe@ to any
375-- calls to @await@. If there are more leftover values than are demanded, the
376-- remainder are discarded.
377--
378-- Since 0.5.0
379injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
380injectLeftovers =
381    go []
382  where
383    go ls (HaveOutput p o) = HaveOutput (go ls p) o
384    go (l:ls) (NeedInput p _) = go ls $ p l
385    go [] (NeedInput p c) = NeedInput (go [] . p) (go [] . c)
386    go _ (Done r) = Done r
387    go ls (PipeM mp) = PipeM (liftM (go ls) mp)
388    go ls (Leftover p l) = go (l:ls) p
389
390-- | Transform the monad that a @Pipe@ lives in.
391--
392-- Note that the monad transforming function will be run multiple times,
393-- resulting in unintuitive behavior in some cases. For a fuller treatment,
394-- please see:
395--
396-- <https://github.com/snoyberg/conduit/wiki/Dealing-with-monad-transformers>
397--
398-- This function is just a synonym for 'hoist'.
399--
400-- Since 0.4.0
401transPipe :: Monad m => (forall a. m a -> n a) -> Pipe l i o u m r -> Pipe l i o u n r
402transPipe f (HaveOutput p o) = HaveOutput (transPipe f p) o
403transPipe f (NeedInput p c) = NeedInput (transPipe f . p) (transPipe f . c)
404transPipe _ (Done r) = Done r
405transPipe f (PipeM mp) =
406    PipeM (f $ liftM (transPipe f) $ collapse mp)
407  where
408    -- Combine a series of monadic actions into a single action.  Since we
409    -- throw away side effects between different actions, an arbitrary break
410    -- between actions will lead to a violation of the monad transformer laws.
411    -- Example available at:
412    --
413    -- http://hpaste.org/75520
414    collapse mpipe = do
415        pipe' <- mpipe
416        case pipe' of
417            PipeM mpipe' -> collapse mpipe'
418            _ -> return pipe'
419transPipe f (Leftover p i) = Leftover (transPipe f p) i
420
421-- | Apply a function to all the output values of a @Pipe@.
422--
423-- This mimics the behavior of `fmap` for a `Source` and `Conduit` in pre-0.4
424-- days.
425--
426-- Since 0.4.1
427mapOutput :: Monad m => (o1 -> o2) -> Pipe l i o1 u m r -> Pipe l i o2 u m r
428mapOutput f =
429    go
430  where
431    go (HaveOutput p o) = HaveOutput (go p) (f o)
432    go (NeedInput p c) = NeedInput (go . p) (go . c)
433    go (Done r) = Done r
434    go (PipeM mp) = PipeM (liftM (go) mp)
435    go (Leftover p i) = Leftover (go p) i
436{-# INLINE mapOutput #-}
437
438-- | Same as 'mapOutput', but use a function that returns @Maybe@ values.
439--
440-- Since 0.5.0
441mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> Pipe l i o1 u m r -> Pipe l i o2 u m r
442mapOutputMaybe f =
443    go
444  where
445    go (HaveOutput p o) = maybe id (\o' p' -> HaveOutput p' o') (f o) (go p)
446    go (NeedInput p c) = NeedInput (go . p) (go . c)
447    go (Done r) = Done r
448    go (PipeM mp) = PipeM (liftM (go) mp)
449    go (Leftover p i) = Leftover (go p) i
450{-# INLINE mapOutputMaybe #-}
451
452-- | Apply a function to all the input values of a @Pipe@.
453--
454-- Since 0.5.0
455mapInput :: Monad m
456         => (i1 -> i2) -- ^ map initial input to new input
457         -> (l2 -> Maybe l1) -- ^ map new leftovers to initial leftovers
458         -> Pipe l2 i2 o u m r
459         -> Pipe l1 i1 o u m r
460mapInput f f' (HaveOutput p o) = HaveOutput (mapInput f f' p) o
461mapInput f f' (NeedInput p c)    = NeedInput (mapInput f f' . p . f) (mapInput f f' . c)
462mapInput _ _  (Done r)           = Done r
463mapInput f f' (PipeM mp)         = PipeM (liftM (mapInput f f') mp)
464mapInput f f' (Leftover p i)     = maybe id (flip Leftover) (f' i) $ mapInput f f' p
465
466enumFromTo :: (Enum o, Eq o, Monad m)
467           => o
468           -> o
469           -> Pipe l i o u m ()
470enumFromTo start stop =
471    loop start
472  where
473    loop i
474        | i == stop = HaveOutput (Done ()) i
475        | otherwise = HaveOutput (loop (succ i)) i
476{-# INLINE enumFromTo #-}
477
478-- | Convert a list into a source.
479--
480-- Since 0.3.0
481sourceList :: Monad m => [a] -> Pipe l i a u m ()
482sourceList =
483    go
484  where
485    go [] = Done ()
486    go (o:os) = HaveOutput (go os) o
487{-# INLINE [1] sourceList #-}
488
489-- | The equivalent of @GHC.Exts.build@ for @Pipe@.
490--
491-- Since 0.4.2
492build :: Monad m => (forall b. (o -> b -> b) -> b -> b) -> Pipe l i o u m ()
493build g = g (\o p -> HaveOutput p o) (return ())
494
495{-# RULES
496    "sourceList/build" forall (f :: (forall b. (a -> b -> b) -> b -> b)). sourceList (GHC.Exts.build f) = build f #-}
497
498-- | Returns a tuple of the upstream and downstream results. Note that this
499-- will force consumption of the entire input stream.
500--
501-- Since 0.5.0
502withUpstream :: Monad m
503             => Pipe l i o u m r
504             -> Pipe l i o u m (u, r)
505withUpstream down =
506    down >>= go
507  where
508    go r =
509        loop
510      where
511        loop = awaitE >>= either (\u -> return (u, r)) (\_ -> loop)
512
513infixr 9 <+<
514infixl 9 >+>
515
516-- | Fuse together two @Pipe@s, connecting the output from the left to the
517-- input of the right.
518--
519-- Notice that the /leftover/ parameter for the @Pipe@s must be @Void@. This
520-- ensures that there is no accidental data loss of leftovers during fusion. If
521-- you have a @Pipe@ with leftovers, you must first call 'injectLeftovers'.
522--
523-- Since 0.5.0
524(>+>) :: Monad m => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2
525(>+>) = pipe
526{-# INLINE (>+>) #-}
527
528-- | Same as '>+>', but reverse the order of the arguments.
529--
530-- Since 0.5.0
531(<+<) :: Monad m => Pipe Void b c r1 m r2 -> Pipe l a b r0 m r1 -> Pipe l a c r0 m r2
532(<+<) = flip pipe
533{-# INLINE (<+<) #-}
534
535-- | See 'catchC' for more details.
536--
537-- Since 1.0.11
538catchP :: (MonadUnliftIO m, E.Exception e)
539       => Pipe l i o u m r
540       -> (e -> Pipe l i o u m r)
541       -> Pipe l i o u m r
542catchP p0 onErr =
543    go p0
544  where
545    go (Done r) = Done r
546    go (PipeM mp) = PipeM $ withRunInIO $ \run ->
547      E.catch (run (liftM go mp)) (return . onErr)
548    go (Leftover p i) = Leftover (go p) i
549    go (NeedInput x y) = NeedInput (go . x) (go . y)
550    go (HaveOutput p o) = HaveOutput (go p) o
551{-# INLINABLE catchP #-}
552
553-- | The same as @flip catchP@.
554--
555-- Since 1.0.11
556handleP :: (MonadUnliftIO m, E.Exception e)
557        => (e -> Pipe l i o u m r)
558        -> Pipe l i o u m r
559        -> Pipe l i o u m r
560handleP = flip catchP
561{-# INLINE handleP #-}
562
563-- | See 'tryC' for more details.
564--
565-- Since 1.0.11
566tryP :: (MonadUnliftIO m, E.Exception e)
567     => Pipe l i o u m r
568     -> Pipe l i o u m (Either e r)
569tryP p = (fmap Right p) `catchP` (return . Left)
570{-# INLINABLE tryP #-}
571
572-- | Generalize the upstream return value for a @Pipe@ from unit to any type.
573--
574-- Since 1.1.5
575generalizeUpstream :: Monad m => Pipe l i o () m r -> Pipe l i o u m r
576generalizeUpstream =
577    go
578  where
579    go (HaveOutput p o) = HaveOutput (go p) o
580    go (NeedInput x y) = NeedInput (go . x) (\_ -> go (y ()))
581    go (Done r) = Done r
582    go (PipeM mp) = PipeM (liftM go mp)
583    go (Leftover p l) = Leftover (go p) l
584{-# INLINE generalizeUpstream #-}
585
586{- Rules don't fire due to inlining of lift
587{-# RULES "conduit: Pipe: lift x >>= f" forall m f. lift m >>= f = PipeM (liftM f m) #-}
588{-# RULES "conduit: Pipe: lift x >> f" forall m f. lift m >> f = PipeM (liftM (\_ -> f) m) #-}
589-}
590