{-# OPTIONS_HADDOCK not-home #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE Trustworthy #-}
{-# LANGUAGE TypeFamilies #-}
module Data.Conduit.Internal.Pipe
    ( -- ** Types
      Pipe (..)
      -- ** Primitives
    , await
    , awaitE
    , awaitForever
    , yield
    , yieldM
    , leftover
      -- ** Finalization
    , bracketP
      -- ** Composition
    , idP
    , pipe
    , pipeL
    , runPipe
    , injectLeftovers
    , (>+>)
    , (<+<)
      -- ** Exceptions
    , catchP
    , handleP
    , tryP
      -- ** Utilities
    , transPipe
    , mapOutput
    , mapOutputMaybe
    , mapInput
    , sourceList
    , withUpstream
    , Data.Conduit.Internal.Pipe.enumFromTo
    , generalizeUpstream
    ) where

import Control.Applicative (Applicative (..))
import Control.Monad ((>=>), liftM, ap)
import Control.Monad.Error.Class(MonadError(..))
import Control.Monad.Reader.Class(MonadReader(..))
import Control.Monad.RWS.Class(MonadRWS())
import Control.Monad.Writer.Class(MonadWriter(..))
import Control.Monad.State.Class(MonadState(..))
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.IO.Unlift (MonadIO (liftIO), MonadUnliftIO, withRunInIO)
import Control.Monad.Primitive (PrimMonad, PrimState, primitive)
import Data.Void (Void, absurd)
import Data.Monoid (Monoid (mappend, mempty))
import Data.Semigroup (Semigroup ((<>)))
import Control.Monad.Trans.Resource
import qualified GHC.Exts
import qualified Control.Exception as E

-- | The underlying datatype for all the types in this package. It has six
-- type parameters:
--
-- * /l/ is the type of values that may be left over from this @Pipe@. A @Pipe@
--   with no leftovers would use @Void@ here, and one with leftovers would use
--   the same type as the /i/ parameter. Leftovers are automatically provided to
--   the next @Pipe@ in the monadic chain.
--
-- * /i/ is the type of values for this @Pipe@'s input stream.
--
-- * /o/ is the type of values for this @Pipe@'s output stream.
--
-- * /u/ is the result type from the upstream @Pipe@.
--
-- * /m/ is the underlying monad.
--
-- * /r/ is the result type.
--
-- A basic intuition is that every @Pipe@ produces a stream of output values
-- (/o/), and eventually indicates that this stream is terminated by sending a
-- result (/r/). On the receiving end of a @Pipe@, these become the /i/ and /u/
-- parameters.
--
-- Since 0.5.0
data Pipe l i o u m r =
    -- | Provide new output to be sent downstream. The first field is the
    -- @Pipe@ to continue with, the second is the output value itself.
    HaveOutput (Pipe l i o u m r) o
    -- | Request more input from upstream. The first field is the continuation
    -- used when an input value arrives; the second is the continuation used
    -- when upstream finishes and hands over its result value instead.
  | NeedInput (i -> Pipe l i o u m r) (u -> Pipe l i o u m r)
    -- | Processing with this @Pipe@ is complete, providing the final result.
  | Done r
    -- | Require running of a monadic action to get the next @Pipe@.
  | PipeM (m (Pipe l i o u m r))
    -- | Return leftover input, which should be provided to future operations.
    -- The first field is the @Pipe@ to continue with, the second the leftover.
  | Leftover (Pipe l i o u m r) l

-- Functor and Applicative are both derived from the Monad instance below.
instance Monad m => Functor (Pipe l i o u m) where
    fmap = liftM
    {-# INLINE fmap #-}

instance Monad m => Applicative (Pipe l i o u m) where
    pure = Done
    {-# INLINE pure #-}
    (<*>) = ap
    {-# INLINE (<*>) #-}

instance Monad m => Monad (Pipe l i o u m) where
    return = pure
    {-# INLINE return #-}

    -- Binding walks the constructor spine and grafts the continuation onto
    -- every place where the pipe can finish with 'Done'.
    p0 >>= k =
        case p0 of
            HaveOutput next o   -> HaveOutput (next >>= k) o
            NeedInput onIn onUp -> NeedInput (onIn >=> k) (onUp >=> k)
            Done r              -> k r
            PipeM act           -> PipeM (liftM (>>= k) act)
            Leftover next l     -> Leftover (next >>= k) l

instance MonadTrans (Pipe l i o u) where
    -- Run the base action and finish immediately with its result.
    lift act = PipeM (liftM Done act)
    {-# INLINE [1] lift #-}

instance MonadIO m => MonadIO (Pipe l i o u m) where
    liftIO = lift . liftIO
    {-# INLINE liftIO #-}

instance MonadThrow m => MonadThrow (Pipe l i o u m) where
    throwM = lift . throwM
    {-# INLINE throwM #-}


-- Unit-returning pipes compose by sequencing.
instance Monad m => Semigroup (Pipe l i o u m ()) where
    (<>) = (>>)
    {-# INLINE (<>) #-}

instance Monad m => Monoid (Pipe l i o u m ()) where
    mempty = return ()
    {-# INLINE mempty #-}
#if !(MIN_VERSION_base(4,11,0))
    mappend = (<>)
    {-# INLINE mappend #-}
#endif

instance PrimMonad m => PrimMonad (Pipe l i o u m) where
  type PrimState (Pipe l i o u m) = PrimState m
  primitive = lift . primitive

instance MonadResource m => MonadResource (Pipe l i o u m) where
    liftResourceT = lift . liftResourceT
    {-# INLINE liftResourceT #-}

instance MonadReader r m => MonadReader r (Pipe l i o u m) where
    ask = lift ask
    {-# INLINE ask #-}

    -- The modified environment has to reach every base-monad action, so the
    -- whole structure is traversed. For 'PipeM' the base monad's @local@ wraps
    -- the action itself and this instance's @local@ handles the pipe it yields.
    local f p0 =
        case p0 of
            HaveOutput next o   -> HaveOutput (local f next) o
            NeedInput onIn onUp -> NeedInput (local f . onIn) (local f . onUp)
            Done r              -> Done r
            PipeM act           -> PipeM (liftM (local f) (local f act))
            Leftover next l     -> Leftover (local f next) l

-- Provided for doctest
#ifndef MIN_VERSION_mtl
#define MIN_VERSION_mtl(x, y, z) 0
#endif

instance MonadWriter w m => MonadWriter w (Pipe l i o u m) where
#if MIN_VERSION_mtl(2, 1, 0)
    writer = lift . writer
#endif

    tell = lift . tell

    -- Collect what is written by each base-monad step and combine it, in
    -- order, with what the remainder of the pipe writes.
    listen p0 =
        case p0 of
            HaveOutput next o   -> HaveOutput (listen next) o
            NeedInput onIn onUp -> NeedInput (listen . onIn) (listen . onUp)
            Done r              -> Done (r, mempty)
            PipeM act           -> PipeM $ do
                (next, w) <- listen act
                return $ do
                    (r, w') <- listen next
                    return (r, w `mappend` w')
            Leftover next l     -> Leftover (listen next) l

    -- Note that the function paired with the final result is dropped rather
    -- than applied to the written output.
    pass p0 =
        case p0 of
            HaveOutput next o   -> HaveOutput (pass next) o
            NeedInput onIn onUp -> NeedInput (pass . onIn) (pass . onUp)
            PipeM act           -> PipeM (liftM pass act)
            Done (r, _)         -> Done r
            Leftover next l     -> Leftover (pass next) l

instance MonadState s m => MonadState s (Pipe l i o u m) where
    get = lift get
    put = lift . put
#if MIN_VERSION_mtl(2, 1, 0)
    state = lift . state
#endif

instance MonadRWS r w s m => MonadRWS r w s (Pipe l i o u m)

instance MonadError e m => MonadError e (Pipe l i o u m) where
    throwError = lift . throwError

    -- Install the handler around every base-monad step. When a step fails,
    -- the handler's pipe takes over from that point.
    catchError p0 handler =
        case p0 of
            HaveOutput next o   -> HaveOutput (catchError next handler) o
            NeedInput onIn onUp ->
                NeedInput (\i -> catchError (onIn i) handler)
                          (\u -> catchError (onUp u) handler)
            Done r              -> Done r
            PipeM act           ->
                PipeM (catchError (liftM (`catchError` handler) act) (return . handler))
            Leftover next l     -> Leftover (catchError next handler) l

-- | Wait for a single input value from upstream. The result is @Just@ the
-- value, or @Nothing@ once upstream has finished.
--
-- Since 0.5.0
await :: Pipe l i o u m (Maybe i)
await = NeedInput (Done . Just) (const (Done Nothing))
{-# RULES "conduit: CI.await >>= maybe" forall x y. await >>= maybe x y = NeedInput y (const x) #-}
{-# INLINE [1] await #-}

-- | This is similar to @await@, but will return the upstream result value as
-- @Left@ if available.
--
-- Since 0.5.0
awaitE :: Pipe l i o u m (Either u i)
awaitE = NeedInput (Done . Right) (Done . Left)
{-# RULES "conduit: awaitE >>= either" forall x y. awaitE >>= either x y = NeedInput y x #-}
{-# INLINE [1] awaitE #-}

-- | Wait for input forever, calling the given inner @Pipe@ for each piece of
-- new input. Returns the upstream result type.
--
-- Since 0.5.0
awaitForever :: Monad m => (i -> Pipe l i o r m r') -> Pipe l i o r m r
awaitForever inner =
    loop
  where
    -- Finish with the upstream result, or handle one input and go again.
    loop = awaitE >>= either return step
    step i = inner i >> loop
{-# INLINE [1] awaitForever #-}

-- | Send a single output value downstream. If the downstream @Pipe@
-- terminates, this @Pipe@ will terminate as well.
--
-- Since 0.5.0
yield :: Monad m
      => o -- ^ output value
      -> Pipe l i o u m ()
yield = HaveOutput (Done ())
{-# INLINE [1] yield #-}

-- | Run a monadic action and send its result downstream as a single output
-- value.
yieldM :: Monad m => m o -> Pipe l i o u m ()
yieldM = PipeM . liftM (HaveOutput (Done ()))
{-# INLINE [1] yieldM #-}

{-# RULES
    "CI.yield o >> p" forall o (p :: Pipe l i o u m r). yield o >> p = HaveOutput p o
  #-}

-- Rule does not fire due to inlining of lift
-- ; "lift m >>= CI.yield" forall m. lift m >>= yield = yieldM m

-- FIXME: Too much inlining on mapM_, can't enforce; "mapM_ CI.yield" mapM_ yield = sourceList
-- Maybe we can get a rewrite rule on foldr instead? Need a benchmark to back this up.

-- | Provide a single piece of leftover input to be consumed by the next pipe
-- in the current monadic binding.
--
-- /Note/: it is highly encouraged to only return leftover values from input
-- already consumed from upstream.
--
-- Since 0.5.0
leftover :: l -> Pipe l i o u m ()
leftover = Leftover (Done ())
{-# INLINE [1] leftover #-}
{-# RULES "conduit: leftover l >> p" forall l (p :: Pipe l i o u m r). leftover l >> p = Leftover p l #-}

-- | Bracket a pipe computation between allocation and release of a resource.
-- We guarantee, via the @MonadResource@ context, that the resource
-- finalization is exception safe. However, it will not necessarily be
-- /prompt/, in that running a finalizer may wait until the @ResourceT@ block
-- exits.
--
-- Since 0.5.0
bracketP :: MonadResource m
         => IO a
            -- ^ computation to run first (\"acquire resource\")
         -> (a -> IO ())
            -- ^ computation to run last (\"release resource\")
         -> (a -> Pipe l i o u m r)
            -- ^ computation to run in-between
         -> Pipe l i o u m r
            -- returns the value from the in-between computation
bracketP alloc free inside =
    -- Register the resource, run the inner pipe, then release explicitly once
    -- the inner pipe has produced its result.
    allocate alloc free >>= \(key, seed) ->
        inside seed >>= \res ->
            release key >> return res

-- | The identity @Pipe@: every input value is passed downstream unchanged,
-- and the upstream result becomes this @Pipe@'s result.
--
-- Since 0.5.0
idP :: Monad m => Pipe l a a r m r
idP = NeedInput forward Done
  where
    forward = HaveOutput idP

-- | Compose a left and right pipe together into a complete pipe.
--
-- Since 0.5.0
pipe :: Monad m => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2
pipe =
    stepRight
  where
    -- Drive the right (downstream) pipe until it asks for input.
    stepRight up (HaveOutput next o)   = HaveOutput (stepRight up next) o
    stepRight up (NeedInput onIn onUp) = stepLeft onIn onUp up
    stepRight _  (Done r2)             = Done r2
    stepRight up (PipeM act)           = PipeM (liftM (stepRight up) act)
    -- The right pipe's leftover type is Void, so this case is unreachable.
    stepRight _  (Leftover _ l)        = absurd l

    -- Drive the left (upstream) pipe until it can answer the pending request
    -- of the right pipe, given as its two 'NeedInput' continuations.
    stepLeft onIn _    (HaveOutput up' o)   = stepRight up' (onIn o)
    stepLeft onIn onUp (NeedInput more fin) =
        NeedInput (stepLeft onIn onUp . more) (stepLeft onIn onUp . fin)
    stepLeft _    onUp (Done r1)            = stepRight (Done r1) (onUp r1)
    stepLeft onIn onUp (PipeM act)          = PipeM (liftM (stepLeft onIn onUp) act)
    stepLeft onIn onUp (Leftover up' l)     = Leftover (stepLeft onIn onUp up') l

-- | Same as 'pipe', but automatically applies 'injectLeftovers' to the right @Pipe@.
--
-- Since 0.5.0
pipeL :: Monad m => Pipe l a b r0 m r1 -> Pipe b b c r1 m r2 -> Pipe l a c r0 m r2
-- Note: The following should be equivalent to the simpler:
--
--     pipeL l r = l `pipe` injectLeftovers r
--
-- However, this version tested as being significantly more efficient.
pipeL =
    stepRight
  where
    -- Drive the right (downstream) pipe until it asks for input.
    stepRight up (HaveOutput next o)   = HaveOutput (stepRight up next) o
    stepRight up (NeedInput onIn onUp) = stepLeft onIn onUp up
    stepRight _  (Done r2)             = Done r2
    stepRight up (PipeM act)           = PipeM (liftM (stepRight up) act)
    -- A leftover from the right pipe is pushed back in front of the left pipe
    -- as an output value, so the right pipe's next request receives it.
    stepRight up (Leftover next l)     = stepRight (HaveOutput up l) next

    -- Drive the left (upstream) pipe until it can answer the pending request
    -- of the right pipe, given as its two 'NeedInput' continuations.
    stepLeft onIn _    (HaveOutput up' o)   = stepRight up' (onIn o)
    stepLeft onIn onUp (NeedInput more fin) =
        NeedInput (stepLeft onIn onUp . more) (stepLeft onIn onUp . fin)
    stepLeft _    onUp (Done r1)            = stepRight (Done r1) (onUp r1)
    stepLeft onIn onUp (PipeM act)          = PipeM (liftM (stepLeft onIn onUp) act)
    stepLeft onIn onUp (Leftover up' l)     = Leftover (stepLeft onIn onUp up') l

-- | Run a pipeline until processing completes.
--
-- Since 0.5.0
runPipe :: Monad m => Pipe Void () Void () m r -> m r
runPipe p0 =
    case p0 of
        -- Output and leftover values have type Void, so neither can occur.
        HaveOutput _ o   -> absurd o
        Leftover _ l     -> absurd l
        -- The upstream result type is unit: answer every request with it.
        NeedInput _ onUp -> runPipe (onUp ())
        Done r           -> return r
        PipeM act        -> act >>= runPipe

-- | Transforms a @Pipe@ that provides leftovers to one which does not,
-- allowing it to be composed.
--
-- This function will provide any leftover values within this @Pipe@ to any
-- calls to @await@. If there are more leftover values than are demanded, the
-- remainder are discarded.
--
-- Since 0.5.0
injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
injectLeftovers =
    feed []
  where
    -- The first argument is the stack of pending leftovers, newest on top.
    feed stash p0 =
        case p0 of
            HaveOutput next o   -> HaveOutput (feed stash next) o
            NeedInput onIn onUp ->
                case stash of
                    -- Serve the request from the stash without going upstream.
                    l : rest -> feed rest (onIn l)
                    []       -> NeedInput (feed [] . onIn) (feed [] . onUp)
            Done r              -> Done r
            PipeM act           -> PipeM (liftM (feed stash) act)
            Leftover next l     -> feed (l : stash) next

-- | Transform the monad that a @Pipe@ lives in.
--
-- Note that the monad transforming function will be run multiple times,
-- resulting in unintuitive behavior in some cases. For a fuller treatment,
-- please see:
--
-- <https://github.com/snoyberg/conduit/wiki/Dealing-with-monad-transformers>
--
-- This function is just a synonym for 'hoist'.
--
-- Since 0.4.0
transPipe :: Monad m => (forall a. m a -> n a) -> Pipe l i o u m r -> Pipe l i o u n r
transPipe f p0 =
    case p0 of
        HaveOutput next o   -> HaveOutput (transPipe f next) o
        NeedInput onIn onUp -> NeedInput (transPipe f . onIn) (transPipe f . onUp)
        Done r              -> Done r
        PipeM act           -> PipeM (f (liftM (transPipe f) (collapse act)))
        Leftover next l     -> Leftover (transPipe f next) l
  where
    -- Combine a series of monadic actions into a single action. Since we
    -- throw away side effects between different actions, an arbitrary break
    -- between actions will lead to a violation of the monad transformer laws.
    -- Example available at:
    --
    -- http://hpaste.org/75520
    collapse act = do
        next <- act
        case next of
            PipeM act' -> collapse act'
            _          -> return next

-- | Apply a function to all the output values of a @Pipe@.
--
-- This mimics the behavior of `fmap` for a `Source` and `Conduit` in pre-0.4
-- days.
--
-- Since 0.4.1
mapOutput :: Monad m => (o1 -> o2) -> Pipe l i o1 u m r -> Pipe l i o2 u m r
mapOutput f =
    walk
  where
    walk p0 =
        case p0 of
            HaveOutput next o   -> HaveOutput (walk next) (f o)
            NeedInput onIn onUp -> NeedInput (walk . onIn) (walk . onUp)
            Done r              -> Done r
            PipeM act           -> PipeM (liftM walk act)
            Leftover next l     -> Leftover (walk next) l
{-# INLINE mapOutput #-}

-- | Same as 'mapOutput', but use a function that returns @Maybe@ values.
-- Outputs mapped to @Nothing@ are dropped from the stream.
--
-- Since 0.5.0
mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> Pipe l i o1 u m r -> Pipe l i o2 u m r
mapOutputMaybe f =
    walk
  where
    walk p0 =
        case p0 of
            HaveOutput next o   ->
                case f o of
                    Nothing -> walk next
                    Just o' -> HaveOutput (walk next) o'
            NeedInput onIn onUp -> NeedInput (walk . onIn) (walk . onUp)
            Done r              -> Done r
            PipeM act           -> PipeM (liftM walk act)
            Leftover next l     -> Leftover (walk next) l
{-# INLINE mapOutputMaybe #-}

-- | Apply a function to all the input values of a @Pipe@.
--
-- Since 0.5.0
mapInput :: Monad m
         => (i1 -> i2) -- ^ map initial input to new input
         -> (l2 -> Maybe l1) -- ^ map new leftovers to initial leftovers
         -> Pipe l2 i2 o u m r
         -> Pipe l1 i1 o u m r
mapInput f f' =
    walk
  where
    walk p0 =
        case p0 of
            HaveOutput next o   -> HaveOutput (walk next) o
            NeedInput onIn onUp -> NeedInput (walk . onIn . f) (walk . onUp)
            Done r              -> Done r
            PipeM act           -> PipeM (liftM walk act)
            -- A leftover that cannot be mapped back is dropped.
            Leftover next l     ->
                case f' l of
                    Nothing -> walk next
                    Just l' -> Leftover (walk next) l'

-- | Output @start@, then each successive value obtained with 'succ', ending
-- with the value equal to @stop@. Termination is detected with '==' only, so
-- @stop@ must be reachable from @start@ by repeated 'succ'.
enumFromTo :: (Enum o, Eq o, Monad m)
           => o
           -> o
           -> Pipe l i o u m ()
enumFromTo start stop =
    emit start
  where
    emit cur =
        if cur == stop
            then HaveOutput (Done ()) cur
            else HaveOutput (emit (succ cur)) cur
{-# INLINE enumFromTo #-}

-- | Convert a list into a source.
--
-- Since 0.3.0
sourceList :: Monad m => [a] -> Pipe l i a u m ()
sourceList =
    emit
  where
    emit xs =
        case xs of
            []       -> Done ()
            o : rest -> HaveOutput (emit rest) o
{-# INLINE [1] sourceList #-}

-- | The equivalent of @GHC.Exts.build@ for @Pipe@.
--
-- Since 0.4.2
build :: Monad m => (forall b. (o -> b -> b) -> b -> b) -> Pipe l i o u m ()
build g = g (flip HaveOutput) (Done ())

{-# RULES
    "sourceList/build" forall (f :: (forall b. (a -> b -> b) -> b -> b)). sourceList (GHC.Exts.build f) = build f #-}

-- | Returns a tuple of the upstream and downstream results. Note that this
-- will force consumption of the entire input stream.
--
-- Since 0.5.0
withUpstream :: Monad m
             => Pipe l i o u m r
             -> Pipe l i o u m (u, r)
withUpstream down = do
    r <- down
    -- Discard all remaining input until upstream delivers its result.
    let drain = awaitE >>= either (\u -> return (u, r)) (\_ -> drain)
    drain

infixr 9 <+<
infixl 9 >+>

-- | Fuse together two @Pipe@s, connecting the output from the left to the
-- input of the right.
--
-- Notice that the /leftover/ parameter for the @Pipe@s must be @Void@. This
-- ensures that there is no accidental data loss of leftovers during fusion. If
-- you have a @Pipe@ with leftovers, you must first call 'injectLeftovers'.
--
-- Since 0.5.0
(>+>) :: Monad m => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2
(>+>) = pipe
{-# INLINE (>+>) #-}

-- | Same as '>+>', but reverse the order of the arguments.
--
-- Since 0.5.0
(<+<) :: Monad m => Pipe Void b c r1 m r2 -> Pipe l a b r0 m r1 -> Pipe l a c r0 m r2
(<+<) = flip pipe
{-# INLINE (<+<) #-}

-- | See 'catchC' for more details.
--
-- Since 1.0.11
catchP :: (MonadUnliftIO m, E.Exception e)
       => Pipe l i o u m r
       -> (e -> Pipe l i o u m r)
       -> Pipe l i o u m r
catchP p0 onErr =
    guarded p0
  where
    guarded p =
        case p of
            Done r              -> Done r
            -- Only base-monad steps are wrapped. When one throws, the pipe
            -- built by the handler is used as the continuation; that pipe is
            -- not itself wrapped again.
            PipeM act           ->
                PipeM $ withRunInIO $ \run ->
                    run (liftM guarded act) `E.catch` (return . onErr)
            Leftover next l     -> Leftover (guarded next) l
            NeedInput onIn onUp -> NeedInput (guarded . onIn) (guarded . onUp)
            HaveOutput next o   -> HaveOutput (guarded next) o
{-# INLINABLE catchP #-}

-- | The same as @flip catchP@.
--
-- Since 1.0.11
handleP :: (MonadUnliftIO m, E.Exception e)
        => (e -> Pipe l i o u m r)
        -> Pipe l i o u m r
        -> Pipe l i o u m r
handleP = flip catchP
{-# INLINE handleP #-}

-- | See 'tryC' for more details.
--
-- Since 1.0.11
tryP :: (MonadUnliftIO m, E.Exception e)
     => Pipe l i o u m r
     -> Pipe l i o u m (Either e r)
tryP p = catchP (liftM Right p) (return . Left)
{-# INLINABLE tryP #-}

-- | Generalize the upstream return value for a @Pipe@ from unit to any type.
--
-- Since 1.1.5
generalizeUpstream :: Monad m => Pipe l i o () m r -> Pipe l i o u m r
generalizeUpstream =
    widen
  where
    widen p0 =
        case p0 of
            HaveOutput next o   -> HaveOutput (widen next) o
            -- The actual upstream result is ignored; the original pipe is
            -- always resumed with unit.
            NeedInput onIn onUp -> NeedInput (widen . onIn) (\_ -> widen (onUp ()))
            Done r              -> Done r
            PipeM act           -> PipeM (liftM widen act)
            Leftover next l     -> Leftover (widen next) l
{-# INLINE generalizeUpstream #-}

{- Rules don't fire due to inlining of lift
{-# RULES "conduit: Pipe: lift x >>= f" forall m f. lift m >>= f = PipeM (liftM f m) #-}
{-# RULES "conduit: Pipe: lift x >> f" forall m f. lift m >> f = PipeM (liftM (\_ -> f) m) #-}
-}