1{-# LANGUAGE BangPatterns #-}
2{-# LANGUAGE TupleSections #-}
3{-# LANGUAGE RankNTypes #-}
4{-# LANGUAGE Trustworthy #-}
5module Data.Conduit.Internal.List.Stream where
6
7import           Control.Monad (liftM)
8import           Data.Conduit.Internal.Fusion
9import qualified Data.Foldable as F
10
11--FIXME: Should streamSource / streamSourcePure be used for sources?
12
13unfoldS :: Monad m
14        => (b -> Maybe (a, b))
15        -> b
16        -> StreamProducer m a
17unfoldS f s0 _ =
18    Stream step (return s0)
19  where
20    step s = return $
21        case f s of
22            Nothing -> Stop ()
23            Just (x, s') -> Emit s' x
24{-# INLINE unfoldS #-}
25
26unfoldEitherS :: Monad m
27              => (b -> Either r (a, b))
28              -> b
29              -> StreamConduitT i a m r
30unfoldEitherS f s0 _ =
31    Stream step (return s0)
32  where
33    step s = return $
34        case f s of
35            Left r        -> Stop r
36            Right (x, s') -> Emit s' x
37{-# INLINE unfoldEitherS #-}
38
39unfoldMS :: Monad m
40         => (b -> m (Maybe (a, b)))
41         -> b
42         -> StreamProducer m a
43unfoldMS f s0 _ =
44    Stream step (return s0)
45  where
46    step s = do
47        ms' <- f s
48        return $ case ms' of
49            Nothing -> Stop ()
50            Just (x, s') -> Emit s' x
51{-# INLINE unfoldMS #-}
52
53unfoldEitherMS :: Monad m
54         => (b -> m (Either r (a, b)))
55         -> b
56         -> StreamConduitT i a m r
57unfoldEitherMS f s0 _ =
58    Stream step (return s0)
59  where
60    step s = do
61        ms' <- f s
62        return $ case ms' of
63            Left r        -> Stop r
64            Right (x, s') -> Emit s' x
65{-# INLINE unfoldEitherMS #-}
66sourceListS :: Monad m => [a] -> StreamProducer m a
67sourceListS xs0 _ =
68    Stream (return . step) (return xs0)
69  where
70    step [] = Stop ()
71    step (x:xs) = Emit xs x
72{-# INLINE sourceListS #-}
73
74enumFromToS :: (Enum a, Prelude.Ord a, Monad m)
75            => a
76            -> a
77            -> StreamProducer m a
78enumFromToS x0 y _ =
79    Stream step (return x0)
80  where
81    step x = return $ if x Prelude.> y
82        then Stop ()
83        else Emit (Prelude.succ x) x
84{-# INLINE [0] enumFromToS #-}
85
86enumFromToS_int :: (Prelude.Integral a, Monad m)
87                => a
88                -> a
89                -> StreamProducer m a
90enumFromToS_int x0 y _ = x0 `seq` y `seq` Stream step (return x0)
91  where
92    step x | x <= y    = return $ Emit (x Prelude.+ 1) x
93           | otherwise = return $ Stop ()
94{-# INLINE enumFromToS_int #-}
95
96{-# RULES "conduit: enumFromTo<Int>" forall f t.
97      enumFromToS f t = enumFromToS_int f t :: Monad m => StreamProducer m Int
98  #-}
99
100iterateS :: Monad m => (a -> a) -> a -> StreamProducer m a
101iterateS f x0 _ =
102    Stream (return . step) (return x0)
103  where
104    step x = Emit x' x
105      where
106        x' = f x
107{-# INLINE iterateS #-}
108
109replicateS :: Monad m => Int -> a -> StreamProducer m a
110replicateS cnt0 a _ =
111    Stream step (return cnt0)
112  where
113    step cnt
114        | cnt <= 0  = return $ Stop ()
115        | otherwise = return $ Emit (cnt - 1) a
116{-# INLINE replicateS #-}
117
118replicateMS :: Monad m => Int -> m a -> StreamProducer m a
119replicateMS cnt0 ma _ =
120    Stream step (return cnt0)
121  where
122    step cnt
123        | cnt <= 0  = return $ Stop ()
124        | otherwise = Emit (cnt - 1) `liftM` ma
125{-# INLINE replicateMS #-}
126
127foldS :: Monad m => (b -> a -> b) -> b -> StreamConsumer a m b
128foldS f b0 (Stream step ms0) =
129    Stream step' (liftM (b0, ) ms0)
130  where
131    step' (!b, s) = do
132        res <- step s
133        return $ case res of
134            Stop () -> Stop b
135            Skip s' -> Skip (b, s')
136            Emit s' a -> Skip (f b a, s')
137{-# INLINE foldS #-}
138
139foldMS :: Monad m => (b -> a -> m b) -> b -> StreamConsumer a m b
140foldMS f b0 (Stream step ms0) =
141    Stream step' (liftM (b0, ) ms0)
142  where
143    step' (!b, s) = do
144        res <- step s
145        case res of
146            Stop () -> return $ Stop b
147            Skip s' -> return $ Skip (b, s')
148            Emit s' a -> do
149                b' <- f b a
150                return $ Skip (b', s')
151{-# INLINE foldMS #-}
152
153mapM_S :: Monad m
154       => (a -> m ())
155       -> StreamConsumer a m ()
156mapM_S f (Stream step ms0) =
157    Stream step' ms0
158  where
159    step' s = do
160        res <- step s
161        case res of
162          Stop () -> return $ Stop ()
163          Skip s' -> return $ Skip s'
164          Emit s' x -> f x >> return (Skip s')
165{-# INLINE [1] mapM_S #-}
166
167dropS :: Monad m
168      => Int
169      -> StreamConsumer a m ()
170dropS n0 (Stream step ms0) =
171    Stream step' (liftM (, n0) ms0)
172  where
173    step' (_, n) | n <= 0 = return $ Stop ()
174    step' (s, n) = do
175        res <- step s
176        return $ case res of
177            Stop () -> Stop ()
178            Skip s' -> Skip (s', n)
179            Emit s' _ -> Skip (s', n - 1)
180{-# INLINE dropS #-}
181
182takeS :: Monad m
183      => Int
184      -> StreamConsumer a m [a]
185takeS n0 (Stream step s0) =
186    Stream step' (liftM (id, n0,) s0)
187  where
188    step' (output, n, _) | n <= 0 = return $ Stop (output [])
189    step' (output, n, s) = do
190        res <- step s
191        return $ case res of
192            Stop () -> Stop (output [])
193            Skip s' -> Skip (output, n, s')
194            Emit s' x -> Skip (output . (x:), n - 1, s')
195{-# INLINE takeS #-}
196
197headS :: Monad m => StreamConsumer a m (Maybe a)
198headS (Stream step s0) =
199    Stream step' s0
200  where
201    step' s = do
202        res <- step s
203        return $ case res of
204            Stop () -> Stop Nothing
205            Skip s' -> Skip s'
206            Emit _ x -> Stop (Just x)
207{-# INLINE headS #-}
208
209mapS :: Monad m => (a -> b) -> StreamConduit a m b
210mapS f (Stream step ms0) =
211    Stream step' ms0
212  where
213    step' s = do
214        res <- step s
215        return $ case res of
216            Stop r -> Stop r
217            Emit s' a -> Emit s' (f a)
218            Skip s' -> Skip s'
219{-# INLINE mapS #-}
220
221mapMS :: Monad m => (a -> m b) -> StreamConduit a m b
222mapMS f (Stream step ms0) =
223    Stream step' ms0
224  where
225    step' s = do
226        res <- step s
227        case res of
228            Stop r -> return $ Stop r
229            Emit s' a -> Emit s' `liftM` f a
230            Skip s' -> return $ Skip s'
231{-# INLINE mapMS #-}
232
233iterMS :: Monad m => (a -> m ()) -> StreamConduit a m a
234iterMS f (Stream step ms0) =
235    Stream step' ms0
236  where
237    step' s = do
238        res <- step s
239        case res of
240            Stop () -> return $ Stop ()
241            Skip s' -> return $ Skip s'
242            Emit s' x -> f x >> return (Emit s' x)
243{-# INLINE iterMS #-}
244
245mapMaybeS :: Monad m => (a -> Maybe b) -> StreamConduit a m b
246mapMaybeS f (Stream step ms0) =
247    Stream step' ms0
248  where
249    step' s = do
250        res <- step s
251        return $ case res of
252            Stop () -> Stop ()
253            Skip s' -> Skip s'
254            Emit s' x ->
255                case f x of
256                    Just y -> Emit s' y
257                    Nothing -> Skip s'
258{-# INLINE mapMaybeS #-}
259
260mapMaybeMS :: Monad m => (a -> m (Maybe b)) -> StreamConduit a m b
261mapMaybeMS f (Stream step ms0) =
262    Stream step' ms0
263  where
264    step' s = do
265        res <- step s
266        case res of
267            Stop () -> return $ Stop ()
268            Skip s' -> return $ Skip s'
269            Emit s' x -> do
270                my <- f x
271                case my of
272                    Just y -> return $ Emit s' y
273                    Nothing -> return $ Skip s'
274{-# INLINE mapMaybeMS #-}
275
276catMaybesS :: Monad m => StreamConduit (Maybe a) m a
277catMaybesS (Stream step ms0) =
278    Stream step' ms0
279  where
280    step' s = do
281        res <- step s
282        return $ case res of
283            Stop () -> Stop ()
284            Skip s' -> Skip s'
285            Emit s' Nothing -> Skip s'
286            Emit s' (Just x) -> Emit s' x
287{-# INLINE catMaybesS #-}
288
289concatS :: (Monad m, F.Foldable f) => StreamConduit (f a) m a
290concatS (Stream step ms0) =
291    Stream step' (liftM ([], ) ms0)
292  where
293    step' ([], s) = do
294        res <- step s
295        return $ case res of
296            Stop () -> Stop ()
297            Skip s' -> Skip ([], s')
298            Emit s' x -> Skip (F.toList x, s')
299    step' ((x:xs), s) = return (Emit (xs, s) x)
300{-# INLINE concatS #-}
301
302concatMapS :: Monad m => (a -> [b]) -> StreamConduit a m b
303concatMapS f (Stream step ms0) =
304    Stream step' (liftM ([], ) ms0)
305  where
306    step' ([], s) = do
307        res <- step s
308        return $ case res of
309            Stop () -> Stop ()
310            Skip s' -> Skip ([], s')
311            Emit s' x -> Skip (f x, s')
312    step' ((x:xs), s) = return (Emit (xs, s) x)
313{-# INLINE concatMapS #-}
314
315concatMapMS :: Monad m => (a -> m [b]) -> StreamConduit a m b
316concatMapMS f (Stream step ms0) =
317    Stream step' (liftM ([], ) ms0)
318  where
319    step' ([], s) = do
320        res <- step s
321        case res of
322            Stop () -> return $ Stop ()
323            Skip s' -> return $ Skip ([], s')
324            Emit s' x -> do
325                xs <- f x
326                return $ Skip (xs, s')
327    step' ((x:xs), s) = return (Emit (xs, s) x)
328{-# INLINE concatMapMS #-}
329
330concatMapAccumS :: Monad m => (a -> accum -> (accum, [b])) -> accum -> StreamConduit a m b
331concatMapAccumS f  initial (Stream step ms0) =
332    Stream step' (liftM (initial, [], ) ms0)
333  where
334    step' (accum, [], s) = do
335        res <- step s
336        return $ case res of
337            Stop () -> Stop ()
338            Skip s' -> Skip (accum, [], s')
339            Emit s' x ->
340                let (accum', xs) = f x accum
341                in Skip (accum', xs, s')
342    step' (accum, (x:xs), s) = return (Emit (accum, xs, s) x)
343{-# INLINE concatMapAccumS #-}
344
345mapAccumS :: Monad m => (a -> s -> (s, b)) -> s -> StreamConduitT a b m s
346mapAccumS f initial (Stream step ms0) =
347    Stream step' (liftM (initial, ) ms0)
348  where
349    step' (accum, s) = do
350        res <- step s
351        return $ case res of
352            Stop () -> Stop accum
353            Skip s' -> Skip (accum, s')
354            Emit s' x ->
355                let (accum', r) = f x accum
356                in Emit (accum', s') r
357{-# INLINE mapAccumS #-}
358
359mapAccumMS :: Monad m => (a -> s -> m (s, b)) -> s -> StreamConduitT a b m s
360mapAccumMS f initial (Stream step ms0) =
361    Stream step' (liftM (initial, ) ms0)
362  where
363    step' (accum, s) = do
364        res <- step s
365        case res of
366            Stop () -> return $ Stop accum
367            Skip s' -> return $ Skip (accum, s')
368            Emit s' x -> do
369                (accum', r) <- f x accum
370                return $ Emit (accum', s') r
371{-# INLINE mapAccumMS #-}
372
373concatMapAccumMS :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> StreamConduit a m b
374concatMapAccumMS f  initial (Stream step ms0) =
375    Stream step' (liftM (initial, [], ) ms0)
376  where
377    step' (accum, [], s) = do
378        res <- step s
379        case res of
380            Stop () -> return $ Stop ()
381            Skip s' -> return $ Skip (accum, [], s')
382            Emit s' x -> do
383                (accum', xs) <- f x accum
384                return $ Skip (accum', xs, s')
385    step' (accum, (x:xs), s) = return (Emit (accum, xs, s) x)
386{-# INLINE concatMapAccumMS #-}
387
388mapFoldableS :: (Monad m, F.Foldable f) => (a -> f b) -> StreamConduit a m b
389mapFoldableS f (Stream step ms0) =
390    Stream step' (liftM ([], ) ms0)
391  where
392    step' ([], s) = do
393        res <- step s
394        return $ case res of
395            Stop () -> Stop ()
396            Skip s' -> Skip ([], s')
397            Emit s' x -> Skip (F.toList (f x), s')
398    step' ((x:xs), s) = return (Emit (xs, s) x)
399{-# INLINE mapFoldableS #-}
400
401mapFoldableMS :: (Monad m, F.Foldable f) => (a -> m (f b)) -> StreamConduit a m b
402mapFoldableMS f (Stream step ms0) =
403    Stream step' (liftM ([], ) ms0)
404  where
405    step' ([], s) = do
406        res <- step s
407        case res of
408            Stop () -> return $ Stop ()
409            Skip s' -> return $ Skip ([], s')
410            Emit s' x -> do
411                y <- f x
412                return $ Skip (F.toList y, s')
413    step' ((x:xs), s) = return (Emit (xs, s) x)
414{-# INLINE mapFoldableMS #-}
415
416consumeS :: Monad m => StreamConsumer a m [a]
417consumeS (Stream step ms0) =
418    Stream step' (liftM (id,) ms0)
419  where
420    step' (front, s) = do
421        res <- step s
422        return $ case res of
423            Stop () -> Stop (front [])
424            Skip s' -> Skip (front, s')
425            Emit s' a -> Skip (front . (a:), s')
426{-# INLINE consumeS #-}
427
428groupByS :: Monad m => (a -> a -> Bool) -> StreamConduit a m [a]
429groupByS f = mapS (Prelude.uncurry (:)) . groupBy1S id f
430{-# INLINE groupByS #-}
431
432groupOn1S :: (Monad m, Eq b) => (a -> b) -> StreamConduit a m (a, [a])
433groupOn1S f = groupBy1S f (==)
434{-# INLINE groupOn1S #-}
435
436data GroupByState a b s
437     = GBStart s
438     | GBLoop ([a] -> [a]) a b s
439     | GBDone
440
441groupBy1S :: Monad m => (a -> b) -> (b -> b -> Bool) -> StreamConduit a m (a, [a])
442groupBy1S f eq (Stream step ms0) =
443    Stream step' (liftM GBStart ms0)
444  where
445    step' (GBStart s) = do
446        res <- step s
447        return $ case res of
448            Stop () -> Stop ()
449            Skip s' -> Skip (GBStart s')
450            Emit s' x0 -> Skip (GBLoop id x0 (f x0) s')
451    step' (GBLoop rest x0 fx0 s) = do
452        res <- step s
453        return $ case res of
454            Stop () -> Emit GBDone (x0, rest [])
455            Skip s' -> Skip (GBLoop rest x0 fx0 s')
456            Emit s' x
457                | fx0 `eq` f x -> Skip (GBLoop (rest . (x:)) x0 fx0 s')
458                | otherwise -> Emit (GBLoop id x (f x) s') (x0, rest [])
459    step' GBDone = return $ Stop ()
460{-# INLINE groupBy1S #-}
461
462isolateS :: Monad m => Int -> StreamConduit a m a
463isolateS count (Stream step ms0) =
464    Stream step' (liftM (count,) ms0)
465  where
466    step' (n, _) | n <= 0 = return $ Stop ()
467    step' (n, s) = do
468        res <- step s
469        return $ case res of
470            Stop () -> Stop ()
471            Skip s' -> Skip (n, s')
472            Emit s' x -> Emit (n - 1, s') x
473{-# INLINE isolateS #-}
474
475filterS :: Monad m => (a -> Bool) -> StreamConduit a m a
476filterS f (Stream step ms0) =
477    Stream step' ms0
478  where
479    step' s = do
480        res <- step s
481        return $ case res of
482            Stop () -> Stop ()
483            Skip s' -> Skip s'
484            Emit s' x
485                | f x -> Emit s' x
486                | otherwise -> Skip s'
487
488sinkNullS :: Monad m => StreamConsumer a m ()
489sinkNullS (Stream step ms0) =
490    Stream step' ms0
491  where
492    step' s = do
493        res <- step s
494        return $ case res of
495            Stop () -> Stop ()
496            Skip s' -> Skip s'
497            Emit s' _ -> Skip s'
498{-# INLINE sinkNullS #-}
499
500sourceNullS :: Monad m => StreamProducer m a
501sourceNullS _ = Stream (\_ -> return (Stop ())) (return ())
502{-# INLINE sourceNullS #-}
503