1{-# LANGUAGE CPP #-}
2{-# LANGUAGE RankNTypes #-}
3{-# LANGUAGE TupleSections #-}
4{-# LANGUAGE ViewPatterns #-}
5{-# LANGUAGE BangPatterns #-}
6{-# LANGUAGE TypeFamilies #-}
7-- | These are stream fusion versions of some of the functions in
8-- "Data.Conduit.Combinators".  Many functions don't have stream
9-- versions here because instead they have @RULES@ which inline a
10-- definition that fuses.
11module Data.Conduit.Combinators.Stream
12  ( yieldManyS
13  , repeatMS
14  , repeatWhileMS
15  , foldl1S
16  , allS
17  , anyS
18  , sinkLazyS
19  , sinkVectorS
20  , sinkVectorNS
21  , sinkLazyBuilderS
22  , lastS
23  , lastES
24  , findS
25  , concatMapS
26  , concatMapMS
27  , concatS
28  , scanlS
29  , scanlMS
30  , mapAccumWhileS
31  , mapAccumWhileMS
32  , intersperseS
33  , slidingWindowS
34  , filterMS
35  , splitOnUnboundedES
36  , initReplicateS
37  , initRepeatS
38  )
39  where
40
41-- BEGIN IMPORTS
42
43import           Control.Monad (liftM)
44import           Control.Monad.Primitive (PrimMonad)
45import qualified Data.ByteString.Lazy as BL
46import           Data.ByteString.Builder (Builder, toLazyByteString)
47import           Data.Conduit.Internal.Fusion
48import           Data.Conduit.Internal.List.Stream (foldS)
49import           Data.Maybe (isNothing, isJust)
50import           Data.MonoTraversable
51#if ! MIN_VERSION_base(4,8,0)
52import           Data.Monoid (Monoid (..))
53#endif
54import qualified Data.NonNull as NonNull
55import qualified Data.Sequences as Seq
56import qualified Data.Vector.Generic as V
57import qualified Data.Vector.Generic.Mutable as VM
58import           Prelude
59
60#if MIN_VERSION_mono_traversable(1,0,0)
61import           Data.Sequences (LazySequence (..))
62#else
63import           Data.Sequences.Lazy
64#endif
65
66-- END IMPORTS
67
-- | Stream producer that emits every element of a 'MonoFoldable' container,
-- in the order given by 'otoList'.  The upstream argument is ignored.
yieldManyS :: (Monad m, MonoFoldable mono)
            => mono
            -> StreamProducer m (Element mono)
yieldManyS container _ =
    Stream advance (return (otoList container))
  where
    -- The state is the list of elements that have not been emitted yet.
    advance remaining = return $ case remaining of
        []         -> Stop ()
        (e : rest) -> Emit rest e
{-# INLINE yieldManyS #-}
77
-- | Stream producer that runs the given action on every step and emits its
-- result.  It never stops on its own and ignores the upstream argument.
repeatMS :: Monad m
         => m a
         -> StreamProducer m a
repeatMS action _ =
    Stream next (return ())
  where
    -- Nothing has to be remembered between steps, so the state is unit.
    next _ = do
        x <- action
        return (Emit () x)
{-# INLINE repeatMS #-}
86
-- | Stream producer that repeatedly runs the given action and emits each
-- result for as long as the predicate holds.  The first result that fails
-- the predicate ends the stream and is not emitted.  The upstream argument
-- is ignored.
repeatWhileMS :: Monad m
              => m a
              -> (a -> Bool)
              -> StreamProducer m a
repeatWhileMS action keepGoing _ =
    Stream next (return ())
  where
    next _ = liftM decide action

    -- Turn one result of the action into the corresponding step.
    decide x
        | keepGoing x = Emit () x
        | otherwise   = Stop ()
{-# INLINE repeatWhileMS #-}
100
-- | Left fold over the stream that uses the first element as the starting
-- value.  The result is 'Nothing' when upstream produces no elements.
--
-- The running result is forced at every element, so a long input does not
-- accumulate a chain of unevaluated applications of the folding function.
foldl1S :: Monad m
        => (a -> a -> a)
        -> StreamConsumer a m (Maybe a)
foldl1S f (Stream step ms0) =
    Stream step' (liftM (Nothing, ) ms0)
  where
    -- State: the result so far ('Nothing' before the first element) paired
    -- with the upstream state.
    step' (mprev, s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop mprev
            Skip s' -> Skip (mprev, s')
            Emit s' a ->
                -- Evaluate the new accumulator before storing it.
                let !acc = maybe a (`f` a) mprev
                in Skip (Just acc, s')
{-# INLINE foldl1S #-}
114
-- | Check that every upstream value satisfies the predicate.  Implemented as
-- a search (via 'findS') for the first value that fails it: the result is
-- 'True' exactly when no such value is found.
allS :: Monad m
     => (a -> Bool)
     -> StreamConsumer a m Bool
allS predicate = fmapS isNothing $ findS violates
  where
    violates x = Prelude.not (predicate x)
{-# INLINE allS #-}
120
-- | Check whether at least one upstream value satisfies the predicate.
-- Implemented with 'findS': the result is 'True' exactly when a matching
-- value is found.
anyS :: Monad m
     => (a -> Bool)
     -> StreamConsumer a m Bool
anyS predicate = isJust `fmapS` findS predicate
{-# INLINE anyS #-}
126
127--TODO: use a definition like
128-- fmapS (fromChunks . ($ [])) <$> CL.fold (\front next -> front . (next:)) id
129
-- | Gather all strict chunks from upstream and assemble them into a lazy
-- sequence with 'fromChunks'.
sinkLazyS :: (Monad m, LazySequence lazy strict)
          => StreamConsumer strict m lazy
sinkLazyS = fmapS finish (foldS append id)
  where
    -- The accumulator is a function that prepends the chunks seen so far
    -- to whatever list it is given (a difference list).
    append acc chunk = acc . (chunk :)

    -- Close the difference list with the empty list and build the result.
    finish build = fromChunks (build [])
{-# INLINE sinkLazyS #-}
134
-- | Collect every upstream value into a vector.
--
-- Values are written into a mutable buffer that starts with room for 10
-- elements and has its capacity doubled each time it fills up.  When
-- upstream stops, the buffer is frozen and the slice holding the values
-- written so far is returned.
sinkVectorS :: (V.Vector v a, PrimMonad m)
            => StreamConsumer a m (v a)
sinkVectorS (Stream upstreamStep upstreamInit) =
    Stream collect start
  where
    initialCapacity = 10

    -- State: (buffer capacity, number of values written, buffer, upstream
    -- state).
    start = do
        s0 <- upstreamInit
        buffer <- VM.new initialCapacity
        return (initialCapacity, 0, buffer, s0)

    collect (capacity, used, buffer, s) =
        upstreamStep s >>= \res -> case res of
            Stop () -> do
                frozen <- V.unsafeFreeze buffer
                return $ Stop (V.slice 0 used frozen)
            Skip s' -> return $ Skip (capacity, used, buffer, s')
            Emit s' x -> do
                VM.write buffer used x
                let used' = used + 1
                if used' < capacity
                    then return $ Skip (capacity, used', buffer, s')
                    else do
                        -- Buffer is full: grow by the current capacity,
                        -- i.e. double it.
                        bigger <- VM.grow buffer capacity
                        return $ Skip (capacity * 2, used', bigger, s')
{-# INLINE sinkVectorS #-}
159
-- | Collect upstream values into a vector holding at most the given number
-- of elements.
--
-- A buffer of exactly that size is allocated up front.  As soon as it is
-- full the whole buffer is frozen and returned without asking upstream for
-- more; if upstream stops earlier, only the slice that was written is
-- returned.
sinkVectorNS :: (V.Vector v a, PrimMonad m)
             => Int -- ^ maximum allowed size
             -> StreamConsumer a m (v a)
sinkVectorNS limit (Stream upstreamStep upstreamInit) =
    Stream collect start
  where
    -- State: (number of values written, buffer, upstream state).
    start = do
        s0 <- upstreamInit
        buffer <- VM.new limit
        return (0, buffer, s0)

    collect (used, buffer, s)
        | used >= limit = Stop `liftM` V.unsafeFreeze buffer
        | otherwise = do
            res <- upstreamStep s
            case res of
                Stop () -> do
                    frozen <- V.unsafeFreeze buffer
                    return $ Stop (V.slice 0 used frozen)
                Skip s' -> return $ Skip (used, buffer, s')
                Emit s' x -> do
                    VM.write buffer used x
                    return $ Skip (used + 1, buffer, s')
{-# INLINE sinkVectorNS #-}
180
-- | Combine all upstream 'Builder's with 'mappend' (starting from 'mempty')
-- and run the combined builder to obtain a lazy 'BL.ByteString'.
sinkLazyBuilderS :: Monad m => StreamConsumer Builder m BL.ByteString
sinkLazyBuilderS =
    fmapS toLazyByteString $ foldS (\acc piece -> acc `mappend` piece) mempty
{-# INLINE sinkLazyBuilderS #-}
184
-- | Consume the whole stream and return the final value it emitted, or
-- 'Nothing' if it emitted none.
lastS :: Monad m
      => StreamConsumer a m (Maybe a)
lastS (Stream upstreamStep upstreamInit) =
    Stream track (liftM ((,) Nothing) upstreamInit)
  where
    -- State: the most recent value seen so far and the upstream state.
    track (seen, s) = liftM advance (upstreamStep s)
      where
        advance (Stop ())   = Stop seen
        advance (Skip s')   = Skip (seen, s')
        advance (Emit s' x) = Skip (Just x, s')
{-# INLINE lastS #-}
197
-- | Consume a stream of chunks and return the last element of the last
-- non-empty chunk, or 'Nothing' if every chunk was empty (or there were no
-- chunks at all).
lastES :: (Monad m, Seq.IsSequence seq)
       => StreamConsumer seq m (Maybe (Element seq))
lastES (Stream upstreamStep upstreamInit) =
    Stream track (liftM ((,) Nothing) upstreamInit)
  where
    -- State: the most recent non-empty chunk (kept as a non-null value) and
    -- the upstream state.  The element is only extracted at the very end.
    track (kept, s) = liftM advance (upstreamStep s)
      where
        advance (Stop ()) = Stop (fmap NonNull.last kept)
        advance (Skip s') = Skip (kept, s')
        advance (Emit s' chunk) =
            case NonNull.fromNullable chunk of
                nonEmpty@(Just _) -> Skip (nonEmpty, s')
                -- Empty chunks leave the remembered chunk untouched.
                Nothing           -> Skip (kept, s')
{-# INLINE lastES #-}
211
-- | Return the first upstream value satisfying the predicate and stop
-- immediately, or return 'Nothing' if upstream ends without a match.
findS :: Monad m
      => (a -> Bool) -> StreamConsumer a m (Maybe a)
findS matches (Stream upstreamStep upstreamInit) =
    Stream search upstreamInit
  where
    -- The upstream state is reused unchanged as this stream's state.
    search s = liftM classify (upstreamStep s)

    classify (Stop ()) = Stop Nothing
    classify (Skip s') = Skip s'
    classify (Emit s' x)
        | matches x = Stop (Just x)
        | otherwise = Skip s'
{-# INLINE findS #-}
227
-- | Apply a function to each upstream value and emit the elements of each
-- resulting container one at a time, in 'otoList' order.
concatMapS :: (Monad m, MonoFoldable mono)
           => (a -> mono)
           -> StreamConduit a m (Element mono)
concatMapS expand (Stream upstreamStep upstreamInit) =
    Stream go (liftM ((,) []) upstreamInit)
  where
    -- State: elements still waiting to be emitted, and the upstream state.
    go (pending, s) = case pending of
        -- Drain the pending elements before touching upstream again.
        (y : ys) -> return (Emit (ys, s) y)
        [] -> do
            res <- upstreamStep s
            return $ case res of
                Stop ()   -> Stop ()
                Skip s'   -> Skip ([], s')
                Emit s' x -> Skip (otoList (expand x), s')
{-# INLINE concatMapS #-}
242
-- | Monadic variant of 'concatMapS': run an action on each upstream value
-- and emit the elements of the container it returns one at a time, in
-- 'otoList' order.
concatMapMS :: (Monad m, MonoFoldable mono)
             => (a -> m mono)
             -> StreamConduit a m (Element mono)
concatMapMS expand (Stream upstreamStep upstreamInit) =
    Stream go (liftM ((,) []) upstreamInit)
  where
    -- State: elements still waiting to be emitted, and the upstream state.
    go (pending, s) = case pending of
        -- Drain the pending elements before touching upstream again.
        (y : ys) -> return (Emit (ys, s) y)
        [] -> upstreamStep s >>= \res -> case res of
            Stop ()   -> return (Stop ())
            Skip s'   -> return (Skip ([], s'))
            Emit s' x -> liftM (\o -> Skip (otoList o, s')) (expand x)
{-# INLINE concatMapMS #-}
259
-- | Flatten a stream of containers into a stream of their elements.  This is
-- 'concatMapS' applied to the identity function.
concatS :: (Monad m, MonoFoldable mono)
         => StreamConduit mono m (Element mono)
concatS = concatMapS (\mono -> mono)
{-# INLINE concatS #-}
264
-- | State shared by 'scanlS' and 'scanlMS'.
data ScanState a s
    = ScanEnded
    -- Upstream has stopped and the final accumulator has been emitted.
    | ScanContinues a s
    -- Current accumulator together with the upstream state.

-- | Left scan: emits the seed first and then every intermediate accumulator,
-- so one more value is emitted than was received.  Each new accumulator is
-- forced before it is stored in the state.
scanlS :: Monad m => (a -> b -> a) -> a -> StreamConduit b m a
scanlS f seed0 (Stream upstreamStep upstreamInit) =
    Stream scan (liftM (ScanContinues seed0) upstreamInit)
  where
    scan state = case state of
        ScanEnded -> return (Stop ())
        ScanContinues acc s -> do
            res <- upstreamStep s
            return $ case res of
                -- Upstream is done: the accumulator held in the state has
                -- not been emitted yet, so emit it as the last value.
                Stop ()   -> Emit ScanEnded acc
                Skip s'   -> Skip (ScanContinues acc s')
                Emit s' x ->
                    let !acc' = f acc x
                    in Emit (ScanContinues acc' s') acc
{-# INLINE scanlS #-}
283
-- | Monadic left scan: like 'scanlS', but the accumulator is updated by a
-- monadic action.  The seed is emitted first, followed by every
-- intermediate accumulator; each new accumulator is forced before it is
-- stored in the state.
scanlMS :: Monad m => (a -> b -> m a) -> a -> StreamConduit b m a
scanlMS f seed0 (Stream upstreamStep upstreamInit) =
    Stream scan (liftM (ScanContinues seed0) upstreamInit)
  where
    scan state = case state of
        ScanEnded -> return (Stop ())
        ScanContinues acc s -> upstreamStep s >>= \res -> case res of
            -- Upstream is done: emit the accumulator still held in the
            -- state as the last value.
            Stop ()   -> return (Emit ScanEnded acc)
            Skip s'   -> return (Skip (ScanContinues acc s'))
            Emit s' x -> do
                acc' <- f acc x
                acc' `seq` return (Emit (ScanContinues acc' s') acc)
{-# INLINE scanlMS #-}
298
-- | Map over the stream while threading an accumulator.
--
-- For each upstream value the function receives the value and the current
-- accumulator.  @Right (acc, out)@ emits @out@ and continues with @acc@;
-- @Left acc@ stops the stream with @acc@ as the result.  If upstream ends
-- first, the current accumulator is the result.  The accumulator is kept
-- evaluated throughout.
mapAccumWhileS :: Monad m =>
    (a -> s -> Either s (s, b)) -> s -> StreamConduitT a b m s
mapAccumWhileS f initial (Stream upstreamStep upstreamInit) =
    Stream go (liftM ((,) initial) upstreamInit)
  where
    go (!acc, s) = liftM (react acc) (upstreamStep s)

    -- Translate one upstream step given the current accumulator.
    react acc res = case res of
        Stop ()   -> Stop acc
        Skip s'   -> Skip (acc, s')
        Emit s' x -> case f x acc of
            Left   !final      -> Stop final
            Right (!acc', out) -> Emit (acc', s') out
{-# INLINE mapAccumWhileS #-}
313
-- | Monadic variant of 'mapAccumWhileS'.
--
-- For each upstream value the action receives the value and the current
-- accumulator.  @Right (acc, out)@ emits @out@ and continues with @acc@;
-- @Left acc@ stops the stream with @acc@ as the result.  If upstream ends
-- first, the current accumulator is the result.  The accumulator is kept
-- evaluated throughout.
mapAccumWhileMS :: Monad m =>
    (a -> s -> m (Either s (s, b))) -> s -> StreamConduitT a b m s
mapAccumWhileMS f initial (Stream upstreamStep upstreamInit) =
    Stream go (liftM ((,) initial) upstreamInit)
  where
    go (!acc, s) = do
        res <- upstreamStep s
        case res of
            Stop ()   -> return (Stop acc)
            Skip s'   -> return (Skip (acc, s'))
            Emit s' x -> liftM (finish s') (f x acc)

    -- Turn the outcome of the user action into the next step.
    finish s' (Right (!acc', out)) = Emit (acc', s') out
    finish _  (Left !final)        = Stop final
{-# INLINE mapAccumWhileMS #-}
330
-- | State of 'intersperseS'.  Only 'IEmitValue' carries a value: it is the
-- element that has already been pulled from upstream and must be emitted
-- right after the separator.
data IntersperseState a s
    = IFirstValue s
    -- No value has been emitted yet.
    | IGotValue s
    -- A value was just emitted; a separator is due if another value follows.
    | IEmitValue s a
    -- A separator was just emitted; the carried value goes out next.

-- | Insert the given separator between consecutive upstream values.  No
-- separator is emitted before the first value or after the last one.
intersperseS :: Monad m => a -> StreamConduit a m a
intersperseS sep (Stream step ms0) =
    Stream step' (liftM IFirstValue ms0)
  where
    step' (IFirstValue s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip (IFirstValue s')
            Emit s' x -> Emit (IGotValue s') x
    -- Emit the separator once we know it's not the end of the list.
    step' (IGotValue s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip (IGotValue s')
            Emit s' x -> Emit (IEmitValue s' x) sep
    -- We emitted a separator, now emit the value that comes after.
    step' (IEmitValue s x) = return $ Emit (IGotValue s) x
{-# INLINE intersperseS #-}
356
-- | State of 'slidingWindowS'.
data SlidingWindowState seq s
    = SWInitial Int seq s
    -- Still filling the first window: number of elements still needed, the
    -- elements gathered so far, and the upstream state.
    | SWSliding seq s
    -- The previous window minus its first element, and the upstream state.
    | SWEarlyExit
    -- Upstream ended before the first window was complete.

-- | Emit overlapping windows of consecutive upstream values.  The window
-- size is the given size, or 1 if the given size is smaller than that.
--
-- If upstream ends before the first window is complete, whatever has been
-- gathered up to that point is emitted as a single (shorter) window.
slidingWindowS :: (Monad m, Seq.IsSequence seq, Element seq ~ a) => Int -> StreamConduit a m seq
slidingWindowS sz (Stream upstreamStep upstreamInit) =
    Stream slide (liftM (SWInitial (max 1 sz) mempty) upstreamInit)
  where
    slide SWEarlyExit = return (Stop ())
    slide (SWInitial needed window s) = liftM fill (upstreamStep s)
      where
        fill (Stop ()) = Emit SWEarlyExit window
        fill (Skip s') = Skip (SWInitial needed window s')
        fill (Emit s' x)
            | needed == 1 = Emit (SWSliding (Seq.unsafeTail window') s') window'
            | otherwise   = Skip (SWInitial (needed - 1) window' s')
          where
            window' = Seq.snoc window x
    -- Once the first window has been emitted, every further upstream value
    -- produces exactly one more window.
    slide (SWSliding window s) = liftM shift (upstreamStep s)
      where
        shift (Stop ()) = Stop ()
        shift (Skip s') = Skip (SWSliding window s')
        shift (Emit s' x) = Emit (SWSliding (Seq.unsafeTail window') s') window'
          where
            window' = Seq.snoc window x

{-# INLINE slidingWindowS #-}
390
-- | Keep only the upstream values for which the monadic predicate returns
-- 'True'.  The predicate is run once for every value upstream emits.
filterMS :: Monad m
         => (a -> m Bool)
         -> StreamConduit a m a
filterMS keep (Stream upstreamStep upstreamInit) =
    Stream go upstreamInit
  where
    -- The upstream state is reused unchanged as this stream's state.
    go s = upstreamStep s >>= \res -> case res of
        Stop ()   -> return (Stop ())
        Skip s'   -> return (Skip s')
        Emit s' x -> do
            ok <- keep x
            return (if ok then Emit s' x else Skip s')
{-# INLINE filterMS #-}
409
-- | State of 'splitOnUnboundedES'.
data SplitState seq s
    = SplitDone
    -- Used when the buffered data is already known to contain no element
    -- passing the predicate.  It lets 'splitOnUnboundedES' avoid running
    -- 'Seq.break' over the same buffer again while upstream only sends
    -- 'Skip's.
    | SplitNoSep seq s
    | SplitState seq s

-- | Split the chunked input at every element satisfying the predicate.
--
-- Incoming chunks are appended to a buffer.  Each time the buffer contains
-- a separator, the part before it is emitted and the separator itself is
-- dropped.  When upstream ends, a non-empty remainder is emitted as the
-- final piece.  The buffer keeps growing for as long as no separator
-- arrives.
splitOnUnboundedES :: (Monad m, Seq.IsSequence seq)
                   => (Element seq -> Bool) -> StreamConduit seq m seq
splitOnUnboundedES isSep (Stream upstreamStep upstreamInit) =
    Stream go (liftM (SplitState mempty) upstreamInit)
  where
    go SplitDone = return (Stop ())
    go (SplitNoSep buf s) = pull buf s
    go (SplitState buf s)
        -- No separator in the buffer: more input is needed.
        | onull rest = pull buf s
        | otherwise  = return $ Emit (SplitState (Seq.drop 1 rest) s) piece
      where
        (piece, rest) = Seq.break isSep buf

    -- Ask upstream for more data while holding a separator-free buffer.
    pull buf s = do
        res <- upstreamStep s
        return $ case res of
            Stop ()
                | onull buf -> Stop ()
                | otherwise -> Emit SplitDone buf
            Skip s'     -> Skip (SplitNoSep buf s')
            Emit s' new -> Skip (SplitState (buf `mappend` new) s')
{-# INLINE splitOnUnboundedES #-}
444
-- | Streaming version of @Data.Conduit.Combinators.Internal.initReplicate@.
--
-- Runs the first action once to obtain a seed, then emits the result of
-- applying the second action to that seed the given number of times.
-- Nothing is emitted when the count is zero or negative.  The upstream
-- argument is ignored.
initReplicateS :: Monad m => m seed -> (seed -> m a) -> Int -> StreamProducer m a
initReplicateS mkSeed gen total _ =
    Stream go (liftM ((,) total) mkSeed)
  where
    -- State: how many values are still to be emitted, and the seed.
    go (remaining, seed)
        | remaining > 0 = liftM (Emit (remaining - 1, seed)) (gen seed)
        | otherwise     = return (Stop ())
{-# INLINE initReplicateS #-}
455
-- | Streaming version of @Data.Conduit.Combinators.Internal.initRepeat@.
--
-- Runs the first action once to obtain a seed, then emits the result of
-- applying the second action to that seed on every step.  It never stops on
-- its own and ignores the upstream argument.
initRepeatS :: Monad m => m seed -> (seed -> m a) -> StreamProducer m a
initRepeatS mkSeed gen _ =
    Stream go mkSeed
  where
    -- The seed itself is the state and is passed along unchanged.
    go seed = liftM (Emit seed) (gen seed)
{-# INLINE initRepeatS #-}
465
-- | Utility function: apply a pure function to the final result of a stream
-- transformer, leaving the emitted values and the state untouched.
fmapS :: Monad m
      => (a -> b)
      -> StreamConduitT i o m a
      -> StreamConduitT i o m b
fmapS f s inp =
    mapResult (s inp)
  where
    -- Post-process every step of the transformed stream with @fmap f@.
    mapResult (Stream step ms0) =
        Stream (\st -> liftM (fmap f) (step st)) ms0
{-# INLINE fmapS #-}
475