1{-# OPTIONS_HADDOCK not-home #-} 2{-# LANGUAGE DeriveFunctor #-} 3{-# LANGUAGE FlexibleInstances #-} 4{-# LANGUAGE FlexibleContexts #-} 5{-# LANGUAGE CPP #-} 6{-# LANGUAGE MultiParamTypeClasses #-} 7{-# LANGUAGE UndecidableInstances #-} 8{-# LANGUAGE RankNTypes #-} 9{-# LANGUAGE TupleSections #-} 10{-# LANGUAGE Trustworthy #-} 11{-# LANGUAGE TypeFamilies #-} 12module Data.Conduit.Internal.Conduit 13 ( -- ** Types 14 ConduitT (..) 15 , ConduitM 16 , Source 17 , Producer 18 , Sink 19 , Consumer 20 , Conduit 21 , Flush (..) 22 -- *** Newtype wrappers 23 , ZipSource (..) 24 , ZipSink (..) 25 , ZipConduit (..) 26 -- ** Sealed 27 , SealedConduitT (..) 28 , sealConduitT 29 , unsealConduitT 30 -- ** Primitives 31 , await 32 , awaitForever 33 , yield 34 , yieldM 35 , leftover 36 , runConduit 37 , runConduitPure 38 , runConduitRes 39 , fuse 40 , connect 41 -- ** Composition 42 , connectResume 43 , connectResumeConduit 44 , fuseLeftovers 45 , fuseReturnLeftovers 46 , ($$+) 47 , ($$++) 48 , ($$+-) 49 , ($=+) 50 , (=$$+) 51 , (=$$++) 52 , (=$$+-) 53 , ($$) 54 , ($=) 55 , (=$) 56 , (=$=) 57 , (.|) 58 -- ** Generalizing 59 , sourceToPipe 60 , sinkToPipe 61 , conduitToPipe 62 , toProducer 63 , toConsumer 64 -- ** Cleanup 65 , bracketP 66 -- ** Exceptions 67 , catchC 68 , handleC 69 , tryC 70 -- ** Utilities 71 , Data.Conduit.Internal.Conduit.transPipe 72 , Data.Conduit.Internal.Conduit.mapOutput 73 , Data.Conduit.Internal.Conduit.mapOutputMaybe 74 , Data.Conduit.Internal.Conduit.mapInput 75 , Data.Conduit.Internal.Conduit.mapInputM 76 , zipSinks 77 , zipSources 78 , zipSourcesApp 79 , zipConduitApp 80 , mergeSource 81 , passthroughSink 82 , sourceToList 83 , fuseBoth 84 , fuseBothMaybe 85 , fuseUpstream 86 , sequenceSources 87 , sequenceSinks 88 , sequenceConduits 89 ) where 90 91import Control.Applicative (Applicative (..)) 92import Control.Exception (Exception) 93import qualified Control.Exception as E (catch) 94import Control.Monad (liftM, liftM2, ap) 95import Control.Monad.Fail(MonadFail(..)) 96import Control.Monad.Error.Class(MonadError(..)) 97import Control.Monad.Reader.Class(MonadReader(..)) 98import Control.Monad.RWS.Class(MonadRWS()) 99import Control.Monad.Writer.Class(MonadWriter(..), censor) 100import Control.Monad.State.Class(MonadState(..)) 101import Control.Monad.Trans.Class (MonadTrans (lift)) 102import Control.Monad.IO.Unlift (MonadIO (liftIO), MonadUnliftIO, withRunInIO) 103import Control.Monad.Primitive (PrimMonad, PrimState, primitive) 104import Data.Functor.Identity (Identity, runIdentity) 105import Data.Void (Void, absurd) 106import Data.Monoid (Monoid (mappend, mempty)) 107import Data.Semigroup (Semigroup ((<>))) 108import Control.Monad.Trans.Resource 109import Data.Conduit.Internal.Pipe hiding (yield, mapOutput, leftover, yieldM, await, awaitForever, bracketP) 110import qualified Data.Conduit.Internal.Pipe as CI 111import Control.Monad (forever) 112import Data.Traversable (Traversable (..)) 113 114-- | Core datatype of the conduit package. This type represents a general 115-- component which can consume a stream of input values @i@, produce a stream 116-- of output values @o@, perform actions in the @m@ monad, and produce a final 117-- result @r@. The type synonyms provided here are simply wrappers around this 118-- type. 119-- 120-- Since 1.3.0 121newtype ConduitT i o m r = ConduitT 122 { unConduitT :: forall b. 123 (r -> Pipe i i o () m b) -> Pipe i i o () m b 124 } 125 126-- | In order to provide for efficient monadic composition, the 127-- @ConduitT@ type is implemented internally using a technique known 128-- as the codensity transform. This allows for cheap appending, but 129-- makes one case much more expensive: partially running a @ConduitT@ 130-- and that capturing the new state. 131-- 132-- This data type is the same as @ConduitT@, but does not use the 133-- codensity transform technique. 134-- 135-- @since 1.3.0 136newtype SealedConduitT i o m r = SealedConduitT (Pipe i i o () m r) 137 138-- | Same as 'ConduitT', for backwards compat 139type ConduitM = ConduitT 140 141instance Functor (ConduitT i o m) where 142 fmap f (ConduitT c) = ConduitT $ \rest -> c (rest . f) 143 144instance Applicative (ConduitT i o m) where 145 pure x = ConduitT ($ x) 146 {-# INLINE pure #-} 147 (<*>) = ap 148 {-# INLINE (<*>) #-} 149 150instance Monad (ConduitT i o m) where 151 return = pure 152 ConduitT f >>= g = ConduitT $ \h -> f $ \a -> unConduitT (g a) h 153 154-- | @since 1.3.1 155instance MonadFail m => MonadFail (ConduitT i o m) where 156 fail = lift . Control.Monad.Fail.fail 157 158instance MonadThrow m => MonadThrow (ConduitT i o m) where 159 throwM = lift . throwM 160 161instance MonadIO m => MonadIO (ConduitT i o m) where 162 liftIO = lift . liftIO 163 {-# INLINE liftIO #-} 164 165instance MonadReader r m => MonadReader r (ConduitT i o m) where 166 ask = lift ask 167 {-# INLINE ask #-} 168 169 local f (ConduitT c0) = ConduitT $ \rest -> 170 let go (HaveOutput p o) = HaveOutput (go p) o 171 go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u)) 172 go (Done x) = rest x 173 go (PipeM mp) = PipeM (liftM go $ local f mp) 174 go (Leftover p i) = Leftover (go p) i 175 in go (c0 Done) 176 177#ifndef MIN_VERSION_mtl 178#define MIN_VERSION_mtl(x, y, z) 0 179#endif 180 181instance MonadWriter w m => MonadWriter w (ConduitT i o m) where 182#if MIN_VERSION_mtl(2, 1, 0) 183 writer = lift . writer 184#endif 185 tell = lift . tell 186 187 listen (ConduitT c0) = ConduitT $ \rest -> 188 let go front (HaveOutput p o) = HaveOutput (go front p) o 189 go front (NeedInput p c) = NeedInput (\i -> go front (p i)) (\u -> go front (c u)) 190 go front (Done x) = rest (x, front) 191 go front (PipeM mp) = PipeM $ do 192 (p,w) <- listen mp 193 return $ go (front `mappend` w) p 194 go front (Leftover p i) = Leftover (go front p) i 195 in go mempty (c0 Done) 196 197 pass (ConduitT c0) = ConduitT $ \rest -> 198 let go front (HaveOutput p o) = HaveOutput (go front p) o 199 go front (NeedInput p c) = NeedInput (\i -> go front (p i)) (\u -> go front (c u)) 200 go front (PipeM mp) = PipeM $ do 201 (p,w) <- censor (const mempty) (listen mp) 202 return $ go (front `mappend` w) p 203 go front (Done (x,f)) = PipeM $ do 204 tell (f front) 205 return $ rest x 206 go front (Leftover p i) = Leftover (go front p) i 207 in go mempty (c0 Done) 208 209instance MonadState s m => MonadState s (ConduitT i o m) where 210 get = lift get 211 put = lift . put 212#if MIN_VERSION_mtl(2, 1, 0) 213 state = lift . state 214#endif 215 216instance MonadRWS r w s m => MonadRWS r w s (ConduitT i o m) 217 218instance MonadError e m => MonadError e (ConduitT i o m) where 219 throwError = lift . throwError 220 catchError (ConduitT c0) f = ConduitT $ \rest -> 221 let go (HaveOutput p o) = HaveOutput (go p) o 222 go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u)) 223 go (Done x) = rest x 224 go (PipeM mp) = 225 PipeM $ catchError (liftM go mp) $ \e -> do 226 return $ unConduitT (f e) rest 227 go (Leftover p i) = Leftover (go p) i 228 in go (c0 Done) 229 230instance MonadTrans (ConduitT i o) where 231 lift mr = ConduitT $ \rest -> PipeM (liftM rest mr) 232 {-# INLINE [1] lift #-} 233 234instance MonadResource m => MonadResource (ConduitT i o m) where 235 liftResourceT = lift . liftResourceT 236 {-# INLINE liftResourceT #-} 237 238instance Monad m => Semigroup (ConduitT i o m ()) where 239 (<>) = (>>) 240 {-# INLINE (<>) #-} 241 242instance Monad m => Monoid (ConduitT i o m ()) where 243 mempty = return () 244 {-# INLINE mempty #-} 245#if !(MIN_VERSION_base(4,11,0)) 246 mappend = (<>) 247 {-# INLINE mappend #-} 248#endif 249 250instance PrimMonad m => PrimMonad (ConduitT i o m) where 251 type PrimState (ConduitT i o m) = PrimState m 252 primitive = lift . primitive 253 254-- | Provides a stream of output values, without consuming any input or 255-- producing a final result. 256-- 257-- Since 0.5.0 258type Source m o = ConduitT () o m () 259{-# DEPRECATED Source "Use ConduitT directly" #-} 260 261-- | A component which produces a stream of output values, regardless of the 262-- input stream. A @Producer@ is a generalization of a @Source@, and can be 263-- used as either a @Source@ or a @Conduit@. 264-- 265-- Since 1.0.0 266type Producer m o = forall i. ConduitT i o m () 267{-# DEPRECATED Producer "Use ConduitT directly" #-} 268 269-- | Consumes a stream of input values and produces a final result, without 270-- producing any output. 271-- 272-- > type Sink i m r = ConduitT i Void m r 273-- 274-- Since 0.5.0 275type Sink i = ConduitT i Void 276{-# DEPRECATED Sink "Use ConduitT directly" #-} 277 278-- | A component which consumes a stream of input values and produces a final 279-- result, regardless of the output stream. A @Consumer@ is a generalization of 280-- a @Sink@, and can be used as either a @Sink@ or a @Conduit@. 281-- 282-- Since 1.0.0 283type Consumer i m r = forall o. ConduitT i o m r 284{-# DEPRECATED Consumer "Use ConduitT directly" #-} 285 286-- | Consumes a stream of input values and produces a stream of output values, 287-- without producing a final result. 288-- 289-- Since 0.5.0 290type Conduit i m o = ConduitT i o m () 291{-# DEPRECATED Conduit "Use ConduitT directly" #-} 292 293sealConduitT :: ConduitT i o m r -> SealedConduitT i o m r 294sealConduitT (ConduitT f) = SealedConduitT (f Done) 295 296unsealConduitT :: Monad m => SealedConduitT i o m r -> ConduitT i o m r 297unsealConduitT (SealedConduitT f) = ConduitT (f >>=) 298 299-- | Connect a @Source@ to a @Sink@ until the latter closes. Returns both the 300-- most recent state of the @Source@ and the result of the @Sink@. 301-- 302-- Since 0.5.0 303connectResume :: Monad m 304 => SealedConduitT () a m () 305 -> ConduitT a Void m r 306 -> m (SealedConduitT () a m (), r) 307connectResume (SealedConduitT left0) (ConduitT right0) = 308 goRight left0 (right0 Done) 309 where 310 goRight left right = 311 case right of 312 HaveOutput _ o -> absurd o 313 NeedInput rp rc -> goLeft rp rc left 314 Done r2 -> return (SealedConduitT left, r2) 315 PipeM mp -> mp >>= goRight left 316 Leftover p i -> goRight (HaveOutput left i) p 317 318 goLeft rp rc left = 319 case left of 320 HaveOutput left' o -> goRight left' (rp o) 321 NeedInput _ lc -> recurse (lc ()) 322 Done () -> goRight (Done ()) (rc ()) 323 PipeM mp -> mp >>= recurse 324 Leftover p () -> recurse p 325 where 326 recurse = goLeft rp rc 327 328sourceToPipe :: Monad m => Source m o -> Pipe l i o u m () 329sourceToPipe = 330 go . flip unConduitT Done 331 where 332 go (HaveOutput p o) = HaveOutput (go p) o 333 go (NeedInput _ c) = go $ c () 334 go (Done ()) = Done () 335 go (PipeM mp) = PipeM (liftM go mp) 336 go (Leftover p ()) = go p 337 338sinkToPipe :: Monad m => Sink i m r -> Pipe l i o u m r 339sinkToPipe = 340 go . injectLeftovers . flip unConduitT Done 341 where 342 go (HaveOutput _ o) = absurd o 343 go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ()) 344 go (Done r) = Done r 345 go (PipeM mp) = PipeM (liftM go mp) 346 go (Leftover _ l) = absurd l 347 348conduitToPipe :: Monad m => Conduit i m o -> Pipe l i o u m () 349conduitToPipe = 350 go . injectLeftovers . flip unConduitT Done 351 where 352 go (HaveOutput p o) = HaveOutput (go p) o 353 go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ()) 354 go (Done ()) = Done () 355 go (PipeM mp) = PipeM (liftM go mp) 356 go (Leftover _ l) = absurd l 357 358-- | Generalize a 'Source' to a 'Producer'. 359-- 360-- Since 1.0.0 361toProducer :: Monad m => Source m a -> Producer m a 362toProducer (ConduitT c0) = ConduitT $ \rest -> let 363 go (HaveOutput p o) = HaveOutput (go p) o 364 go (NeedInput _ c) = go (c ()) 365 go (Done r) = rest r 366 go (PipeM mp) = PipeM (liftM go mp) 367 go (Leftover p ()) = go p 368 in go (c0 Done) 369 370-- | Generalize a 'Sink' to a 'Consumer'. 371-- 372-- Since 1.0.0 373toConsumer :: Monad m => Sink a m b -> Consumer a m b 374toConsumer (ConduitT c0) = ConduitT $ \rest -> let 375 go (HaveOutput _ o) = absurd o 376 go (NeedInput p c) = NeedInput (go . p) (go . c) 377 go (Done r) = rest r 378 go (PipeM mp) = PipeM (liftM go mp) 379 go (Leftover p l) = Leftover (go p) l 380 in go (c0 Done) 381 382-- | Catch all exceptions thrown by the current component of the pipeline. 383-- 384-- Note: this will /not/ catch exceptions thrown by other components! For 385-- example, if an exception is thrown in a @Source@ feeding to a @Sink@, and 386-- the @Sink@ uses @catchC@, the exception will /not/ be caught. 387-- 388-- Due to this behavior (as well as lack of async exception safety), you 389-- should not try to implement combinators such as @onException@ in terms of this 390-- primitive function. 391-- 392-- Note also that the exception handling will /not/ be applied to any 393-- finalizers generated by this conduit. 394-- 395-- Since 1.0.11 396catchC :: (MonadUnliftIO m, Exception e) 397 => ConduitT i o m r 398 -> (e -> ConduitT i o m r) 399 -> ConduitT i o m r 400catchC (ConduitT p0) onErr = ConduitT $ \rest -> let 401 go (Done r) = rest r 402 go (PipeM mp) = PipeM $ withRunInIO $ \run -> E.catch (run (liftM go mp)) 403 (return . flip unConduitT rest . onErr) 404 go (Leftover p i) = Leftover (go p) i 405 go (NeedInput x y) = NeedInput (go . x) (go . y) 406 go (HaveOutput p o) = HaveOutput (go p) o 407 in go (p0 Done) 408{-# INLINE catchC #-} 409 410-- | The same as @flip catchC@. 411-- 412-- Since 1.0.11 413handleC :: (MonadUnliftIO m, Exception e) 414 => (e -> ConduitT i o m r) 415 -> ConduitT i o m r 416 -> ConduitT i o m r 417handleC = flip catchC 418{-# INLINE handleC #-} 419 420-- | A version of @try@ for use within a pipeline. See the comments in @catchC@ 421-- for more details. 422-- 423-- Since 1.0.11 424tryC :: (MonadUnliftIO m, Exception e) 425 => ConduitT i o m r 426 -> ConduitT i o m (Either e r) 427tryC c = fmap Right c `catchC` (return . Left) 428{-# INLINE tryC #-} 429 430-- | Combines two sinks. The new sink will complete when both input sinks have 431-- completed. 432-- 433-- Any leftovers are discarded. 434-- 435-- Since 0.4.1 436zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r') 437zipSinks (ConduitT x0) (ConduitT y0) = ConduitT $ \rest -> let 438 Leftover _ i >< _ = absurd i 439 _ >< Leftover _ i = absurd i 440 HaveOutput _ o >< _ = absurd o 441 _ >< HaveOutput _ o = absurd o 442 443 PipeM mx >< y = PipeM (liftM (>< y) mx) 444 x >< PipeM my = PipeM (liftM (x ><) my) 445 Done x >< Done y = rest (x, y) 446 NeedInput px cx >< NeedInput py cy = NeedInput (\i -> px i >< py i) (\() -> cx () >< cy ()) 447 NeedInput px cx >< y@Done{} = NeedInput (\i -> px i >< y) (\u -> cx u >< y) 448 x@Done{} >< NeedInput py cy = NeedInput (\i -> x >< py i) (\u -> x >< cy u) 449 in injectLeftovers (x0 Done) >< injectLeftovers (y0 Done) 450 451-- | Combines two sources. The new source will stop producing once either 452-- source has been exhausted. 453-- 454-- Since 1.0.13 455zipSources :: Monad m => Source m a -> Source m b -> Source m (a, b) 456zipSources (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let 457 go (Leftover left ()) right = go left right 458 go left (Leftover right ()) = go left right 459 go (Done ()) (Done ()) = rest () 460 go (Done ()) (HaveOutput _ _) = rest () 461 go (HaveOutput _ _) (Done ()) = rest () 462 go (Done ()) (PipeM _) = rest () 463 go (PipeM _) (Done ()) = rest () 464 go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my) 465 go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx) 466 go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my) 467 go (HaveOutput srcx x) (HaveOutput srcy y) = HaveOutput (go srcx srcy) (x, y) 468 go (NeedInput _ c) right = go (c ()) right 469 go left (NeedInput _ c) = go left (c ()) 470 in go (left0 Done) (right0 Done) 471 472-- | Combines two sources. The new source will stop producing once either 473-- source has been exhausted. 474-- 475-- Since 1.0.13 476zipSourcesApp :: Monad m => Source m (a -> b) -> Source m a -> Source m b 477zipSourcesApp (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let 478 go (Leftover left ()) right = go left right 479 go left (Leftover right ()) = go left right 480 go (Done ()) (Done ()) = rest () 481 go (Done ()) (HaveOutput _ _) = rest () 482 go (HaveOutput _ _) (Done ()) = rest () 483 go (Done ()) (PipeM _) = rest () 484 go (PipeM _) (Done ()) = rest () 485 go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my) 486 go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx) 487 go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my) 488 go (HaveOutput srcx x) (HaveOutput srcy y) = HaveOutput (go srcx srcy) (x y) 489 go (NeedInput _ c) right = go (c ()) right 490 go left (NeedInput _ c) = go left (c ()) 491 in go (left0 Done) (right0 Done) 492 493-- | 494-- 495-- Since 1.0.17 496zipConduitApp 497 :: Monad m 498 => ConduitT i o m (x -> y) 499 -> ConduitT i o m x 500 -> ConduitT i o m y 501zipConduitApp (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let 502 go (Done f) (Done x) = rest (f x) 503 go (PipeM mx) y = PipeM (flip go y `liftM` mx) 504 go x (PipeM my) = PipeM (go x `liftM` my) 505 go (HaveOutput x o) y = HaveOutput (go x y) o 506 go x (HaveOutput y o) = HaveOutput (go x y) o 507 go (Leftover _ i) _ = absurd i 508 go _ (Leftover _ i) = absurd i 509 go (NeedInput px cx) (NeedInput py cy) = NeedInput 510 (\i -> go (px i) (py i)) 511 (\u -> go (cx u) (cy u)) 512 go (NeedInput px cx) (Done y) = NeedInput 513 (\i -> go (px i) (Done y)) 514 (\u -> go (cx u) (Done y)) 515 go (Done x) (NeedInput py cy) = NeedInput 516 (\i -> go (Done x) (py i)) 517 (\u -> go (Done x) (cy u)) 518 in go (injectLeftovers $ left0 Done) (injectLeftovers $ right0 Done) 519 520-- | Same as normal fusion (e.g. @=$=@), except instead of discarding leftovers 521-- from the downstream component, return them. 522-- 523-- Since 1.0.17 524fuseReturnLeftovers :: Monad m 525 => ConduitT a b m () 526 -> ConduitT b c m r 527 -> ConduitT a c m (r, [b]) 528fuseReturnLeftovers (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let 529 goRight bs left right = 530 case right of 531 HaveOutput p o -> HaveOutput (recurse p) o 532 NeedInput rp rc -> 533 case bs of 534 [] -> goLeft rp rc left 535 b:bs' -> goRight bs' left (rp b) 536 Done r2 -> rest (r2, bs) 537 PipeM mp -> PipeM (liftM recurse mp) 538 Leftover p b -> goRight (b:bs) left p 539 where 540 recurse = goRight bs left 541 542 goLeft rp rc left = 543 case left of 544 HaveOutput left' o -> goRight [] left' (rp o) 545 NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc) 546 Done r1 -> goRight [] (Done r1) (rc r1) 547 PipeM mp -> PipeM (liftM recurse mp) 548 Leftover left' i -> Leftover (recurse left') i 549 where 550 recurse = goLeft rp rc 551 in goRight [] (left0 Done) (right0 Done) 552 553-- | Similar to @fuseReturnLeftovers@, but use the provided function to convert 554-- downstream leftovers to upstream leftovers. 555-- 556-- Since 1.0.17 557fuseLeftovers 558 :: Monad m 559 => ([b] -> [a]) 560 -> ConduitT a b m () 561 -> ConduitT b c m r 562 -> ConduitT a c m r 563fuseLeftovers f left right = do 564 (r, bs) <- fuseReturnLeftovers left right 565 mapM_ leftover $ reverse $ f bs 566 return r 567 568-- | Connect a 'Conduit' to a sink and return the output of the sink 569-- together with a new 'Conduit'. 570-- 571-- Since 1.0.17 572connectResumeConduit 573 :: Monad m 574 => SealedConduitT i o m () 575 -> ConduitT o Void m r 576 -> ConduitT i Void m (SealedConduitT i o m (), r) 577connectResumeConduit (SealedConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let 578 goRight left right = 579 case right of 580 HaveOutput _ o -> absurd o 581 NeedInput rp rc -> goLeft rp rc left 582 Done r2 -> rest (SealedConduitT left, r2) 583 PipeM mp -> PipeM (liftM (goRight left) mp) 584 Leftover p i -> goRight (HaveOutput left i) p 585 586 goLeft rp rc left = 587 case left of 588 HaveOutput left' o -> goRight left' (rp o) 589 NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc) 590 Done () -> goRight (Done ()) (rc ()) 591 PipeM mp -> PipeM (liftM recurse mp) 592 Leftover left' i -> Leftover (recurse left') i -- recurse p 593 where 594 recurse = goLeft rp rc 595 in goRight left0 (right0 Done) 596 597-- | Merge a @Source@ into a @Conduit@. 598-- The new conduit will stop processing once either source or upstream have been exhausted. 599mergeSource 600 :: Monad m 601 => Source m i 602 -> Conduit a m (i, a) 603mergeSource = loop . sealConduitT 604 where 605 loop :: Monad m => SealedConduitT () i m () -> Conduit a m (i, a) 606 loop src0 = await >>= maybe (return ()) go 607 where 608 go a = do 609 (src1, mi) <- lift $ src0 $$++ await 610 case mi of 611 Nothing -> return () 612 Just i -> yield (i, a) >> loop src1 613 614 615-- | Turn a @Sink@ into a @Conduit@ in the following way: 616-- 617-- * All input passed to the @Sink@ is yielded downstream. 618-- 619-- * When the @Sink@ finishes processing, the result is passed to the provided to the finalizer function. 620-- 621-- Note that the @Sink@ will stop receiving input as soon as the downstream it 622-- is connected to shuts down. 623-- 624-- An example usage would be to write the result of a @Sink@ to some mutable 625-- variable while allowing other processing to continue. 626-- 627-- Since 1.1.0 628passthroughSink :: Monad m 629 => Sink i m r 630 -> (r -> m ()) -- ^ finalizer 631 -> Conduit i m i 632passthroughSink (ConduitT sink0) final = ConduitT $ \rest -> let 633 -- A bit of explanation is in order, this function is 634 -- non-obvious. The purpose of go is to keep track of the sink 635 -- we're passing values to, and then yield values downstream. The 636 -- third argument to go is the current state of that sink. That's 637 -- relatively straightforward. 638 -- 639 -- The second value is the leftover buffer. These are values that 640 -- the sink itself has called leftover on, and must be provided 641 -- back to the sink the next time it awaits. _However_, these 642 -- values should _not_ be reyielded downstream: we have already 643 -- yielded them downstream ourself, and it is the responsibility 644 -- of the functions wrapping around passthroughSink to handle the 645 -- leftovers from downstream. 646 -- 647 -- The trickiest bit is the first argument, which is a solution to 648 -- bug https://github.com/snoyberg/conduit/issues/304. The issue 649 -- is that, once we get a value, we need to provide it to both the 650 -- inner sink _and_ yield it downstream. The obvious thing to do 651 -- is yield first and then recursively call go. Unfortunately, 652 -- this doesn't work in all cases: if the downstream component 653 -- never calls await again, our yield call will never return, and 654 -- our sink will not get the last value. This results is confusing 655 -- behavior where the sink and downstream component receive a 656 -- different number of values. 657 -- 658 -- Solution: keep a buffer of the next value to yield downstream, 659 -- and only yield it downstream in one of two cases: our sink is 660 -- asking for another value, or our sink is done. This way, we 661 -- ensure that, in all cases, we pass exactly the same number of 662 -- values to the inner sink as to downstream. 663 664 go mbuf _ (Done r) = do 665 maybe (return ()) CI.yield mbuf 666 lift $ final r 667 unConduitT (awaitForever yield) rest 668 go mbuf is (Leftover sink i) = go mbuf (i:is) sink 669 go _ _ (HaveOutput _ o) = absurd o 670 go mbuf is (PipeM mx) = do 671 x <- lift mx 672 go mbuf is x 673 go mbuf (i:is) (NeedInput next _) = go mbuf is (next i) 674 go mbuf [] (NeedInput next done) = do 675 maybe (return ()) CI.yield mbuf 676 mx <- CI.await 677 case mx of 678 Nothing -> go Nothing [] (done ()) 679 Just x -> go (Just x) [] (next x) 680 in go Nothing [] (sink0 Done) 681 682-- | Convert a @Source@ into a list. The basic functionality can be explained as: 683-- 684-- > sourceToList src = src $$ Data.Conduit.List.consume 685-- 686-- However, @sourceToList@ is able to produce its results lazily, which cannot 687-- be done when running a conduit pipeline in general. Unlike the 688-- @Data.Conduit.Lazy@ module (in conduit-extra), this function performs no 689-- unsafe I\/O operations, and therefore can only be as lazily as the 690-- underlying monad. 691-- 692-- Since 1.2.6 693sourceToList :: Monad m => Source m a -> m [a] 694sourceToList = 695 go . flip unConduitT Done 696 where 697 go (Done _) = return [] 698 go (HaveOutput src x) = liftM (x:) (go src) 699 go (PipeM msrc) = msrc >>= go 700 go (NeedInput _ c) = go (c ()) 701 go (Leftover p _) = go p 702 703-- Define fixity of all our operators 704infixr 0 $$ 705infixl 1 $= 706infixr 2 =$ 707infixr 2 =$= 708infixr 0 $$+ 709infixr 0 $$++ 710infixr 0 $$+- 711infixl 1 $=+ 712infixr 2 .| 713 714-- | Equivalent to using 'runConduit' and '.|' together. 715-- 716-- Since 1.2.3 717connect :: Monad m 718 => ConduitT () a m () 719 -> ConduitT a Void m r 720 -> m r 721connect = ($$) 722 723-- | Named function synonym for '.|' 724-- 725-- Equivalent to '.|' and '=$='. However, the latter is 726-- deprecated and will be removed in a future version. 727-- 728-- Since 1.2.3 729fuse :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r 730fuse = (=$=) 731 732-- | Combine two @Conduit@s together into a new @Conduit@ (aka 'fuse'). 733-- 734-- Output from the upstream (left) conduit will be fed into the 735-- downstream (right) conduit. Processing will terminate when 736-- downstream (right) returns. 737-- Leftover data returned from the right @Conduit@ will be discarded. 738-- 739-- Equivalent to 'fuse' and '=$=', however the latter is deprecated and will 740-- be removed in a future version. 741-- 742-- Note that, while this operator looks like categorical composition 743-- (from "Control.Category"), there are a few reasons it's different: 744-- 745-- * The position of the type parameters to 'ConduitT' do not 746-- match. We would need to change @ConduitT i o m r@ to @ConduitT r 747-- m i o@, which would preclude a 'Monad' or 'MonadTrans' instance. 748-- 749-- * The result value from upstream and downstream are allowed to 750-- differ between upstream and downstream. In other words, we would 751-- need the type signature here to look like @ConduitT a b m r -> 752-- ConduitT b c m r -> ConduitT a c m r@. 753-- 754-- * Due to leftovers, we do not have a left identity in Conduit. This 755-- can be achieved with the underlying @Pipe@ datatype, but this is 756-- not generally recommended. See <https://stackoverflow.com/a/15263700>. 757-- 758-- @since 1.2.8 759(.|) :: Monad m 760 => ConduitM a b m () -- ^ upstream 761 -> ConduitM b c m r -- ^ downstream 762 -> ConduitM a c m r 763(.|) = fuse 764{-# INLINE (.|) #-} 765 766-- | The connect operator, which pulls data from a source and pushes to a sink. 767-- If you would like to keep the @Source@ open to be used for other 768-- operations, use the connect-and-resume operator '$$+'. 769-- 770-- Since 0.4.0 771($$) :: Monad m => Source m a -> Sink a m b -> m b 772src $$ sink = do 773 (rsrc, res) <- src $$+ sink 774 rsrc $$+- return () 775 return res 776{-# INLINE [1] ($$) #-} 777{-# DEPRECATED ($$) "Use runConduit and .|" #-} 778 779-- | A synonym for '=$=' for backwards compatibility. 780-- 781-- Since 0.4.0 782($=) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r 783($=) = (=$=) 784{-# INLINE [0] ($=) #-} 785{-# RULES "conduit: $= is =$=" ($=) = (=$=) #-} 786{-# DEPRECATED ($=) "Use .|" #-} 787 788-- | A synonym for '=$=' for backwards compatibility. 789-- 790-- Since 0.4.0 791(=$) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r 792(=$) = (=$=) 793{-# INLINE [0] (=$) #-} 794{-# RULES "conduit: =$ is =$=" (=$) = (=$=) #-} 795{-# DEPRECATED (=$) "Use .|" #-} 796 797-- | Deprecated fusion operator. 798-- 799-- Since 0.4.0 800(=$=) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r 801ConduitT left0 =$= ConduitT right0 = ConduitT $ \rest -> 802 let goRight left right = 803 case right of 804 HaveOutput p o -> HaveOutput (recurse p) o 805 NeedInput rp rc -> goLeft rp rc left 806 Done r2 -> rest r2 807 PipeM mp -> PipeM (liftM recurse mp) 808 Leftover right' i -> goRight (HaveOutput left i) right' 809 where 810 recurse = goRight left 811 812 goLeft rp rc left = 813 case left of 814 HaveOutput left' o -> goRight left' (rp o) 815 NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc) 816 Done r1 -> goRight (Done r1) (rc r1) 817 PipeM mp -> PipeM (liftM recurse mp) 818 Leftover left' i -> Leftover (recurse left') i 819 where 820 recurse = goLeft rp rc 821 in goRight (left0 Done) (right0 Done) 822{-# INLINE [1] (=$=) #-} 823{-# DEPRECATED (=$=) "Use .|" #-} 824 825-- | Wait for a single input value from upstream. If no data is available, 826-- returns @Nothing@. Once @await@ returns @Nothing@, subsequent calls will 827-- also return @Nothing@. 828-- 829-- Since 0.5.0 830await :: Monad m => Consumer i m (Maybe i) 831await = ConduitT $ \f -> NeedInput (f . Just) (const $ f Nothing) 832{-# INLINE [0] await #-} 833 834await' :: Monad m 835 => ConduitT i o m r 836 -> (i -> ConduitT i o m r) 837 -> ConduitT i o m r 838await' f g = ConduitT $ \rest -> NeedInput 839 (\i -> unConduitT (g i) rest) 840 (const $ unConduitT f rest) 841{-# INLINE await' #-} 842{-# RULES "conduit: await >>= maybe" forall x y. await >>= maybe x y = await' x y #-} 843 844-- | Send a value downstream to the next component to consume. If the 845-- downstream component terminates, this call will never return control. 846-- 847-- Since 0.5.0 848yield :: Monad m 849 => o -- ^ output value 850 -> ConduitT i o m () 851yield o = ConduitT $ \rest -> HaveOutput (rest ()) o 852{-# INLINE yield #-} 853 854-- | Send a monadic value downstream for the next component to consume. 855-- 856-- @since 1.2.7 857yieldM :: Monad m => m o -> ConduitT i o m () 858yieldM mo = lift mo >>= yield 859{-# INLINE yieldM #-} 860 861 -- FIXME rule won't fire, see FIXME in .Pipe; "mapM_ yield" mapM_ yield = ConduitT . sourceList 862 863-- | Provide a single piece of leftover input to be consumed by the next 864-- component in the current monadic binding. 865-- 866-- /Note/: it is highly encouraged to only return leftover values from input 867-- already consumed from upstream. 868-- 869-- @since 0.5.0 870leftover :: i -> ConduitT i o m () 871leftover i = ConduitT $ \rest -> Leftover (rest ()) i 872{-# INLINE leftover #-} 873 874-- | Run a pipeline until processing completes. 875-- 876-- Since 1.2.1 877runConduit :: Monad m => ConduitT () Void m r -> m r 878runConduit (ConduitT p) = runPipe $ injectLeftovers $ p Done 879{-# INLINE [0] runConduit #-} 880 881-- | Bracket a conduit computation between allocation and release of a 882-- resource. Two guarantees are given about resource finalization: 883-- 884-- 1. It will be /prompt/. The finalization will be run as early as possible. 885-- 886-- 2. It is exception safe. Due to usage of @resourcet@, the finalization will 887-- be run in the event of any exceptions. 888-- 889-- Since 0.5.0 890bracketP :: MonadResource m 891 892 => IO a 893 -- ^ computation to run first (\"acquire resource\") 894 -> (a -> IO ()) 895 -- ^ computation to run last (\"release resource\") 896 -> (a -> ConduitT i o m r) 897 -- ^ computation to run in-between 898 -> ConduitT i o m r 899 -- returns the value from the in-between computation 900bracketP alloc free inside = ConduitT $ \rest -> do 901 (key, seed) <- allocate alloc free 902 unConduitT (inside seed) $ \res -> do 903 release key 904 rest res 905 906-- | Wait for input forever, calling the given inner component for each piece of 907-- new input. 908-- 909-- This function is provided as a convenience for the common pattern of 910-- @await@ing input, checking if it's @Just@ and then looping. 911-- 912-- Since 0.5.0 913awaitForever :: Monad m => (i -> ConduitT i o m r) -> ConduitT i o m () 914awaitForever f = ConduitT $ \rest -> 915 let go = NeedInput (\i -> unConduitT (f i) (const go)) rest 916 in go 917 918-- | Transform the monad that a @ConduitT@ lives in. 919-- 920-- Note that the monad transforming function will be run multiple times, 921-- resulting in unintuitive behavior in some cases. For a fuller treatment, 922-- please see: 923-- 924-- <https://github.com/snoyberg/conduit/wiki/Dealing-with-monad-transformers> 925-- 926-- Since 0.4.0 927transPipe :: Monad m => (forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r 928transPipe f (ConduitT c0) = ConduitT $ \rest -> let 929 go (HaveOutput p o) = HaveOutput (go p) o 930 go (NeedInput p c) = NeedInput (go . p) (go . c) 931 go (Done r) = rest r 932 go (PipeM mp) = 933 PipeM (f $ liftM go $ collapse mp) 934 where 935 -- Combine a series of monadic actions into a single action. Since we 936 -- throw away side effects between different actions, an arbitrary break 937 -- between actions will lead to a violation of the monad transformer laws. 938 -- Example available at: 939 -- 940 -- http://hpaste.org/75520 941 collapse mpipe = do 942 pipe' <- mpipe 943 case pipe' of 944 PipeM mpipe' -> collapse mpipe' 945 _ -> return pipe' 946 go (Leftover p i) = Leftover (go p) i 947 in go (c0 Done) 948 949-- | Apply a function to all the output values of a @ConduitT@. 950-- 951-- This mimics the behavior of `fmap` for a `Source` and `Conduit` in pre-0.4 952-- days. It can also be simulated by fusing with the @map@ conduit from 953-- "Data.Conduit.List". 954-- 955-- Since 0.4.1 956mapOutput :: Monad m => (o1 -> o2) -> ConduitT i o1 m r -> ConduitT i o2 m r 957mapOutput f (ConduitT c0) = ConduitT $ \rest -> let 958 go (HaveOutput p o) = HaveOutput (go p) (f o) 959 go (NeedInput p c) = NeedInput (go . p) (go . c) 960 go (Done r) = rest r 961 go (PipeM mp) = PipeM (liftM (go) mp) 962 go (Leftover p i) = Leftover (go p) i 963 in go (c0 Done) 964 965-- | Same as 'mapOutput', but use a function that returns @Maybe@ values. 966-- 967-- Since 0.5.0 968mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> ConduitT i o1 m r -> ConduitT i o2 m r 969mapOutputMaybe f (ConduitT c0) = ConduitT $ \rest -> let 970 go (HaveOutput p o) = maybe id (\o' p' -> HaveOutput p' o') (f o) (go p) 971 go (NeedInput p c) = NeedInput (go . p) (go . c) 972 go (Done r) = rest r 973 go (PipeM mp) = PipeM (liftM (go) mp) 974 go (Leftover p i) = Leftover (go p) i 975 in go (c0 Done) 976 977-- | Apply a function to all the input values of a @ConduitT@. 978-- 979-- Since 0.5.0 980mapInput :: Monad m 981 => (i1 -> i2) -- ^ map initial input to new input 982 -> (i2 -> Maybe i1) -- ^ map new leftovers to initial leftovers 983 -> ConduitT i2 o m r 984 -> ConduitT i1 o m r 985mapInput f f' (ConduitT c0) = ConduitT $ \rest -> let 986 go (HaveOutput p o) = HaveOutput (go p) o 987 go (NeedInput p c) = NeedInput (go . p . f) (go . c) 988 go (Done r) = rest r 989 go (PipeM mp) = PipeM $ liftM go mp 990 go (Leftover p i) = maybe id (flip Leftover) (f' i) (go p) 991 in go (c0 Done) 992 993-- | Apply a monadic action to all the input values of a @ConduitT@. 994-- 995-- Since 1.3.2 996mapInputM :: Monad m 997 => (i1 -> m i2) -- ^ map initial input to new input 998 -> (i2 -> m (Maybe i1)) -- ^ map new leftovers to initial leftovers 999 -> ConduitT i2 o m r 1000 -> ConduitT i1 o m r 1001mapInputM f f' (ConduitT c0) = ConduitT $ \rest -> let 1002 go (HaveOutput p o) = HaveOutput (go p) o 1003 go (NeedInput p c) = NeedInput (\i -> PipeM $ go . p <$> f i) (go . c) 1004 go (Done r) = rest r 1005 go (PipeM mp) = PipeM $ fmap go mp 1006 go (Leftover p i) = PipeM $ (\x -> maybe id (flip Leftover) x (go p)) <$> f' i 1007 in go (c0 Done) 1008 1009-- | The connect-and-resume operator. This does not close the @Source@, but 1010-- instead returns it to be used again. This allows a @Source@ to be used 1011-- incrementally in a large program, without forcing the entire program to live 1012-- in the @Sink@ monad. 1013-- 1014-- Mnemonic: connect + do more. 1015-- 1016-- Since 0.5.0 1017($$+) :: Monad m => Source m a -> Sink a m b -> m (SealedConduitT () a m (), b) 1018src $$+ sink = connectResume (sealConduitT src) sink 1019{-# INLINE ($$+) #-} 1020 1021-- | Continue processing after usage of @$$+@. 1022-- 1023-- Since 0.5.0 1024($$++) :: Monad m => SealedConduitT () a m () -> Sink a m b -> m (SealedConduitT () a m (), b) 1025($$++) = connectResume 1026{-# INLINE ($$++) #-} 1027 1028-- | Same as @$$++@ and @connectResume@, but doesn't include the 1029-- updated @SealedConduitT@. 1030-- 1031-- /NOTE/ In previous versions, this would cause finalizers to 1032-- run. Since version 1.3.0, there are no finalizers in conduit. 1033-- 1034-- Since 0.5.0 1035($$+-) :: Monad m => SealedConduitT () a m () -> Sink a m b -> m b 1036rsrc $$+- sink = do 1037 (_, res) <- connectResume rsrc sink 1038 return res 1039{-# INLINE ($$+-) #-} 1040 1041-- | Left fusion for a sealed source. 1042-- 1043-- Since 1.0.16 1044($=+) :: Monad m => SealedConduitT () a m () -> Conduit a m b -> SealedConduitT () b m () 1045SealedConduitT src $=+ ConduitT sink = SealedConduitT (src `pipeL` sink Done) 1046 1047-- | Provide for a stream of data that can be flushed. 1048-- 1049-- A number of @Conduit@s (e.g., zlib compression) need the ability to flush 1050-- the stream at some point. This provides a single wrapper datatype to be used 1051-- in all such circumstances. 1052-- 1053-- Since 0.3.0 1054data Flush a = Chunk a | Flush 1055 deriving (Show, Eq, Ord) 1056instance Functor Flush where 1057 fmap _ Flush = Flush 1058 fmap f (Chunk a) = Chunk (f a) 1059 1060-- | A wrapper for defining an 'Applicative' instance for 'Source's which allows 1061-- to combine sources together, generalizing 'zipSources'. A combined source 1062-- will take input yielded from each of its @Source@s until any of them stop 1063-- producing output. 1064-- 1065-- Since 1.0.13 1066newtype ZipSource m o = ZipSource { getZipSource :: Source m o } 1067 1068instance Monad m => Functor (ZipSource m) where 1069 fmap f = ZipSource . mapOutput f . getZipSource 1070instance Monad m => Applicative (ZipSource m) where 1071 pure = ZipSource . forever . yield 1072 (ZipSource f) <*> (ZipSource x) = ZipSource $ zipSourcesApp f x 1073 1074-- | Coalesce all values yielded by all of the @Source@s. 1075-- 1076-- Implemented on top of @ZipSource@ and as such, it exhibits the same 1077-- short-circuiting behavior as @ZipSource@. See that data type for more 1078-- details. If you want to create a source that yields *all* values from 1079-- multiple sources, use `sequence_`. 1080-- 1081-- Since 1.0.13 1082sequenceSources :: (Traversable f, Monad m) => f (Source m o) -> Source m (f o) 1083sequenceSources = getZipSource . sequenceA . fmap ZipSource 1084 1085-- | A wrapper for defining an 'Applicative' instance for 'Sink's which allows 1086-- to combine sinks together, generalizing 'zipSinks'. A combined sink 1087-- distributes the input to all its participants and when all finish, produces 1088-- the result. This allows to define functions like 1089-- 1090-- @ 1091-- sequenceSinks :: (Monad m) 1092-- => [Sink i m r] -> Sink i m [r] 1093-- sequenceSinks = getZipSink . sequenceA . fmap ZipSink 1094-- @ 1095-- 1096-- Note that the standard 'Applicative' instance for conduits works 1097-- differently. It feeds one sink with input until it finishes, then switches 1098-- to another, etc., and at the end combines their results. 1099-- 1100-- This newtype is in fact a type constrained version of 'ZipConduit', and has 1101-- the same behavior. It's presented as a separate type since (1) it 1102-- historically predates @ZipConduit@, and (2) the type constraining can make 1103-- your code clearer (and thereby make your error messages more easily 1104-- understood). 1105-- 1106-- Since 1.0.13 1107newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r } 1108 1109instance Monad m => Functor (ZipSink i m) where 1110 fmap f (ZipSink x) = ZipSink (liftM f x) 1111instance Monad m => Applicative (ZipSink i m) where 1112 pure = ZipSink . return 1113 (ZipSink f) <*> (ZipSink x) = 1114 ZipSink $ liftM (uncurry ($)) $ zipSinks f x 1115 1116-- | Send incoming values to all of the @Sink@ providing, and ultimately 1117-- coalesce together all return values. 1118-- 1119-- Implemented on top of @ZipSink@, see that data type for more details. 1120-- 1121-- Since 1.0.13 1122sequenceSinks :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r) 1123sequenceSinks = getZipSink . sequenceA . fmap ZipSink 1124 1125-- | The connect-and-resume operator. This does not close the @Conduit@, but 1126-- instead returns it to be used again. This allows a @Conduit@ to be used 1127-- incrementally in a large program, without forcing the entire program to live 1128-- in the @Sink@ monad. 1129-- 1130-- Leftover data returned from the @Sink@ will be discarded. 1131-- 1132-- Mnemonic: connect + do more. 1133-- 1134-- Since 1.0.17 1135(=$$+) :: Monad m 1136 => ConduitT a b m () 1137 -> ConduitT b Void m r 1138 -> ConduitT a Void m (SealedConduitT a b m (), r) 1139(=$$+) conduit = connectResumeConduit (sealConduitT conduit) 1140{-# INLINE (=$$+) #-} 1141 1142-- | Continue processing after usage of '=$$+'. Connect a 'SealedConduitT' to 1143-- a sink and return the output of the sink together with a new 1144-- 'SealedConduitT'. 1145-- 1146-- Since 1.0.17 1147(=$$++) :: Monad m => SealedConduitT i o m () -> ConduitT o Void m r -> ConduitT i Void m (SealedConduitT i o m (), r) 1148(=$$++) = connectResumeConduit 1149{-# INLINE (=$$++) #-} 1150 1151-- | Same as @=$$++@, but doesn't include the updated 1152-- @SealedConduitT@. 1153-- 1154-- /NOTE/ In previous versions, this would cause finalizers to 1155-- run. Since version 1.3.0, there are no finalizers in conduit. 1156-- 1157-- Since 1.0.17 1158(=$$+-) :: Monad m => SealedConduitT i o m () -> ConduitT o Void m r -> ConduitT i Void m r 1159rsrc =$$+- sink = do 1160 (_, res) <- connectResumeConduit rsrc sink 1161 return res 1162{-# INLINE (=$$+-) #-} 1163 1164 1165infixr 0 =$$+ 1166infixr 0 =$$++ 1167infixr 0 =$$+- 1168 1169-- | Provides an alternative @Applicative@ instance for @ConduitT@. In this instance, 1170-- every incoming value is provided to all @ConduitT@s, and output is coalesced together. 1171-- Leftovers from individual @ConduitT@s will be used within that component, and then discarded 1172-- at the end of their computation. Output and finalizers will both be handled in a left-biased manner. 1173-- 1174-- As an example, take the following program: 1175-- 1176-- @ 1177-- main :: IO () 1178-- main = do 1179-- let src = mapM_ yield [1..3 :: Int] 1180-- conduit1 = CL.map (+1) 1181-- conduit2 = CL.concatMap (replicate 2) 1182-- conduit = getZipConduit $ ZipConduit conduit1 <* ZipConduit conduit2 1183-- sink = CL.mapM_ print 1184-- src $$ conduit =$ sink 1185-- @ 1186-- 1187-- It will produce the output: 2, 1, 1, 3, 2, 2, 4, 3, 3 1188-- 1189-- Since 1.0.17 1190newtype ZipConduit i o m r = ZipConduit { getZipConduit :: ConduitT i o m r } 1191 deriving Functor 1192instance Monad m => Applicative (ZipConduit i o m) where 1193 pure = ZipConduit . pure 1194 ZipConduit left <*> ZipConduit right = ZipConduit (zipConduitApp left right) 1195 1196-- | Provide identical input to all of the @Conduit@s and combine their outputs 1197-- into a single stream. 1198-- 1199-- Implemented on top of @ZipConduit@, see that data type for more details. 1200-- 1201-- Since 1.0.17 1202sequenceConduits :: (Traversable f, Monad m) => f (ConduitT i o m r) -> ConduitT i o m (f r) 1203sequenceConduits = getZipConduit . sequenceA . fmap ZipConduit 1204 1205-- | Fuse two @ConduitT@s together, and provide the return value of both. Note 1206-- that this will force the entire upstream @ConduitT@ to be run to produce the 1207-- result value, even if the downstream terminates early. 1208-- 1209-- Since 1.1.5 1210fuseBoth :: Monad m => ConduitT a b m r1 -> ConduitT b c m r2 -> ConduitT a c m (r1, r2) 1211fuseBoth (ConduitT up) (ConduitT down) = 1212 ConduitT (pipeL (up Done) (withUpstream $ generalizeUpstream $ down Done) >>=) 1213{-# INLINE fuseBoth #-} 1214 1215-- | Like 'fuseBoth', but does not force consumption of the @Producer@. 1216-- In the case that the @Producer@ terminates, the result value is 1217-- provided as a @Just@ value. If it does not terminate, then a 1218-- @Nothing@ value is returned. 1219-- 1220-- One thing to note here is that "termination" here only occurs if the 1221-- @Producer@ actually yields a @Nothing@ value. For example, with the 1222-- @Producer@ @mapM_ yield [1..5]@, if five values are requested, the 1223-- @Producer@ has not yet terminated. Termination only occurs when the 1224-- sixth value is awaited for and the @Producer@ signals termination. 1225-- 1226-- Since 1.2.4 1227fuseBothMaybe 1228 :: Monad m 1229 => ConduitT a b m r1 1230 -> ConduitT b c m r2 1231 -> ConduitT a c m (Maybe r1, r2) 1232fuseBothMaybe (ConduitT up) (ConduitT down) = 1233 ConduitT (pipeL (up Done) (go Nothing $ down Done) >>=) 1234 where 1235 go mup (Done r) = Done (mup, r) 1236 go mup (PipeM mp) = PipeM $ liftM (go mup) mp 1237 go mup (HaveOutput p o) = HaveOutput (go mup p) o 1238 go _ (NeedInput p c) = NeedInput 1239 (\i -> go Nothing (p i)) 1240 (\u -> go (Just u) (c ())) 1241 go mup (Leftover p i) = Leftover (go mup p) i 1242{-# INLINABLE fuseBothMaybe #-} 1243 1244-- | Same as @fuseBoth@, but ignore the return value from the downstream 1245-- @Conduit@. Same caveats of forced consumption apply. 1246-- 1247-- Since 1.1.5 1248fuseUpstream :: Monad m => ConduitT a b m r -> Conduit b m c -> ConduitT a c m r 1249fuseUpstream up down = fmap fst (fuseBoth up down) 1250{-# INLINE fuseUpstream #-} 1251 1252-- Rewrite rules 1253 1254{- FIXME 1255{-# RULES "conduit: ConduitT: lift x >>= f" forall m f. lift m >>= f = ConduitT (PipeM (liftM (unConduitT . f) m)) #-} 1256{-# RULES "conduit: ConduitT: lift x >> f" forall m f. lift m >> f = ConduitT (PipeM (liftM (\_ -> unConduitT f) m)) #-} 1257 1258{-# RULES "conduit: ConduitT: liftIO x >>= f" forall m (f :: MonadIO m => a -> ConduitT i o m r). liftIO m >>= f = ConduitT (PipeM (liftM (unConduitT . f) (liftIO m))) #-} 1259{-# RULES "conduit: ConduitT: liftIO x >> f" forall m (f :: MonadIO m => ConduitT i o m r). liftIO m >> f = ConduitT (PipeM (liftM (\_ -> unConduitT f) (liftIO m))) #-} 1260 1261{-# RULES "conduit: ConduitT: liftBase x >>= f" forall m (f :: MonadBase b m => a -> ConduitT i o m r). liftBase m >>= f = ConduitT (PipeM (liftM (unConduitT . f) (liftBase m))) #-} 1262{-# RULES "conduit: ConduitT: liftBase x >> f" forall m (f :: MonadBase b m => ConduitT i o m r). liftBase m >> f = ConduitT (PipeM (liftM (\_ -> unConduitT f) (liftBase m))) #-} 1263 1264{-# RULES 1265 "yield o >> p" forall o (p :: ConduitT i o m r). yield o >> p = ConduitT (HaveOutput (unConduitT p) o) 1266 ; "when yield next" forall b o p. when b (yield o) >> p = 1267 if b then ConduitT (HaveOutput (unConduitT p) o) else p 1268 ; "unless yield next" forall b o p. unless b (yield o) >> p = 1269 if b then p else ConduitT (HaveOutput (unConduitT p) o) 1270 ; "lift m >>= yield" forall m. lift m >>= yield = yieldM m 1271 #-} 1272{-# RULES "conduit: leftover l >> p" forall l (p :: ConduitT i o m r). leftover l >> p = 1273 ConduitT (Leftover (unConduitT p) l) #-} 1274 -} 1275 1276-- | Run a pure pipeline until processing completes, i.e. a pipeline 1277-- with @Identity@ as the base monad. This is equivalient to 1278-- @runIdentity . runConduit@. 1279-- 1280-- @since 1.2.8 1281runConduitPure :: ConduitT () Void Identity r -> r 1282runConduitPure = runIdentity . runConduit 1283{-# INLINE runConduitPure #-} 1284 1285-- | Run a pipeline which acquires resources with @ResourceT@, and 1286-- then run the @ResourceT@ transformer. This is equivalent to 1287-- @runResourceT . runConduit@. 1288-- 1289-- @since 1.2.8 1290runConduitRes :: MonadUnliftIO m 1291 => ConduitT () Void (ResourceT m) r 1292 -> m r 1293runConduitRes = runResourceT . runConduit 1294{-# INLINE runConduitRes #-} 1295