1{-# OPTIONS_HADDOCK not-home #-} 2{-# LANGUAGE DeriveFunctor #-} 3{-# LANGUAGE FlexibleInstances #-} 4{-# LANGUAGE FlexibleContexts #-} 5{-# LANGUAGE CPP #-} 6{-# LANGUAGE MultiParamTypeClasses #-} 7{-# LANGUAGE UndecidableInstances #-} 8{-# LANGUAGE RankNTypes #-} 9{-# LANGUAGE TupleSections #-} 10{-# LANGUAGE Trustworthy #-} 11{-# LANGUAGE TypeFamilies #-} 12module Data.Conduit.Internal.Conduit 13 ( -- ** Types 14 ConduitT (..) 15 , ConduitM 16 , Source 17 , Producer 18 , Sink 19 , Consumer 20 , Conduit 21 , Flush (..) 22 -- *** Newtype wrappers 23 , ZipSource (..) 24 , ZipSink (..) 25 , ZipConduit (..) 26 -- ** Sealed 27 , SealedConduitT (..) 28 , sealConduitT 29 , unsealConduitT 30 -- ** Primitives 31 , await 32 , awaitForever 33 , yield 34 , yieldM 35 , leftover 36 , runConduit 37 , runConduitPure 38 , runConduitRes 39 , fuse 40 , connect 41 , unconsM 42 , unconsEitherM 43 -- ** Composition 44 , connectResume 45 , connectResumeConduit 46 , fuseLeftovers 47 , fuseReturnLeftovers 48 , ($$+) 49 , ($$++) 50 , ($$+-) 51 , ($=+) 52 , (=$$+) 53 , (=$$++) 54 , (=$$+-) 55 , ($$) 56 , ($=) 57 , (=$) 58 , (=$=) 59 , (.|) 60 -- ** Generalizing 61 , sourceToPipe 62 , sinkToPipe 63 , conduitToPipe 64 , toProducer 65 , toConsumer 66 -- ** Cleanup 67 , bracketP 68 -- ** Exceptions 69 , catchC 70 , handleC 71 , tryC 72 -- ** Utilities 73 , Data.Conduit.Internal.Conduit.transPipe 74 , Data.Conduit.Internal.Conduit.mapOutput 75 , Data.Conduit.Internal.Conduit.mapOutputMaybe 76 , Data.Conduit.Internal.Conduit.mapInput 77 , Data.Conduit.Internal.Conduit.mapInputM 78 , zipSinks 79 , zipSources 80 , zipSourcesApp 81 , zipConduitApp 82 , mergeSource 83 , passthroughSink 84 , sourceToList 85 , fuseBoth 86 , fuseBothMaybe 87 , fuseUpstream 88 , sequenceSources 89 , sequenceSinks 90 , sequenceConduits 91 ) where 92 93import Control.Applicative (Applicative (..)) 94import Control.Exception (Exception) 95import qualified Control.Exception as E (catch) 96import Control.Monad (liftM, liftM2, ap) 97import Control.Monad.Fail(MonadFail(..)) 98import Control.Monad.Error.Class(MonadError(..)) 99import Control.Monad.Reader.Class(MonadReader(..)) 100import Control.Monad.RWS.Class(MonadRWS()) 101import Control.Monad.Writer.Class(MonadWriter(..), censor) 102import Control.Monad.State.Class(MonadState(..)) 103import Control.Monad.Trans.Class (MonadTrans (lift)) 104import Control.Monad.IO.Unlift (MonadIO (liftIO), MonadUnliftIO, withRunInIO) 105import Control.Monad.Primitive (PrimMonad, PrimState, primitive) 106import Data.Functor.Identity (Identity, runIdentity) 107import Data.Void (Void, absurd) 108import Data.Monoid (Monoid (mappend, mempty)) 109import Data.Semigroup (Semigroup ((<>))) 110import Control.Monad.Trans.Resource 111import Data.Conduit.Internal.Pipe hiding (yield, mapOutput, leftover, yieldM, await, awaitForever, bracketP, unconsM, unconsEitherM) 112import qualified Data.Conduit.Internal.Pipe as CI 113import Control.Monad (forever) 114import Data.Traversable (Traversable (..)) 115 116-- | Core datatype of the conduit package. This type represents a general 117-- component which can consume a stream of input values @i@, produce a stream 118-- of output values @o@, perform actions in the @m@ monad, and produce a final 119-- result @r@. The type synonyms provided here are simply wrappers around this 120-- type. 121-- 122-- Since 1.3.0 123newtype ConduitT i o m r = ConduitT 124 { unConduitT :: forall b. 125 (r -> Pipe i i o () m b) -> Pipe i i o () m b 126 } 127 128-- | In order to provide for efficient monadic composition, the 129-- @ConduitT@ type is implemented internally using a technique known 130-- as the codensity transform. This allows for cheap appending, but 131-- makes one case much more expensive: partially running a @ConduitT@ 132-- and that capturing the new state. 133-- 134-- This data type is the same as @ConduitT@, but does not use the 135-- codensity transform technique. 136-- 137-- @since 1.3.0 138newtype SealedConduitT i o m r = SealedConduitT (Pipe i i o () m r) 139 140-- | Same as 'ConduitT', for backwards compat 141type ConduitM = ConduitT 142 143instance Functor (ConduitT i o m) where 144 fmap f (ConduitT c) = ConduitT $ \rest -> c (rest . f) 145 146instance Applicative (ConduitT i o m) where 147 pure x = ConduitT ($ x) 148 {-# INLINE pure #-} 149 (<*>) = ap 150 {-# INLINE (<*>) #-} 151 152instance Monad (ConduitT i o m) where 153 return = pure 154 ConduitT f >>= g = ConduitT $ \h -> f $ \a -> unConduitT (g a) h 155 156-- | @since 1.3.1 157instance MonadFail m => MonadFail (ConduitT i o m) where 158 fail = lift . Control.Monad.Fail.fail 159 160instance MonadThrow m => MonadThrow (ConduitT i o m) where 161 throwM = lift . throwM 162 163instance MonadIO m => MonadIO (ConduitT i o m) where 164 liftIO = lift . liftIO 165 {-# INLINE liftIO #-} 166 167instance MonadReader r m => MonadReader r (ConduitT i o m) where 168 ask = lift ask 169 {-# INLINE ask #-} 170 171 local f (ConduitT c0) = ConduitT $ \rest -> 172 let go (HaveOutput p o) = HaveOutput (go p) o 173 go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u)) 174 go (Done x) = rest x 175 go (PipeM mp) = PipeM (liftM go $ local f mp) 176 go (Leftover p i) = Leftover (go p) i 177 in go (c0 Done) 178 179#ifndef MIN_VERSION_mtl 180#define MIN_VERSION_mtl(x, y, z) 0 181#endif 182 183instance MonadWriter w m => MonadWriter w (ConduitT i o m) where 184#if MIN_VERSION_mtl(2, 1, 0) 185 writer = lift . writer 186#endif 187 tell = lift . tell 188 189 listen (ConduitT c0) = ConduitT $ \rest -> 190 let go front (HaveOutput p o) = HaveOutput (go front p) o 191 go front (NeedInput p c) = NeedInput (\i -> go front (p i)) (\u -> go front (c u)) 192 go front (Done x) = rest (x, front) 193 go front (PipeM mp) = PipeM $ do 194 (p,w) <- listen mp 195 return $ go (front `mappend` w) p 196 go front (Leftover p i) = Leftover (go front p) i 197 in go mempty (c0 Done) 198 199 pass (ConduitT c0) = ConduitT $ \rest -> 200 let go front (HaveOutput p o) = HaveOutput (go front p) o 201 go front (NeedInput p c) = NeedInput (\i -> go front (p i)) (\u -> go front (c u)) 202 go front (PipeM mp) = PipeM $ do 203 (p,w) <- censor (const mempty) (listen mp) 204 return $ go (front `mappend` w) p 205 go front (Done (x,f)) = PipeM $ do 206 tell (f front) 207 return $ rest x 208 go front (Leftover p i) = Leftover (go front p) i 209 in go mempty (c0 Done) 210 211instance MonadState s m => MonadState s (ConduitT i o m) where 212 get = lift get 213 put = lift . put 214#if MIN_VERSION_mtl(2, 1, 0) 215 state = lift . state 216#endif 217 218instance MonadRWS r w s m => MonadRWS r w s (ConduitT i o m) 219 220instance MonadError e m => MonadError e (ConduitT i o m) where 221 throwError = lift . throwError 222 catchError (ConduitT c0) f = ConduitT $ \rest -> 223 let go (HaveOutput p o) = HaveOutput (go p) o 224 go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u)) 225 go (Done x) = rest x 226 go (PipeM mp) = 227 PipeM $ catchError (liftM go mp) $ \e -> do 228 return $ unConduitT (f e) rest 229 go (Leftover p i) = Leftover (go p) i 230 in go (c0 Done) 231 232instance MonadTrans (ConduitT i o) where 233 lift mr = ConduitT $ \rest -> PipeM (liftM rest mr) 234 {-# INLINE [1] lift #-} 235 236instance MonadResource m => MonadResource (ConduitT i o m) where 237 liftResourceT = lift . liftResourceT 238 {-# INLINE liftResourceT #-} 239 240instance Monad m => Semigroup (ConduitT i o m ()) where 241 (<>) = (>>) 242 {-# INLINE (<>) #-} 243 244instance Monad m => Monoid (ConduitT i o m ()) where 245 mempty = return () 246 {-# INLINE mempty #-} 247#if !(MIN_VERSION_base(4,11,0)) 248 mappend = (<>) 249 {-# INLINE mappend #-} 250#endif 251 252instance PrimMonad m => PrimMonad (ConduitT i o m) where 253 type PrimState (ConduitT i o m) = PrimState m 254 primitive = lift . primitive 255 256-- | Provides a stream of output values, without consuming any input or 257-- producing a final result. 258-- 259-- Since 0.5.0 260type Source m o = ConduitT () o m () 261{-# DEPRECATED Source "Use ConduitT directly" #-} 262 263-- | A component which produces a stream of output values, regardless of the 264-- input stream. A @Producer@ is a generalization of a @Source@, and can be 265-- used as either a @Source@ or a @Conduit@. 266-- 267-- Since 1.0.0 268type Producer m o = forall i. ConduitT i o m () 269{-# DEPRECATED Producer "Use ConduitT directly" #-} 270 271-- | Consumes a stream of input values and produces a final result, without 272-- producing any output. 273-- 274-- > type Sink i m r = ConduitT i Void m r 275-- 276-- Since 0.5.0 277type Sink i = ConduitT i Void 278{-# DEPRECATED Sink "Use ConduitT directly" #-} 279 280-- | A component which consumes a stream of input values and produces a final 281-- result, regardless of the output stream. A @Consumer@ is a generalization of 282-- a @Sink@, and can be used as either a @Sink@ or a @Conduit@. 283-- 284-- Since 1.0.0 285type Consumer i m r = forall o. ConduitT i o m r 286{-# DEPRECATED Consumer "Use ConduitT directly" #-} 287 288-- | Consumes a stream of input values and produces a stream of output values, 289-- without producing a final result. 290-- 291-- Since 0.5.0 292type Conduit i m o = ConduitT i o m () 293{-# DEPRECATED Conduit "Use ConduitT directly" #-} 294 295sealConduitT :: ConduitT i o m r -> SealedConduitT i o m r 296sealConduitT (ConduitT f) = SealedConduitT (f Done) 297 298unsealConduitT :: Monad m => SealedConduitT i o m r -> ConduitT i o m r 299unsealConduitT (SealedConduitT f) = ConduitT (f >>=) 300 301-- | Connect a @Source@ to a @Sink@ until the latter closes. Returns both the 302-- most recent state of the @Source@ and the result of the @Sink@. 303-- 304-- Since 0.5.0 305connectResume :: Monad m 306 => SealedConduitT () a m () 307 -> ConduitT a Void m r 308 -> m (SealedConduitT () a m (), r) 309connectResume (SealedConduitT left0) (ConduitT right0) = 310 goRight left0 (right0 Done) 311 where 312 goRight left right = 313 case right of 314 HaveOutput _ o -> absurd o 315 NeedInput rp rc -> goLeft rp rc left 316 Done r2 -> return (SealedConduitT left, r2) 317 PipeM mp -> mp >>= goRight left 318 Leftover p i -> goRight (HaveOutput left i) p 319 320 goLeft rp rc left = 321 case left of 322 HaveOutput left' o -> goRight left' (rp o) 323 NeedInput _ lc -> recurse (lc ()) 324 Done () -> goRight (Done ()) (rc ()) 325 PipeM mp -> mp >>= recurse 326 Leftover p () -> recurse p 327 where 328 recurse = goLeft rp rc 329 330sourceToPipe :: Monad m => Source m o -> Pipe l i o u m () 331sourceToPipe = 332 go . (`unConduitT` Done) 333 where 334 go (HaveOutput p o) = HaveOutput (go p) o 335 go (NeedInput _ c) = go $ c () 336 go (Done ()) = Done () 337 go (PipeM mp) = PipeM (liftM go mp) 338 go (Leftover p ()) = go p 339 340sinkToPipe :: Monad m => Sink i m r -> Pipe l i o u m r 341sinkToPipe = 342 go . injectLeftovers . (`unConduitT` Done) 343 where 344 go (HaveOutput _ o) = absurd o 345 go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ()) 346 go (Done r) = Done r 347 go (PipeM mp) = PipeM (liftM go mp) 348 go (Leftover _ l) = absurd l 349 350conduitToPipe :: Monad m => Conduit i m o -> Pipe l i o u m () 351conduitToPipe = 352 go . injectLeftovers . (`unConduitT` Done) 353 where 354 go (HaveOutput p o) = HaveOutput (go p) o 355 go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ()) 356 go (Done ()) = Done () 357 go (PipeM mp) = PipeM (liftM go mp) 358 go (Leftover _ l) = absurd l 359 360-- | Generalize a 'Source' to a 'Producer'. 361-- 362-- Since 1.0.0 363toProducer :: Monad m => Source m a -> Producer m a 364toProducer (ConduitT c0) = ConduitT $ \rest -> let 365 go (HaveOutput p o) = HaveOutput (go p) o 366 go (NeedInput _ c) = go (c ()) 367 go (Done r) = rest r 368 go (PipeM mp) = PipeM (liftM go mp) 369 go (Leftover p ()) = go p 370 in go (c0 Done) 371 372-- | Generalize a 'Sink' to a 'Consumer'. 373-- 374-- Since 1.0.0 375toConsumer :: Monad m => Sink a m b -> Consumer a m b 376toConsumer (ConduitT c0) = ConduitT $ \rest -> let 377 go (HaveOutput _ o) = absurd o 378 go (NeedInput p c) = NeedInput (go . p) (go . c) 379 go (Done r) = rest r 380 go (PipeM mp) = PipeM (liftM go mp) 381 go (Leftover p l) = Leftover (go p) l 382 in go (c0 Done) 383 384-- | Catch all exceptions thrown by the current component of the pipeline. 385-- 386-- Note: this will /not/ catch exceptions thrown by other components! For 387-- example, if an exception is thrown in a @Source@ feeding to a @Sink@, and 388-- the @Sink@ uses @catchC@, the exception will /not/ be caught. 389-- 390-- Due to this behavior (as well as lack of async exception safety), you 391-- should not try to implement combinators such as @onException@ in terms of this 392-- primitive function. 393-- 394-- Note also that the exception handling will /not/ be applied to any 395-- finalizers generated by this conduit. 396-- 397-- Since 1.0.11 398catchC :: (MonadUnliftIO m, Exception e) 399 => ConduitT i o m r 400 -> (e -> ConduitT i o m r) 401 -> ConduitT i o m r 402catchC (ConduitT p0) onErr = ConduitT $ \rest -> let 403 go (Done r) = rest r 404 go (PipeM mp) = PipeM $ withRunInIO $ \run -> E.catch (run (liftM go mp)) 405 (return . (`unConduitT`rest) . onErr) 406 go (Leftover p i) = Leftover (go p) i 407 go (NeedInput x y) = NeedInput (go . x) (go . y) 408 go (HaveOutput p o) = HaveOutput (go p) o 409 in go (p0 Done) 410{-# INLINE catchC #-} 411 412-- | The same as @flip catchC@. 413-- 414-- Since 1.0.11 415handleC :: (MonadUnliftIO m, Exception e) 416 => (e -> ConduitT i o m r) 417 -> ConduitT i o m r 418 -> ConduitT i o m r 419handleC = flip catchC 420{-# INLINE handleC #-} 421 422-- | A version of @try@ for use within a pipeline. See the comments in @catchC@ 423-- for more details. 424-- 425-- Since 1.0.11 426tryC :: (MonadUnliftIO m, Exception e) 427 => ConduitT i o m r 428 -> ConduitT i o m (Either e r) 429tryC c = fmap Right c `catchC` (return . Left) 430{-# INLINE tryC #-} 431 432-- | Combines two sinks. The new sink will complete when both input sinks have 433-- completed. 434-- 435-- Any leftovers are discarded. 436-- 437-- Since 0.4.1 438zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r') 439zipSinks (ConduitT x0) (ConduitT y0) = ConduitT $ \rest -> let 440 Leftover _ i >< _ = absurd i 441 _ >< Leftover _ i = absurd i 442 HaveOutput _ o >< _ = absurd o 443 _ >< HaveOutput _ o = absurd o 444 445 PipeM mx >< y = PipeM (liftM (>< y) mx) 446 x >< PipeM my = PipeM (liftM (x ><) my) 447 Done x >< Done y = rest (x, y) 448 NeedInput px cx >< NeedInput py cy = NeedInput (\i -> px i >< py i) (\() -> cx () >< cy ()) 449 NeedInput px cx >< y@Done{} = NeedInput (\i -> px i >< y) (\u -> cx u >< y) 450 x@Done{} >< NeedInput py cy = NeedInput (\i -> x >< py i) (\u -> x >< cy u) 451 in injectLeftovers (x0 Done) >< injectLeftovers (y0 Done) 452 453-- | Combines two sources. The new source will stop producing once either 454-- source has been exhausted. 455-- 456-- Since 1.0.13 457zipSources :: Monad m => Source m a -> Source m b -> Source m (a, b) 458zipSources (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let 459 go (Leftover left ()) right = go left right 460 go left (Leftover right ()) = go left right 461 go (Done ()) (Done ()) = rest () 462 go (Done ()) (HaveOutput _ _) = rest () 463 go (HaveOutput _ _) (Done ()) = rest () 464 go (Done ()) (PipeM _) = rest () 465 go (PipeM _) (Done ()) = rest () 466 go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my) 467 go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx) 468 go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my) 469 go (HaveOutput srcx x) (HaveOutput srcy y) = HaveOutput (go srcx srcy) (x, y) 470 go (NeedInput _ c) right = go (c ()) right 471 go left (NeedInput _ c) = go left (c ()) 472 in go (left0 Done) (right0 Done) 473 474-- | Combines two sources. The new source will stop producing once either 475-- source has been exhausted. 476-- 477-- Since 1.0.13 478zipSourcesApp :: Monad m => Source m (a -> b) -> Source m a -> Source m b 479zipSourcesApp (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let 480 go (Leftover left ()) right = go left right 481 go left (Leftover right ()) = go left right 482 go (Done ()) (Done ()) = rest () 483 go (Done ()) (HaveOutput _ _) = rest () 484 go (HaveOutput _ _) (Done ()) = rest () 485 go (Done ()) (PipeM _) = rest () 486 go (PipeM _) (Done ()) = rest () 487 go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my) 488 go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx) 489 go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my) 490 go (HaveOutput srcx x) (HaveOutput srcy y) = HaveOutput (go srcx srcy) (x y) 491 go (NeedInput _ c) right = go (c ()) right 492 go left (NeedInput _ c) = go left (c ()) 493 in go (left0 Done) (right0 Done) 494 495-- | 496-- 497-- Since 1.0.17 498zipConduitApp 499 :: Monad m 500 => ConduitT i o m (x -> y) 501 -> ConduitT i o m x 502 -> ConduitT i o m y 503zipConduitApp (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let 504 go (Done f) (Done x) = rest (f x) 505 go (PipeM mx) y = PipeM (flip go y `liftM` mx) 506 go x (PipeM my) = PipeM (go x `liftM` my) 507 go (HaveOutput x o) y = HaveOutput (go x y) o 508 go x (HaveOutput y o) = HaveOutput (go x y) o 509 go (Leftover _ i) _ = absurd i 510 go _ (Leftover _ i) = absurd i 511 go (NeedInput px cx) (NeedInput py cy) = NeedInput 512 (\i -> go (px i) (py i)) 513 (\u -> go (cx u) (cy u)) 514 go (NeedInput px cx) (Done y) = NeedInput 515 (\i -> go (px i) (Done y)) 516 (\u -> go (cx u) (Done y)) 517 go (Done x) (NeedInput py cy) = NeedInput 518 (\i -> go (Done x) (py i)) 519 (\u -> go (Done x) (cy u)) 520 in go (injectLeftovers $ left0 Done) (injectLeftovers $ right0 Done) 521 522-- | Same as normal fusion (e.g. @=$=@), except instead of discarding leftovers 523-- from the downstream component, return them. 524-- 525-- Since 1.0.17 526fuseReturnLeftovers :: Monad m 527 => ConduitT a b m () 528 -> ConduitT b c m r 529 -> ConduitT a c m (r, [b]) 530fuseReturnLeftovers (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let 531 goRight bs left right = 532 case right of 533 HaveOutput p o -> HaveOutput (recurse p) o 534 NeedInput rp rc -> 535 case bs of 536 [] -> goLeft rp rc left 537 b:bs' -> goRight bs' left (rp b) 538 Done r2 -> rest (r2, bs) 539 PipeM mp -> PipeM (liftM recurse mp) 540 Leftover p b -> goRight (b:bs) left p 541 where 542 recurse = goRight bs left 543 544 goLeft rp rc left = 545 case left of 546 HaveOutput left' o -> goRight [] left' (rp o) 547 NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc) 548 Done r1 -> goRight [] (Done r1) (rc r1) 549 PipeM mp -> PipeM (liftM recurse mp) 550 Leftover left' i -> Leftover (recurse left') i 551 where 552 recurse = goLeft rp rc 553 in goRight [] (left0 Done) (right0 Done) 554 555-- | Similar to @fuseReturnLeftovers@, but use the provided function to convert 556-- downstream leftovers to upstream leftovers. 557-- 558-- Since 1.0.17 559fuseLeftovers 560 :: Monad m 561 => ([b] -> [a]) 562 -> ConduitT a b m () 563 -> ConduitT b c m r 564 -> ConduitT a c m r 565fuseLeftovers f left right = do 566 (r, bs) <- fuseReturnLeftovers left right 567 mapM_ leftover $ reverse $ f bs 568 return r 569 570-- | Connect a 'Conduit' to a sink and return the output of the sink 571-- together with a new 'Conduit'. 572-- 573-- Since 1.0.17 574connectResumeConduit 575 :: Monad m 576 => SealedConduitT i o m () 577 -> ConduitT o Void m r 578 -> ConduitT i Void m (SealedConduitT i o m (), r) 579connectResumeConduit (SealedConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let 580 goRight left right = 581 case right of 582 HaveOutput _ o -> absurd o 583 NeedInput rp rc -> goLeft rp rc left 584 Done r2 -> rest (SealedConduitT left, r2) 585 PipeM mp -> PipeM (liftM (goRight left) mp) 586 Leftover p i -> goRight (HaveOutput left i) p 587 588 goLeft rp rc left = 589 case left of 590 HaveOutput left' o -> goRight left' (rp o) 591 NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc) 592 Done () -> goRight (Done ()) (rc ()) 593 PipeM mp -> PipeM (liftM recurse mp) 594 Leftover left' i -> Leftover (recurse left') i -- recurse p 595 where 596 recurse = goLeft rp rc 597 in goRight left0 (right0 Done) 598 599-- | Merge a @Source@ into a @Conduit@. 600-- The new conduit will stop processing once either source or upstream have been exhausted. 601mergeSource 602 :: Monad m 603 => Source m i 604 -> Conduit a m (i, a) 605mergeSource = loop . sealConduitT 606 where 607 loop :: Monad m => SealedConduitT () i m () -> Conduit a m (i, a) 608 loop src0 = await >>= maybe (return ()) go 609 where 610 go a = do 611 (src1, mi) <- lift $ src0 $$++ await 612 case mi of 613 Nothing -> return () 614 Just i -> yield (i, a) >> loop src1 615 616 617-- | Turn a @Sink@ into a @Conduit@ in the following way: 618-- 619-- * All input passed to the @Sink@ is yielded downstream. 620-- 621-- * When the @Sink@ finishes processing, the result is passed to the provided to the finalizer function. 622-- 623-- Note that the @Sink@ will stop receiving input as soon as the downstream it 624-- is connected to shuts down. 625-- 626-- An example usage would be to write the result of a @Sink@ to some mutable 627-- variable while allowing other processing to continue. 628-- 629-- Since 1.1.0 630passthroughSink :: Monad m 631 => Sink i m r 632 -> (r -> m ()) -- ^ finalizer 633 -> Conduit i m i 634passthroughSink (ConduitT sink0) final = ConduitT $ \rest -> let 635 -- A bit of explanation is in order, this function is 636 -- non-obvious. The purpose of go is to keep track of the sink 637 -- we're passing values to, and then yield values downstream. The 638 -- third argument to go is the current state of that sink. That's 639 -- relatively straightforward. 640 -- 641 -- The second value is the leftover buffer. These are values that 642 -- the sink itself has called leftover on, and must be provided 643 -- back to the sink the next time it awaits. _However_, these 644 -- values should _not_ be reyielded downstream: we have already 645 -- yielded them downstream ourself, and it is the responsibility 646 -- of the functions wrapping around passthroughSink to handle the 647 -- leftovers from downstream. 648 -- 649 -- The trickiest bit is the first argument, which is a solution to 650 -- bug https://github.com/snoyberg/conduit/issues/304. The issue 651 -- is that, once we get a value, we need to provide it to both the 652 -- inner sink _and_ yield it downstream. The obvious thing to do 653 -- is yield first and then recursively call go. Unfortunately, 654 -- this doesn't work in all cases: if the downstream component 655 -- never calls await again, our yield call will never return, and 656 -- our sink will not get the last value. This results is confusing 657 -- behavior where the sink and downstream component receive a 658 -- different number of values. 659 -- 660 -- Solution: keep a buffer of the next value to yield downstream, 661 -- and only yield it downstream in one of two cases: our sink is 662 -- asking for another value, or our sink is done. This way, we 663 -- ensure that, in all cases, we pass exactly the same number of 664 -- values to the inner sink as to downstream. 665 666 go mbuf _ (Done r) = do 667 maybe (return ()) CI.yield mbuf 668 lift $ final r 669 unConduitT (awaitForever yield) rest 670 go mbuf is (Leftover sink i) = go mbuf (i:is) sink 671 go _ _ (HaveOutput _ o) = absurd o 672 go mbuf is (PipeM mx) = do 673 x <- lift mx 674 go mbuf is x 675 go mbuf (i:is) (NeedInput next _) = go mbuf is (next i) 676 go mbuf [] (NeedInput next done) = do 677 maybe (return ()) CI.yield mbuf 678 mx <- CI.await 679 case mx of 680 Nothing -> go Nothing [] (done ()) 681 Just x -> go (Just x) [] (next x) 682 in go Nothing [] (sink0 Done) 683 684-- | Convert a @Source@ into a list. The basic functionality can be explained as: 685-- 686-- > sourceToList src = src $$ Data.Conduit.List.consume 687-- 688-- However, @sourceToList@ is able to produce its results lazily, which cannot 689-- be done when running a conduit pipeline in general. Unlike the 690-- @Data.Conduit.Lazy@ module (in conduit-extra), this function performs no 691-- unsafe I\/O operations, and therefore can only be as lazily as the 692-- underlying monad. 693-- 694-- Since 1.2.6 695sourceToList :: Monad m => Source m a -> m [a] 696sourceToList = 697 go . (`unConduitT` Done) 698 where 699 go (Done _) = return [] 700 go (HaveOutput src x) = liftM (x:) (go src) 701 go (PipeM msrc) = msrc >>= go 702 go (NeedInput _ c) = go (c ()) 703 go (Leftover p _) = go p 704 705-- Define fixity of all our operators 706infixr 0 $$ 707infixl 1 $= 708infixr 2 =$ 709infixr 2 =$= 710infixr 0 $$+ 711infixr 0 $$++ 712infixr 0 $$+- 713infixl 1 $=+ 714infixr 2 .| 715 716-- | Equivalent to using 'runConduit' and '.|' together. 717-- 718-- Since 1.2.3 719connect :: Monad m 720 => ConduitT () a m () 721 -> ConduitT a Void m r 722 -> m r 723connect = ($$) 724 725-- | Split a conduit into head and tail. 726-- 727-- Note that you have to 'sealConduitT' it first. 728-- 729-- Since 1.3.3 730unconsM :: Monad m 731 => SealedConduitT () o m () 732 -> m (Maybe (o, SealedConduitT () o m ())) 733unconsM (SealedConduitT p) = go p 734 where 735 -- This function is the same as @Pipe.unconsM@ but it ignores leftovers. 736 go (HaveOutput p o) = pure $ Just (o, SealedConduitT p) 737 go (NeedInput _ c) = go $ c () 738 go (Done ()) = pure Nothing 739 go (PipeM mp) = mp >>= go 740 go (Leftover p ()) = go p 741 742-- | Split a conduit into head and tail or return its result if it is done. 743-- 744-- Note that you have to 'sealConduitT' it first. 745-- 746-- Since 1.3.3 747unconsEitherM :: Monad m 748 => SealedConduitT () o m r 749 -> m (Either r (o, SealedConduitT () o m r)) 750unconsEitherM (SealedConduitT p) = go p 751 where 752 -- This function is the same as @Pipe.unconsEitherM@ but it ignores leftovers. 753 go (HaveOutput p o) = pure $ Right (o, SealedConduitT p) 754 go (NeedInput _ c) = go $ c () 755 go (Done r) = pure $ Left r 756 go (PipeM mp) = mp >>= go 757 go (Leftover p ()) = go p 758 759-- | Named function synonym for '.|' 760-- 761-- Equivalent to '.|' and '=$='. However, the latter is 762-- deprecated and will be removed in a future version. 763-- 764-- Since 1.2.3 765fuse :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r 766fuse = (=$=) 767 768-- | Combine two @Conduit@s together into a new @Conduit@ (aka 'fuse'). 769-- 770-- Output from the upstream (left) conduit will be fed into the 771-- downstream (right) conduit. Processing will terminate when 772-- downstream (right) returns. 773-- Leftover data returned from the right @Conduit@ will be discarded. 774-- 775-- Equivalent to 'fuse' and '=$=', however the latter is deprecated and will 776-- be removed in a future version. 777-- 778-- Note that, while this operator looks like categorical composition 779-- (from "Control.Category"), there are a few reasons it's different: 780-- 781-- * The position of the type parameters to 'ConduitT' do not 782-- match. We would need to change @ConduitT i o m r@ to @ConduitT r 783-- m i o@, which would preclude a 'Monad' or 'MonadTrans' instance. 784-- 785-- * The result value from upstream and downstream are allowed to 786-- differ between upstream and downstream. In other words, we would 787-- need the type signature here to look like @ConduitT a b m r -> 788-- ConduitT b c m r -> ConduitT a c m r@. 789-- 790-- * Due to leftovers, we do not have a left identity in Conduit. This 791-- can be achieved with the underlying @Pipe@ datatype, but this is 792-- not generally recommended. See <https://stackoverflow.com/a/15263700>. 793-- 794-- @since 1.2.8 795(.|) :: Monad m 796 => ConduitM a b m () -- ^ upstream 797 -> ConduitM b c m r -- ^ downstream 798 -> ConduitM a c m r 799(.|) = fuse 800{-# INLINE (.|) #-} 801 802-- | The connect operator, which pulls data from a source and pushes to a sink. 803-- If you would like to keep the @Source@ open to be used for other 804-- operations, use the connect-and-resume operator '$$+'. 805-- 806-- Since 0.4.0 807($$) :: Monad m => Source m a -> Sink a m b -> m b 808src $$ sink = do 809 (rsrc, res) <- src $$+ sink 810 rsrc $$+- return () 811 return res 812{-# INLINE [1] ($$) #-} 813{-# DEPRECATED ($$) "Use runConduit and .|" #-} 814 815-- | A synonym for '=$=' for backwards compatibility. 816-- 817-- Since 0.4.0 818($=) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r 819($=) = (=$=) 820{-# INLINE [0] ($=) #-} 821{-# RULES "conduit: $= is =$=" ($=) = (=$=) #-} 822{-# DEPRECATED ($=) "Use .|" #-} 823 824-- | A synonym for '=$=' for backwards compatibility. 825-- 826-- Since 0.4.0 827(=$) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r 828(=$) = (=$=) 829{-# INLINE [0] (=$) #-} 830{-# RULES "conduit: =$ is =$=" (=$) = (=$=) #-} 831{-# DEPRECATED (=$) "Use .|" #-} 832 833-- | Deprecated fusion operator. 834-- 835-- Since 0.4.0 836(=$=) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r 837ConduitT left0 =$= ConduitT right0 = ConduitT $ \rest -> 838 let goRight left right = 839 case right of 840 HaveOutput p o -> HaveOutput (recurse p) o 841 NeedInput rp rc -> goLeft rp rc left 842 Done r2 -> rest r2 843 PipeM mp -> PipeM (liftM recurse mp) 844 Leftover right' i -> goRight (HaveOutput left i) right' 845 where 846 recurse = goRight left 847 848 goLeft rp rc left = 849 case left of 850 HaveOutput left' o -> goRight left' (rp o) 851 NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc) 852 Done r1 -> goRight (Done r1) (rc r1) 853 PipeM mp -> PipeM (liftM recurse mp) 854 Leftover left' i -> Leftover (recurse left') i 855 where 856 recurse = goLeft rp rc 857 in goRight (left0 Done) (right0 Done) 858{-# INLINE [1] (=$=) #-} 859{-# DEPRECATED (=$=) "Use .|" #-} 860 861-- | Wait for a single input value from upstream. If no data is available, 862-- returns @Nothing@. Once @await@ returns @Nothing@, subsequent calls will 863-- also return @Nothing@. 864-- 865-- Since 0.5.0 866await :: Monad m => Consumer i m (Maybe i) 867await = ConduitT $ \f -> NeedInput (f . Just) (const $ f Nothing) 868{-# INLINE [0] await #-} 869 870await' :: Monad m 871 => ConduitT i o m r 872 -> (i -> ConduitT i o m r) 873 -> ConduitT i o m r 874await' f g = ConduitT $ \rest -> NeedInput 875 (\i -> unConduitT (g i) rest) 876 (const $ unConduitT f rest) 877{-# INLINE await' #-} 878{-# RULES "conduit: await >>= maybe" forall x y. await >>= maybe x y = await' x y #-} 879 880-- | Send a value downstream to the next component to consume. If the 881-- downstream component terminates, this call will never return control. 882-- 883-- Since 0.5.0 884yield :: Monad m 885 => o -- ^ output value 886 -> ConduitT i o m () 887yield o = ConduitT $ \rest -> HaveOutput (rest ()) o 888{-# INLINE yield #-} 889 890-- | Send a monadic value downstream for the next component to consume. 891-- 892-- @since 1.2.7 893yieldM :: Monad m => m o -> ConduitT i o m () 894yieldM mo = lift mo >>= yield 895{-# INLINE yieldM #-} 896 897 -- FIXME rule won't fire, see FIXME in .Pipe; "mapM_ yield" mapM_ yield = ConduitT . sourceList 898 899-- | Provide a single piece of leftover input to be consumed by the next 900-- component in the current monadic binding. 901-- 902-- /Note/: it is highly encouraged to only return leftover values from input 903-- already consumed from upstream. 904-- 905-- @since 0.5.0 906leftover :: i -> ConduitT i o m () 907leftover i = ConduitT $ \rest -> Leftover (rest ()) i 908{-# INLINE leftover #-} 909 910-- | Run a pipeline until processing completes. 911-- 912-- Since 1.2.1 913runConduit :: Monad m => ConduitT () Void m r -> m r 914runConduit (ConduitT p) = runPipe $ injectLeftovers $ p Done 915{-# INLINE [0] runConduit #-} 916 917-- | Bracket a conduit computation between allocation and release of a 918-- resource. Two guarantees are given about resource finalization: 919-- 920-- 1. It will be /prompt/. The finalization will be run as early as possible. 921-- 922-- 2. It is exception safe. Due to usage of @resourcet@, the finalization will 923-- be run in the event of any exceptions. 924-- 925-- Since 0.5.0 926bracketP :: MonadResource m 927 928 => IO a 929 -- ^ computation to run first (\"acquire resource\") 930 -> (a -> IO ()) 931 -- ^ computation to run last (\"release resource\") 932 -> (a -> ConduitT i o m r) 933 -- ^ computation to run in-between 934 -> ConduitT i o m r 935 -- returns the value from the in-between computation 936bracketP alloc free inside = ConduitT $ \rest -> do 937 (key, seed) <- allocate alloc free 938 unConduitT (inside seed) $ \res -> do 939 release key 940 rest res 941 942-- | Wait for input forever, calling the given inner component for each piece of 943-- new input. 944-- 945-- This function is provided as a convenience for the common pattern of 946-- @await@ing input, checking if it's @Just@ and then looping. 947-- 948-- Since 0.5.0 949awaitForever :: Monad m => (i -> ConduitT i o m r) -> ConduitT i o m () 950awaitForever f = ConduitT $ \rest -> 951 let go = NeedInput (\i -> unConduitT (f i) (const go)) rest 952 in go 953 954-- | Transform the monad that a @ConduitT@ lives in. 955-- 956-- Note that the monad transforming function will be run multiple times, 957-- resulting in unintuitive behavior in some cases. For a fuller treatment, 958-- please see: 959-- 960-- <https://github.com/snoyberg/conduit/wiki/Dealing-with-monad-transformers> 961-- 962-- Since 0.4.0 963transPipe :: Monad m => (forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r 964transPipe f (ConduitT c0) = ConduitT $ \rest -> let 965 go (HaveOutput p o) = HaveOutput (go p) o 966 go (NeedInput p c) = NeedInput (go . p) (go . c) 967 go (Done r) = rest r 968 go (PipeM mp) = 969 PipeM (f $ liftM go $ collapse mp) 970 where 971 -- Combine a series of monadic actions into a single action. Since we 972 -- throw away side effects between different actions, an arbitrary break 973 -- between actions will lead to a violation of the monad transformer laws. 974 -- Example available at: 975 -- 976 -- http://hpaste.org/75520 977 collapse mpipe = do 978 pipe' <- mpipe 979 case pipe' of 980 PipeM mpipe' -> collapse mpipe' 981 _ -> return pipe' 982 go (Leftover p i) = Leftover (go p) i 983 in go (c0 Done) 984 985-- | Apply a function to all the output values of a @ConduitT@. 986-- 987-- This mimics the behavior of `fmap` for a `Source` and `Conduit` in pre-0.4 988-- days. It can also be simulated by fusing with the @map@ conduit from 989-- "Data.Conduit.List". 990-- 991-- Since 0.4.1 992mapOutput :: Monad m => (o1 -> o2) -> ConduitT i o1 m r -> ConduitT i o2 m r 993mapOutput f (ConduitT c0) = ConduitT $ \rest -> let 994 go (HaveOutput p o) = HaveOutput (go p) (f o) 995 go (NeedInput p c) = NeedInput (go . p) (go . c) 996 go (Done r) = rest r 997 go (PipeM mp) = PipeM (liftM (go) mp) 998 go (Leftover p i) = Leftover (go p) i 999 in go (c0 Done) 1000 1001-- | Same as 'mapOutput', but use a function that returns @Maybe@ values. 1002-- 1003-- Since 0.5.0 1004mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> ConduitT i o1 m r -> ConduitT i o2 m r 1005mapOutputMaybe f (ConduitT c0) = ConduitT $ \rest -> let 1006 go (HaveOutput p o) = maybe id (\o' p' -> HaveOutput p' o') (f o) (go p) 1007 go (NeedInput p c) = NeedInput (go . p) (go . c) 1008 go (Done r) = rest r 1009 go (PipeM mp) = PipeM (liftM (go) mp) 1010 go (Leftover p i) = Leftover (go p) i 1011 in go (c0 Done) 1012 1013-- | Apply a function to all the input values of a @ConduitT@. 1014-- 1015-- Since 0.5.0 1016mapInput :: Monad m 1017 => (i1 -> i2) -- ^ map initial input to new input 1018 -> (i2 -> Maybe i1) -- ^ map new leftovers to initial leftovers 1019 -> ConduitT i2 o m r 1020 -> ConduitT i1 o m r 1021mapInput f f' (ConduitT c0) = ConduitT $ \rest -> let 1022 go (HaveOutput p o) = HaveOutput (go p) o 1023 go (NeedInput p c) = NeedInput (go . p . f) (go . c) 1024 go (Done r) = rest r 1025 go (PipeM mp) = PipeM $ liftM go mp 1026 go (Leftover p i) = maybe id (flip Leftover) (f' i) (go p) 1027 in go (c0 Done) 1028 1029-- | Apply a monadic action to all the input values of a @ConduitT@. 1030-- 1031-- Since 1.3.2 1032mapInputM :: Monad m 1033 => (i1 -> m i2) -- ^ map initial input to new input 1034 -> (i2 -> m (Maybe i1)) -- ^ map new leftovers to initial leftovers 1035 -> ConduitT i2 o m r 1036 -> ConduitT i1 o m r 1037mapInputM f f' (ConduitT c0) = ConduitT $ \rest -> let 1038 go (HaveOutput p o) = HaveOutput (go p) o 1039 go (NeedInput p c) = NeedInput (\i -> PipeM $ go . p <$> f i) (go . c) 1040 go (Done r) = rest r 1041 go (PipeM mp) = PipeM $ fmap go mp 1042 go (Leftover p i) = PipeM $ (\x -> maybe id (flip Leftover) x (go p)) <$> f' i 1043 in go (c0 Done) 1044 1045-- | The connect-and-resume operator. This does not close the @Source@, but 1046-- instead returns it to be used again. This allows a @Source@ to be used 1047-- incrementally in a large program, without forcing the entire program to live 1048-- in the @Sink@ monad. 1049-- 1050-- Mnemonic: connect + do more. 1051-- 1052-- Since 0.5.0 1053($$+) :: Monad m => Source m a -> Sink a m b -> m (SealedConduitT () a m (), b) 1054src $$+ sink = connectResume (sealConduitT src) sink 1055{-# INLINE ($$+) #-} 1056 1057-- | Continue processing after usage of @$$+@. 1058-- 1059-- Since 0.5.0 1060($$++) :: Monad m => SealedConduitT () a m () -> Sink a m b -> m (SealedConduitT () a m (), b) 1061($$++) = connectResume 1062{-# INLINE ($$++) #-} 1063 1064-- | Same as @$$++@ and @connectResume@, but doesn't include the 1065-- updated @SealedConduitT@. 1066-- 1067-- /NOTE/ In previous versions, this would cause finalizers to 1068-- run. Since version 1.3.0, there are no finalizers in conduit. 1069-- 1070-- Since 0.5.0 1071($$+-) :: Monad m => SealedConduitT () a m () -> Sink a m b -> m b 1072rsrc $$+- sink = do 1073 (_, res) <- connectResume rsrc sink 1074 return res 1075{-# INLINE ($$+-) #-} 1076 1077-- | Left fusion for a sealed source. 1078-- 1079-- Since 1.0.16 1080($=+) :: Monad m => SealedConduitT () a m () -> Conduit a m b -> SealedConduitT () b m () 1081SealedConduitT src $=+ ConduitT sink = SealedConduitT (src `pipeL` sink Done) 1082 1083-- | Provide for a stream of data that can be flushed. 1084-- 1085-- A number of @Conduit@s (e.g., zlib compression) need the ability to flush 1086-- the stream at some point. This provides a single wrapper datatype to be used 1087-- in all such circumstances. 1088-- 1089-- Since 0.3.0 1090data Flush a = Chunk a | Flush 1091 deriving (Show, Eq, Ord) 1092instance Functor Flush where 1093 fmap _ Flush = Flush 1094 fmap f (Chunk a) = Chunk (f a) 1095 1096-- | A wrapper for defining an 'Applicative' instance for 'Source's which allows 1097-- to combine sources together, generalizing 'zipSources'. A combined source 1098-- will take input yielded from each of its @Source@s until any of them stop 1099-- producing output. 1100-- 1101-- Since 1.0.13 1102newtype ZipSource m o = ZipSource { getZipSource :: Source m o } 1103 1104instance Monad m => Functor (ZipSource m) where 1105 fmap f = ZipSource . mapOutput f . getZipSource 1106instance Monad m => Applicative (ZipSource m) where 1107 pure = ZipSource . forever . yield 1108 (ZipSource f) <*> (ZipSource x) = ZipSource $ zipSourcesApp f x 1109 1110-- | Coalesce all values yielded by all of the @Source@s. 1111-- 1112-- Implemented on top of @ZipSource@ and as such, it exhibits the same 1113-- short-circuiting behavior as @ZipSource@. See that data type for more 1114-- details. If you want to create a source that yields *all* values from 1115-- multiple sources, use `sequence_`. 1116-- 1117-- Since 1.0.13 1118sequenceSources :: (Traversable f, Monad m) => f (Source m o) -> Source m (f o) 1119sequenceSources = getZipSource . sequenceA . fmap ZipSource 1120 1121-- | A wrapper for defining an 'Applicative' instance for 'Sink's which allows 1122-- to combine sinks together, generalizing 'zipSinks'. A combined sink 1123-- distributes the input to all its participants and when all finish, produces 1124-- the result. This allows to define functions like 1125-- 1126-- @ 1127-- sequenceSinks :: (Monad m) 1128-- => [Sink i m r] -> Sink i m [r] 1129-- sequenceSinks = getZipSink . sequenceA . fmap ZipSink 1130-- @ 1131-- 1132-- Note that the standard 'Applicative' instance for conduits works 1133-- differently. It feeds one sink with input until it finishes, then switches 1134-- to another, etc., and at the end combines their results. 1135-- 1136-- This newtype is in fact a type constrained version of 'ZipConduit', and has 1137-- the same behavior. It's presented as a separate type since (1) it 1138-- historically predates @ZipConduit@, and (2) the type constraining can make 1139-- your code clearer (and thereby make your error messages more easily 1140-- understood). 1141-- 1142-- Since 1.0.13 1143newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r } 1144 1145instance Monad m => Functor (ZipSink i m) where 1146 fmap f (ZipSink x) = ZipSink (liftM f x) 1147instance Monad m => Applicative (ZipSink i m) where 1148 pure = ZipSink . return 1149 (ZipSink f) <*> (ZipSink x) = 1150 ZipSink $ liftM (uncurry ($)) $ zipSinks f x 1151 1152-- | Send incoming values to all of the @Sink@ providing, and ultimately 1153-- coalesce together all return values. 1154-- 1155-- Implemented on top of @ZipSink@, see that data type for more details. 1156-- 1157-- Since 1.0.13 1158sequenceSinks :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r) 1159sequenceSinks = getZipSink . sequenceA . fmap ZipSink 1160 1161-- | The connect-and-resume operator. This does not close the @Conduit@, but 1162-- instead returns it to be used again. This allows a @Conduit@ to be used 1163-- incrementally in a large program, without forcing the entire program to live 1164-- in the @Sink@ monad. 1165-- 1166-- Leftover data returned from the @Sink@ will be discarded. 1167-- 1168-- Mnemonic: connect + do more. 1169-- 1170-- Since 1.0.17 1171(=$$+) :: Monad m 1172 => ConduitT a b m () 1173 -> ConduitT b Void m r 1174 -> ConduitT a Void m (SealedConduitT a b m (), r) 1175(=$$+) conduit = connectResumeConduit (sealConduitT conduit) 1176{-# INLINE (=$$+) #-} 1177 1178-- | Continue processing after usage of '=$$+'. Connect a 'SealedConduitT' to 1179-- a sink and return the output of the sink together with a new 1180-- 'SealedConduitT'. 1181-- 1182-- Since 1.0.17 1183(=$$++) :: Monad m => SealedConduitT i o m () -> ConduitT o Void m r -> ConduitT i Void m (SealedConduitT i o m (), r) 1184(=$$++) = connectResumeConduit 1185{-# INLINE (=$$++) #-} 1186 1187-- | Same as @=$$++@, but doesn't include the updated 1188-- @SealedConduitT@. 1189-- 1190-- /NOTE/ In previous versions, this would cause finalizers to 1191-- run. Since version 1.3.0, there are no finalizers in conduit. 1192-- 1193-- Since 1.0.17 1194(=$$+-) :: Monad m => SealedConduitT i o m () -> ConduitT o Void m r -> ConduitT i Void m r 1195rsrc =$$+- sink = do 1196 (_, res) <- connectResumeConduit rsrc sink 1197 return res 1198{-# INLINE (=$$+-) #-} 1199 1200 1201infixr 0 =$$+ 1202infixr 0 =$$++ 1203infixr 0 =$$+- 1204 1205-- | Provides an alternative @Applicative@ instance for @ConduitT@. In this instance, 1206-- every incoming value is provided to all @ConduitT@s, and output is coalesced together. 1207-- Leftovers from individual @ConduitT@s will be used within that component, and then discarded 1208-- at the end of their computation. Output and finalizers will both be handled in a left-biased manner. 1209-- 1210-- As an example, take the following program: 1211-- 1212-- @ 1213-- main :: IO () 1214-- main = do 1215-- let src = mapM_ yield [1..3 :: Int] 1216-- conduit1 = CL.map (+1) 1217-- conduit2 = CL.concatMap (replicate 2) 1218-- conduit = getZipConduit $ ZipConduit conduit1 <* ZipConduit conduit2 1219-- sink = CL.mapM_ print 1220-- src $$ conduit =$ sink 1221-- @ 1222-- 1223-- It will produce the output: 2, 1, 1, 3, 2, 2, 4, 3, 3 1224-- 1225-- Since 1.0.17 1226newtype ZipConduit i o m r = ZipConduit { getZipConduit :: ConduitT i o m r } 1227 deriving Functor 1228instance Monad m => Applicative (ZipConduit i o m) where 1229 pure = ZipConduit . pure 1230 ZipConduit left <*> ZipConduit right = ZipConduit (zipConduitApp left right) 1231 1232-- | Provide identical input to all of the @Conduit@s and combine their outputs 1233-- into a single stream. 1234-- 1235-- Implemented on top of @ZipConduit@, see that data type for more details. 1236-- 1237-- Since 1.0.17 1238sequenceConduits :: (Traversable f, Monad m) => f (ConduitT i o m r) -> ConduitT i o m (f r) 1239sequenceConduits = getZipConduit . sequenceA . fmap ZipConduit 1240 1241-- | Fuse two @ConduitT@s together, and provide the return value of both. Note 1242-- that this will force the entire upstream @ConduitT@ to be run to produce the 1243-- result value, even if the downstream terminates early. 1244-- 1245-- Since 1.1.5 1246fuseBoth :: Monad m => ConduitT a b m r1 -> ConduitT b c m r2 -> ConduitT a c m (r1, r2) 1247fuseBoth (ConduitT up) (ConduitT down) = 1248 ConduitT (pipeL (up Done) (withUpstream $ generalizeUpstream $ down Done) >>=) 1249{-# INLINE fuseBoth #-} 1250 1251-- | Like 'fuseBoth', but does not force consumption of the @Producer@. 1252-- In the case that the @Producer@ terminates, the result value is 1253-- provided as a @Just@ value. If it does not terminate, then a 1254-- @Nothing@ value is returned. 1255-- 1256-- One thing to note here is that "termination" here only occurs if the 1257-- @Producer@ actually yields a @Nothing@ value. For example, with the 1258-- @Producer@ @mapM_ yield [1..5]@, if five values are requested, the 1259-- @Producer@ has not yet terminated. Termination only occurs when the 1260-- sixth value is awaited for and the @Producer@ signals termination. 1261-- 1262-- Since 1.2.4 1263fuseBothMaybe 1264 :: Monad m 1265 => ConduitT a b m r1 1266 -> ConduitT b c m r2 1267 -> ConduitT a c m (Maybe r1, r2) 1268fuseBothMaybe (ConduitT up) (ConduitT down) = 1269 ConduitT (pipeL (up Done) (go Nothing $ down Done) >>=) 1270 where 1271 go mup (Done r) = Done (mup, r) 1272 go mup (PipeM mp) = PipeM $ liftM (go mup) mp 1273 go mup (HaveOutput p o) = HaveOutput (go mup p) o 1274 go _ (NeedInput p c) = NeedInput 1275 (\i -> go Nothing (p i)) 1276 (\u -> go (Just u) (c ())) 1277 go mup (Leftover p i) = Leftover (go mup p) i 1278{-# INLINABLE fuseBothMaybe #-} 1279 1280-- | Same as @fuseBoth@, but ignore the return value from the downstream 1281-- @Conduit@. Same caveats of forced consumption apply. 1282-- 1283-- Since 1.1.5 1284fuseUpstream :: Monad m => ConduitT a b m r -> Conduit b m c -> ConduitT a c m r 1285fuseUpstream up down = fmap fst (fuseBoth up down) 1286{-# INLINE fuseUpstream #-} 1287 1288-- Rewrite rules 1289 1290{- FIXME 1291{-# RULES "conduit: ConduitT: lift x >>= f" forall m f. lift m >>= f = ConduitT (PipeM (liftM (unConduitT . f) m)) #-} 1292{-# RULES "conduit: ConduitT: lift x >> f" forall m f. lift m >> f = ConduitT (PipeM (liftM (\_ -> unConduitT f) m)) #-} 1293 1294{-# RULES "conduit: ConduitT: liftIO x >>= f" forall m (f :: MonadIO m => a -> ConduitT i o m r). liftIO m >>= f = ConduitT (PipeM (liftM (unConduitT . f) (liftIO m))) #-} 1295{-# RULES "conduit: ConduitT: liftIO x >> f" forall m (f :: MonadIO m => ConduitT i o m r). liftIO m >> f = ConduitT (PipeM (liftM (\_ -> unConduitT f) (liftIO m))) #-} 1296 1297{-# RULES "conduit: ConduitT: liftBase x >>= f" forall m (f :: MonadBase b m => a -> ConduitT i o m r). liftBase m >>= f = ConduitT (PipeM (liftM (unConduitT . f) (liftBase m))) #-} 1298{-# RULES "conduit: ConduitT: liftBase x >> f" forall m (f :: MonadBase b m => ConduitT i o m r). liftBase m >> f = ConduitT (PipeM (liftM (\_ -> unConduitT f) (liftBase m))) #-} 1299 1300{-# RULES 1301 "yield o >> p" forall o (p :: ConduitT i o m r). yield o >> p = ConduitT (HaveOutput (unConduitT p) o) 1302 ; "when yield next" forall b o p. when b (yield o) >> p = 1303 if b then ConduitT (HaveOutput (unConduitT p) o) else p 1304 ; "unless yield next" forall b o p. unless b (yield o) >> p = 1305 if b then p else ConduitT (HaveOutput (unConduitT p) o) 1306 ; "lift m >>= yield" forall m. lift m >>= yield = yieldM m 1307 #-} 1308{-# RULES "conduit: leftover l >> p" forall l (p :: ConduitT i o m r). leftover l >> p = 1309 ConduitT (Leftover (unConduitT p) l) #-} 1310 -} 1311 1312-- | Run a pure pipeline until processing completes, i.e. a pipeline 1313-- with @Identity@ as the base monad. This is equivalient to 1314-- @runIdentity . runConduit@. 1315-- 1316-- @since 1.2.8 1317runConduitPure :: ConduitT () Void Identity r -> r 1318runConduitPure = runIdentity . runConduit 1319{-# INLINE runConduitPure #-} 1320 1321-- | Run a pipeline which acquires resources with @ResourceT@, and 1322-- then run the @ResourceT@ transformer. This is equivalent to 1323-- @runResourceT . runConduit@. 1324-- 1325-- @since 1.2.8 1326runConduitRes :: MonadUnliftIO m 1327 => ConduitT () Void (ResourceT m) r 1328 -> m r 1329runConduitRes = runResourceT . runConduit 1330{-# INLINE runConduitRes #-} 1331