{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE Trustworthy #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- | Stream-based alternative representation for conduits.
--
-- A 'ConduitWithStream' carries two implementations of the same component:
-- an ordinary 'ConduitT' and a function between 'Stream's. 'unstream'
-- selects the 'ConduitT'; the rewrite rules in this module replace
-- compositions and runs of 'unstream'-wrapped components with versions
-- built from the 'Stream' implementation instead.
module Data.Conduit.Internal.Fusion
    ( -- ** Types
      Step (..)
    , Stream (..)
    , ConduitWithStream
    , StreamConduitT
    , StreamConduit
    , StreamSource
    , StreamProducer
    , StreamSink
    , StreamConsumer
      -- ** Functions
    , streamConduit
    , streamSource
    , streamSourcePure
    , unstream
    ) where

import Data.Conduit.Internal.Conduit
import Data.Conduit.Internal.Pipe (Pipe (..))
import Data.Functor.Identity (Identity (runIdentity))
import Data.Void (Void, absurd)
import Control.Monad.Trans.Resource (runResourceT)

-- | This is the same as the @Step@ type from stream fusion. Constructors
-- are renamed to avoid confusion with conduit names.
--
-- * @'Emit' s o@: continue with state @s@ and yield the value @o@.
-- * @'Skip' s@: continue with state @s@ without yielding a value.
-- * @'Stop' r@: the stream is finished, with final result @r@.
data Step s o r
    = Emit s o
    | Skip s
    | Stop r
    deriving Functor

-- | A stream of @o@ values in monad @m@ that finishes with a result @r@.
--
-- The state type @s@ is existentially hidden. The first field advances
-- the stream by one 'Step' from a given state; the second field is the
-- action producing the initial state.
data Stream m o r = forall s. Stream
    (s -> m (Step s o r))
    (m s)

-- | A component with both a 'ConduitT' implementation (first field) and a
-- 'Stream'-transformer implementation (second field).
data ConduitWithStream i o m r = ConduitWithStream
    (ConduitT i o m r)
    (StreamConduitT i o m r)

-- | Stream-level counterpart of @ConduitT i o m r@: turns an upstream
-- 'Stream' of @i@ values into a downstream 'Stream' of @o@ values with
-- result @r@.
type StreamConduitT i o m r = Stream m i () -> Stream m o r

-- | A 'StreamConduitT' whose result is @()@.
type StreamConduit i m o = StreamConduitT i o m ()

-- | A 'StreamConduitT' with input @()@ and result @()@.
type StreamSource m o = StreamConduitT () o m ()

-- | A 'StreamConduitT' with result @()@ that is polymorphic in its input.
type StreamProducer m o = forall i. StreamConduitT i o m ()

-- | A 'StreamConduitT' whose output type is 'Void'.
type StreamSink i m r = StreamConduitT i Void m r

-- | A 'StreamConduitT' that is polymorphic in its output.
type StreamConsumer i m r = forall o. StreamConduitT i o m r

-- | Extract the 'ConduitT' implementation, discarding the stream one.
--
-- The rewrite rules in this module all match on applications of
-- 'unstream', so its inlining is restricted to phase 0 (@INLINE [0]@) to
-- keep those applications visible in the earlier simplifier phases.
unstream :: ConduitWithStream i o m r -> ConduitT i o m r
unstream (ConduitWithStream c _) = c
{-# INLINE [0] unstream #-}

-- | Fuse two 'ConduitWithStream's, fusing both representations: the
-- conduits with '.|' and the stream transformers by function composition.
fuseStream :: Monad m
           => ConduitWithStream a b m ()
           -> ConduitWithStream b c m r
           -> ConduitWithStream a c m r
fuseStream (ConduitWithStream a x) (ConduitWithStream b y) =
  ConduitWithStream (a .| b) (y . x)
{-# INLINE fuseStream #-}

-- Fusing two unstreamed components (via '.|', 'fuse' or '=$=') becomes a
-- single unstreamed 'fuseStream', keeping the stream implementation around.
{-# RULES "conduit: fuseStream (.|)" forall left right.
        unstream left .| unstream right = unstream (fuseStream left right)
  #-}
{-# RULES "conduit: fuseStream (fuse)" forall left right.
        fuse (unstream left) (unstream right) = unstream (fuseStream left right)
  #-}
{-# RULES "conduit: fuseStream (=$=)" forall left right.
        unstream left =$= unstream right = unstream (fuseStream left right)
  #-}

-- | Run a complete pipeline using only its stream implementation.
--
-- The stream transformer is given an upstream that stops immediately with
-- @()@, and the resulting stream is stepped until it yields 'Stop'.
runStream :: Monad m
          => ConduitWithStream () Void m r
          -> m r
runStream (ConduitWithStream _ f) =
    run $ f $ Stream emptyStep (return ())
  where
    emptyStep _ = return $ Stop ()
    run (Stream step ms0) =
        ms0 >>= loop
      where
        loop s = do
            res <- step s
            case res of
                Stop r -> return r
                Skip s' -> loop s'
                -- The output type is Void, so no value can be emitted.
                Emit _ o -> absurd o
{-# INLINE runStream #-}

-- Running an unstreamed pipeline uses 'runStream' instead.
{-# RULES "conduit: runStream" forall stream.
        runConduit (unstream stream) = runStream stream
  #-}
{-# RULES "conduit: runStream (pure)" forall stream.
        runConduitPure (unstream stream) = runIdentity (runStream stream)
  #-}
{-# RULES "conduit: runStream (ResourceT)" forall stream.
        runConduitRes (unstream stream) = runResourceT (runStream stream)
  #-}

-- | Connect a source to a sink when both have stream implementations.
--
-- The source's stream transformer is applied to an upstream that stops
-- immediately with @()@, its output is passed to the sink's stream
-- transformer, and the resulting stream is stepped until it yields 'Stop'.
connectStream :: Monad m
              => ConduitWithStream () i m ()
              -> ConduitWithStream i Void m r
              -> m r
connectStream (ConduitWithStream _ stream) (ConduitWithStream _ f) =
    run $ f $ stream $ Stream emptyStep (return ())
  where
    emptyStep _ = return $ Stop ()
    run (Stream step ms0) =
        ms0 >>= loop
      where
        loop s = do
            res <- step s
            case res of
                Stop r -> return r
                Skip s' -> loop s'
                -- The output type is Void, so no value can be emitted.
                Emit _ o -> absurd o
{-# INLINE connectStream #-}

{-# RULES "conduit: connectStream ($$)" forall left right.
        unstream left $$ unstream right = connectStream left right
  #-}

-- | Connect a source that has a stream implementation to a sink that is
-- only available as a plain 'ConduitT'.
--
-- The sink's 'Pipe' is interpreted directly. Whenever it needs input and
-- there are no pending leftovers, the source stream is stepped once.
connectStream1 :: Monad m
               => ConduitWithStream () i m ()
               -> ConduitT i Void m r
               -> m r
connectStream1 (ConduitWithStream _ fstream) (ConduitT sink0) =
    case fstream $ Stream (const $ return $ Stop ()) (return ()) of
        Stream step ms0 ->
            -- Arguments of @loop@: pending leftovers (most recent first),
            -- the sink's current pipe, and the source stream's state.
            let loop _ (Done r) _ = return r
                loop ls (PipeM mp) s = mp >>= flip (loop ls) s
                loop ls (Leftover p l) s = loop (l:ls) p s
                -- The sink's output type is Void.
                loop _ (HaveOutput _ o) _ = absurd o
                -- Leftovers are served before the stream is consulted.
                loop (l:ls) (NeedInput p _) s = loop ls (p l) s
                loop [] (NeedInput p c) s = do
                    res <- step s
                    case res of
                        -- Source finished: take the sink's end-of-input
                        -- continuation; the stream state is passed along
                        -- unchanged.
                        Stop () -> loop [] (c ()) s
                        Skip s' -> loop [] (NeedInput p c) s'
                        Emit s' i -> loop [] (p i) s'
             in ms0 >>= loop [] (sink0 Done)
{-# INLINE connectStream1 #-}

{-# RULES "conduit: connectStream1 ($$)" forall left right.
        unstream left $$ right = connectStream1 left right
  #-}

{-# RULES "conduit: connectStream1 (runConduit/.|)" forall left right.
        runConduit (unstream left .| right) = connectStream1 left right
  #-}
{-# RULES "conduit: connectStream1 (runConduit/=$=)" forall left right.
        runConduit (unstream left =$= right) = connectStream1 left right
  #-}
{-# RULES "conduit: connectStream1 (runConduit/fuse)" forall left right.
        runConduit (fuse (unstream left) right) = connectStream1 left right
  #-}

{-# RULES "conduit: connectStream1 (runConduitPure/.|)" forall left right.
        runConduitPure (unstream left .| right) = runIdentity (connectStream1 left right)
  #-}
{-# RULES "conduit: connectStream1 (runConduitPure/=$=)" forall left right.
        runConduitPure (unstream left =$= right) = runIdentity (connectStream1 left right)
  #-}
{-# RULES "conduit: connectStream1 (runConduitPure/fuse)" forall left right.
        runConduitPure (fuse (unstream left) right) = runIdentity (connectStream1 left right)
  #-}

{-# RULES "conduit: connectStream1 (runConduitRes/.|)" forall left right.
        runConduitRes (unstream left .| right) = runResourceT (connectStream1 left right)
  #-}
{-# RULES "conduit: connectStream1 (runConduitRes/=$=)" forall left right.
        runConduitRes (unstream left =$= right) = runResourceT (connectStream1 left right)
  #-}
{-# RULES "conduit: connectStream1 (runConduitRes/fuse)" forall left right.
        runConduitRes (fuse (unstream left) right) = runResourceT (connectStream1 left right)
  #-}

-- | Connect a source that is only available as a plain 'ConduitT' to a
-- sink that has a stream implementation.
--
-- The source is turned into a 'Stream' whose state is the source's 'Pipe'
-- itself; that stream is passed to the sink's stream transformer and the
-- result is stepped until it yields 'Stop'.
connectStream2 :: forall i m r. Monad m
               => ConduitT () i m ()
               -> ConduitWithStream i Void m r
               -> m r
connectStream2 (ConduitT src0) (ConduitWithStream _ fstream) =
    run $ fstream $ Stream step' $ return (src0 Done)
  where
    -- One step of the source pipe, expressed as a stream step.
    step' :: Pipe () () i () m () -> m (Step (Pipe () () i () m ()) i ())
    step' (Done ()) = return $ Stop ()
    step' (HaveOutput pipe o) = return $ Emit pipe o
    -- No input is ever supplied to the source, so a request for input
    -- continues with the end-of-input continuation.
    step' (NeedInput _ c) = return $ Skip $ c ()
    step' (PipeM mp) = Skip <$> mp
    -- The leftover value is @()@ and is dropped.
    step' (Leftover p ()) = return $ Skip p
    {-# INLINE step' #-}

    run (Stream step ms0) =
        ms0 >>= loop
      where
        loop s = do
            res <- step s
            case res of
                Stop r -> return r
                -- The output type is Void, so no value can be emitted.
                Emit _ o -> absurd o
                Skip s' -> loop s'
{-# INLINE connectStream2 #-}

{-# RULES "conduit: connectStream2 ($$)" forall left right.
        left $$ unstream right = connectStream2 left right
  #-}

{-# RULES "conduit: connectStream2 (runConduit/.|)" forall left right.
        runConduit (left .| unstream right) = connectStream2 left right
  #-}
{-# RULES "conduit: connectStream2 (runConduit/fuse)" forall left right.
        runConduit (fuse left (unstream right)) = connectStream2 left right
  #-}
{-# RULES "conduit: connectStream2 (runConduit/=$=)" forall left right.
        runConduit (left =$= unstream right) = connectStream2 left right
  #-}

{-# RULES "conduit: connectStream2 (runConduitPure/.|)" forall left right.
        runConduitPure (left .| unstream right) = runIdentity (connectStream2 left right)
  #-}
{-# RULES "conduit: connectStream2 (runConduitPure/fuse)" forall left right.
        runConduitPure (fuse left (unstream right)) = runIdentity (connectStream2 left right)
  #-}
{-# RULES "conduit: connectStream2 (runConduitPure/=$=)" forall left right.
        runConduitPure (left =$= unstream right) = runIdentity (connectStream2 left right)
  #-}

{-# RULES "conduit: connectStream2 (runConduitRes/.|)" forall left right.
        runConduitRes (left .| unstream right) = runResourceT (connectStream2 left right)
  #-}
{-# RULES "conduit: connectStream2 (runConduitRes/fuse)" forall left right.
        runConduitRes (fuse left (unstream right)) = runResourceT (connectStream2 left right)
  #-}
{-# RULES "conduit: connectStream2 (runConduitRes/=$=)" forall left right.
        runConduitRes (left =$= unstream right) = runResourceT (connectStream2 left right)
  #-}

-- | Pair a 'ConduitT' with a stream transformer.
--
-- Nothing here checks that the two implementations agree; since the
-- rewrite rules may substitute one for the other, the caller is
-- responsible for supplying equivalent implementations.
streamConduit :: ConduitT i o m r
              -> (Stream m i () -> Stream m o r)
              -> ConduitWithStream i o m r
streamConduit = ConduitWithStream
{-# INLINE CONLIKE streamConduit #-}

-- | Build a source from a 'Stream'.
--
-- The conduit implementation runs the stream's initial-state action and
-- then steps the stream, yielding each emitted value. The stream
-- implementation ignores its upstream and returns the given stream as is.
streamSource
    :: Monad m
    => Stream m o ()
    -> ConduitWithStream i o m ()
streamSource str@(Stream step ms0) =
    ConduitWithStream con (const str)
  where
    con = ConduitT $ \rest -> PipeM $ do
        s0 <- ms0
        let loop s = do
                res <- step s
                case res of
                    Stop () -> return $ rest ()
                    Emit s' o -> return $ HaveOutput (PipeM $ loop s') o
                    Skip s' -> loop s'
        loop s0
{-# INLINE streamSource #-}

-- | Build a source from a pure ('Identity') 'Stream'.
--
-- The conduit implementation steps the stream with 'runIdentity' and so
-- contains no 'PipeM' layers. The stream implementation ignores its
-- upstream and lifts the pure stream into @m@.
streamSourcePure
    :: Monad m
    => Stream Identity o ()
    -> ConduitWithStream i o m ()
streamSourcePure (Stream step ms0) =
    ConduitWithStream con (const $ Stream (return . runIdentity . step) (return s0))
  where
    s0 = runIdentity ms0
    con = ConduitT $ \rest ->
        let loop s =
                case runIdentity $ step s of
                    Stop () -> rest ()
                    Emit s' o -> HaveOutput (loop s') o
                    Skip s' -> loop s'
         in loop s0
{-# INLINE streamSourcePure #-}