1{-# LANGUAGE ExistentialQuantification #-}
2{-# LANGUAGE RankNTypes #-}
3{-# LANGUAGE BangPatterns #-}
4{-# LANGUAGE DeriveFunctor #-}
5{-# LANGUAGE Trustworthy #-}
6{-# LANGUAGE ScopedTypeVariables #-}
7module Data.Conduit.Internal.Fusion
8    ( -- ** Types
9      Step (..)
10    , Stream (..)
11    , ConduitWithStream
12    , StreamConduitT
13    , StreamConduit
14    , StreamSource
15    , StreamProducer
16    , StreamSink
17    , StreamConsumer
18      -- ** Functions
19    , streamConduit
20    , streamSource
21    , streamSourcePure
22    , unstream
23    ) where
24
25import Data.Conduit.Internal.Conduit
26import Data.Conduit.Internal.Pipe (Pipe (..))
27import Data.Functor.Identity (Identity (runIdentity))
28import Data.Void (Void, absurd)
29import Control.Monad.Trans.Resource (runResourceT)
30
31-- | This is the same as stream fusion\'s Step. Constructors are renamed to
32-- avoid confusion with conduit names.
33data Step s o r
34    = Emit s o
35    | Skip s
36    | Stop r
37    deriving Functor
38
39data Stream m o r = forall s. Stream
40    (s -> m (Step s o r))
41    (m s)
42
43data ConduitWithStream i o m r = ConduitWithStream
44    (ConduitT i o m r)
45    (StreamConduitT i o m r)
46
47type StreamConduitT i o m r = Stream m i () -> Stream m o r
48
49type StreamConduit i m o = StreamConduitT i o m ()
50
51type StreamSource m o = StreamConduitT () o m ()
52
53type StreamProducer m o = forall i. StreamConduitT i o m ()
54
55type StreamSink i m r = StreamConduitT i Void m r
56
57type StreamConsumer i m r = forall o. StreamConduitT i o m r
58
59unstream :: ConduitWithStream i o m r -> ConduitT i o m r
60unstream (ConduitWithStream c _) = c
61{-# INLINE [0] unstream #-}
62
63fuseStream :: Monad m
64           => ConduitWithStream a b m ()
65           -> ConduitWithStream b c m r
66           -> ConduitWithStream a c m r
67fuseStream (ConduitWithStream a x) (ConduitWithStream b y) =
68  ConduitWithStream (a .| b) (y . x)
69{-# INLINE fuseStream #-}
70
71{-# RULES "conduit: fuseStream (.|)" forall left right.
72        unstream left .| unstream right = unstream (fuseStream left right)
73  #-}
74{-# RULES "conduit: fuseStream (fuse)" forall left right.
75        fuse (unstream left) (unstream right) = unstream (fuseStream left right)
76  #-}
77{-# RULES "conduit: fuseStream (=$=)" forall left right.
78        unstream left =$= unstream right = unstream (fuseStream left right)
79  #-}
80
81runStream :: Monad m
82          => ConduitWithStream () Void m r
83          -> m r
84runStream (ConduitWithStream _ f) =
85    run $ f $ Stream emptyStep (return ())
86  where
87    emptyStep _ = return $ Stop ()
88    run (Stream step ms0) =
89        ms0 >>= loop
90      where
91        loop s = do
92            res <- step s
93            case res of
94                Stop r -> return r
95                Skip s' -> loop s'
96                Emit _ o -> absurd o
97{-# INLINE runStream #-}
98
99{-# RULES "conduit: runStream" forall stream.
100        runConduit (unstream stream) = runStream stream
101  #-}
102{-# RULES "conduit: runStream (pure)" forall stream.
103        runConduitPure (unstream stream) = runIdentity (runStream stream)
104  #-}
105{-# RULES "conduit: runStream (ResourceT)" forall stream.
106        runConduitRes (unstream stream) = runResourceT (runStream stream)
107  #-}
108
109connectStream :: Monad m
110              => ConduitWithStream () i    m ()
111              -> ConduitWithStream i  Void m r
112              -> m r
113connectStream (ConduitWithStream _ stream) (ConduitWithStream _ f) =
114    run $ f $ stream $ Stream emptyStep (return ())
115  where
116    emptyStep _ = return $ Stop ()
117    run (Stream step ms0) =
118        ms0 >>= loop
119      where
120        loop s = do
121            res <- step s
122            case res of
123                Stop r -> return r
124                Skip s' -> loop s'
125                Emit _ o -> absurd o
126{-# INLINE connectStream #-}
127
128{-# RULES "conduit: connectStream ($$)" forall left right.
129        unstream left $$ unstream right = connectStream left right
130  #-}
131
132connectStream1 :: Monad m
133               => ConduitWithStream () i    m ()
134               -> ConduitT          i  Void m r
135               -> m r
136connectStream1 (ConduitWithStream _ fstream) (ConduitT sink0) =
137    case fstream $ Stream (const $ return $ Stop ()) (return ()) of
138        Stream step ms0 ->
139            let loop _ (Done r) _ = return r
140                loop ls (PipeM mp) s = mp >>= flip (loop ls) s
141                loop ls (Leftover p l) s = loop (l:ls) p s
142                loop _ (HaveOutput _ o) _ = absurd o
143                loop (l:ls) (NeedInput p _) s = loop ls (p l) s
144                loop [] (NeedInput p c) s = do
145                    res <- step s
146                    case res of
147                        Stop () -> loop [] (c ()) s
148                        Skip s' -> loop [] (NeedInput p c) s'
149                        Emit s' i -> loop [] (p i) s'
150             in ms0 >>= loop [] (sink0 Done)
151{-# INLINE connectStream1 #-}
152
153{-# RULES "conduit: connectStream1 ($$)" forall left right.
154        unstream left $$ right = connectStream1 left right
155  #-}
156
157{-# RULES "conduit: connectStream1 (runConduit/.|)" forall left right.
158        runConduit (unstream left .| right) = connectStream1 left right
159  #-}
160{-# RULES "conduit: connectStream1 (runConduit/=$=)" forall left right.
161        runConduit (unstream left =$= right) = connectStream1 left right
162  #-}
163{-# RULES "conduit: connectStream1 (runConduit/fuse)" forall left right.
164        runConduit (fuse (unstream left) right) = connectStream1 left right
165  #-}
166
167{-# RULES "conduit: connectStream1 (runConduitPure/.|)" forall left right.
168        runConduitPure (unstream left .| right) = runIdentity (connectStream1 left right)
169  #-}
170{-# RULES "conduit: connectStream1 (runConduitPure/=$=)" forall left right.
171        runConduitPure (unstream left =$= right) = runIdentity (connectStream1 left right)
172  #-}
173{-# RULES "conduit: connectStream1 (runConduitPure/fuse)" forall left right.
174        runConduitPure (fuse (unstream left) right) = runIdentity (connectStream1 left right)
175  #-}
176
177{-# RULES "conduit: connectStream1 (runConduitRes/.|)" forall left right.
178        runConduitRes (unstream left .| right) = runResourceT (connectStream1 left right)
179  #-}
180{-# RULES "conduit: connectStream1 (runConduitRes/=$=)" forall left right.
181        runConduitRes (unstream left =$= right) = runResourceT (connectStream1 left right)
182  #-}
183{-# RULES "conduit: connectStream1 (runConduitRes/fuse)" forall left right.
184        runConduitRes (fuse (unstream left) right) = runResourceT (connectStream1 left right)
185  #-}
186
187connectStream2 :: forall i m r. Monad m
188               => ConduitT          () i    m ()
189               -> ConduitWithStream i  Void m r
190               -> m r
191connectStream2 (ConduitT src0) (ConduitWithStream _ fstream) =
192    run $ fstream $ Stream step' $ return (src0 Done)
193  where
194    step' :: Pipe () () i () m () -> m (Step (Pipe () () i () m ()) i ())
195    step' (Done ()) = return $ Stop ()
196    step' (HaveOutput pipe o) = return $ Emit pipe o
197    step' (NeedInput _ c) = return $ Skip $ c ()
198    step' (PipeM mp) = Skip <$> mp
199    step' (Leftover p ()) = return $ Skip p
200    {-# INLINE step' #-}
201
202    run (Stream step ms0) =
203        ms0 >>= loop
204      where
205        loop s = do
206            res <- step s
207            case res of
208                Stop r -> return r
209                Emit _ o -> absurd o
210                Skip s' -> loop s'
211{-# INLINE connectStream2 #-}
212
213{-# RULES "conduit: connectStream2 ($$)" forall left right.
214        left $$ unstream right = connectStream2 left right
215  #-}
216
217{-# RULES "conduit: connectStream2 (runConduit/.|)" forall left right.
218        runConduit (left .| unstream right) = connectStream2 left right
219  #-}
220{-# RULES "conduit: connectStream2 (runConduit/fuse)" forall left right.
221        runConduit (fuse left (unstream right)) = connectStream2 left right
222  #-}
223{-# RULES "conduit: connectStream2 (runConduit/=$=)" forall left right.
224        runConduit (left =$= unstream right) = connectStream2 left right
225  #-}
226
227{-# RULES "conduit: connectStream2 (runConduitPure/.|)" forall left right.
228        runConduitPure (left .| unstream right) = runIdentity (connectStream2 left right)
229  #-}
230{-# RULES "conduit: connectStream2 (runConduitPure/fuse)" forall left right.
231        runConduitPure (fuse left (unstream right)) = runIdentity (connectStream2 left right)
232  #-}
233{-# RULES "conduit: connectStream2 (runConduitPure/=$=)" forall left right.
234        runConduitPure (left =$= unstream right) = runIdentity (connectStream2 left right)
235  #-}
236
237{-# RULES "conduit: connectStream2 (runConduitRes/.|)" forall left right.
238        runConduitRes (left .| unstream right) = runResourceT (connectStream2 left right)
239  #-}
240{-# RULES "conduit: connectStream2 (runConduitRes/fuse)" forall left right.
241        runConduitRes (fuse left (unstream right)) = runResourceT (connectStream2 left right)
242  #-}
243{-# RULES "conduit: connectStream2 (runConduitRes/=$=)" forall left right.
244        runConduitRes (left =$= unstream right) = runResourceT (connectStream2 left right)
245  #-}
246
247streamConduit :: ConduitT i o m r
248              -> (Stream m i () -> Stream m o r)
249              -> ConduitWithStream i o m r
250streamConduit = ConduitWithStream
251{-# INLINE CONLIKE streamConduit #-}
252
253streamSource
254    :: Monad m
255    => Stream m o ()
256    -> ConduitWithStream i o m ()
257streamSource str@(Stream step ms0) =
258    ConduitWithStream con (const str)
259  where
260    con = ConduitT $ \rest -> PipeM $ do
261        s0 <- ms0
262        let loop s = do
263                res <- step s
264                case res of
265                    Stop () -> return $ rest ()
266                    Emit s' o -> return $ HaveOutput (PipeM $ loop s') o
267                    Skip s' -> loop s'
268        loop s0
269{-# INLINE streamSource #-}
270
271streamSourcePure
272    :: Monad m
273    => Stream Identity o ()
274    -> ConduitWithStream i o m ()
275streamSourcePure (Stream step ms0) =
276    ConduitWithStream con (const $ Stream (return . runIdentity . step) (return s0))
277  where
278    s0 = runIdentity ms0
279    con = ConduitT $ \rest ->
280        let loop s =
281                case runIdentity $ step s of
282                    Stop () -> rest ()
283                    Emit s' o -> HaveOutput (loop s') o
284                    Skip s' -> loop s'
285         in loop s0
286{-# INLINE streamSourcePure #-}
287