{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE TypeFamilies #-}
-- | These are stream fusion versions of some of the functions in
-- "Data.Conduit.Combinators".  Many functions don't have stream
-- versions here because instead they have @RULES@ which inline a
-- definition that fuses.
module Data.Conduit.Combinators.Stream
    ( yieldManyS
    , repeatMS
    , repeatWhileMS
    , foldl1S
    , allS
    , anyS
    , sinkLazyS
    , sinkVectorS
    , sinkVectorNS
    , sinkLazyBuilderS
    , lastS
    , lastES
    , findS
    , concatMapS
    , concatMapMS
    , concatS
    , scanlS
    , scanlMS
    , mapAccumWhileS
    , mapAccumWhileMS
    , intersperseS
    , slidingWindowS
    , filterMS
    , splitOnUnboundedES
    , initReplicateS
    , initRepeatS
    )
    where

-- BEGIN IMPORTS

import           Control.Monad (liftM)
import           Control.Monad.Primitive (PrimMonad)
import qualified Data.ByteString.Lazy as BL
import           Data.ByteString.Builder (Builder, toLazyByteString)
import           Data.Conduit.Internal.Fusion
import           Data.Conduit.Internal.List.Stream (foldS)
import           Data.Maybe (isNothing, isJust)
import           Data.MonoTraversable
#if ! MIN_VERSION_base(4,8,0)
import           Data.Monoid (Monoid (..))
#endif
import qualified Data.NonNull as NonNull
import qualified Data.Sequences as Seq
import qualified Data.Vector.Generic as V
import qualified Data.Vector.Generic.Mutable as VM
import           Prelude

#if MIN_VERSION_mono_traversable(1,0,0)
import           Data.Sequences (LazySequence (..))
#else
import           Data.Sequences.Lazy
#endif

-- END IMPORTS

-- | Emit every element of a 'MonoFoldable' container, in the order given
-- by 'otoList', then stop.  The upstream stream argument is ignored.
yieldManyS :: (Monad m, MonoFoldable mono)
            => mono
            -> StreamProducer m (Element mono)
yieldManyS mono _ =
    Stream (return . step) (return (otoList mono))
  where
    -- The stream state is the list of elements still to be emitted.
    step [] = Stop ()
    step (x:xs) = Emit xs x
{-# INLINE yieldManyS #-}

-- | Run the given action on every step and emit its result.  This stream
-- never produces 'Stop'.  The upstream stream argument is ignored.
repeatMS :: Monad m
         => m a
         -> StreamProducer m a
repeatMS m _ =
    Stream step (return ())
  where
    step _ = liftM (Emit ()) m
{-# INLINE repeatMS #-}

-- | Run the given action repeatedly, emitting each result for which the
-- predicate holds.  The stream stops at the first result that fails the
-- predicate; that result is not emitted.
repeatWhileMS :: Monad m
              => m a
              -> (a -> Bool)
              -> StreamProducer m a
repeatWhileMS m f _ =
    Stream step (return ())
  where
    step _ = do
        x <- m
        return $ if f x
            then Emit () x
            else Stop ()
{-# INLINE repeatWhileMS #-}

-- | Left fold over the stream without a starting value.  The result is
-- 'Nothing' when upstream emits no elements; otherwise the first element
-- seeds the accumulator and every later element is combined into it with
-- the supplied function.
--
-- The accumulator is forced to weak head normal form each time it is
-- updated, so a long stream does not build up a chain of unevaluated
-- applications of the folding function.
foldl1S :: Monad m
        => (a -> a -> a)
        -> StreamConsumer a m (Maybe a)
foldl1S f (Stream step ms0) =
    Stream step' (liftM (Nothing, ) ms0)
  where
    -- State: the accumulator so far (if any) paired with the upstream state.
    step' (mprev, s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop mprev
            Skip s' -> Skip (mprev, s')
            Emit s' a -> Skip (Just acc, s')
              where
                -- Forced when this alternative is evaluated, mirroring the
                -- strict seed update used by scanlS in this module.
                !acc = maybe a (`f` a) mprev
{-# INLINE foldl1S #-}

-- | 'True' when no element of the stream fails the predicate.  Built on
-- 'findS', so consumption stops at the first failing element.
allS :: Monad m
     => (a -> Bool)
     -> StreamConsumer a m Bool
allS f = fmapS isNothing (findS (Prelude.not . f))
{-# INLINE allS #-}

-- | 'True' when some element of the stream satisfies the predicate.
-- Built on 'findS', so consumption stops at the first match.
anyS :: Monad m
     => (a -> Bool)
     -> StreamConsumer a m Bool
anyS f = fmapS isJust (findS f)
{-# INLINE anyS #-}

-- TODO: use a definition like
-- fmapS (fromChunks . ($ [])) <$> CL.fold (\front next -> front . (next:)) id
-- NOTE(review): the definition below already appears to have this shape
-- (using foldS); confirm whether this TODO is still relevant.

-- | Collect all strict chunks from the stream, in order, and combine them
-- into a lazy sequence with 'fromChunks'.  The chunks are accumulated as a
-- difference list (a function that prepends the chunks seen so far).
sinkLazyS :: (Monad m, LazySequence lazy strict)
          => StreamConsumer strict m lazy
sinkLazyS = fmapS (fromChunks . ($ [])) $ foldS (\front next -> front . (next:)) id
{-# INLINE sinkLazyS #-}

-- | Collect every element of the stream into a vector.
--
-- A mutable buffer of 10 slots is allocated up front and grown by its
-- current capacity (that is, doubled) as soon as it becomes full.  When
-- upstream stops, the buffer is frozen in place and sliced down to the
-- number of elements actually written.
sinkVectorS :: (V.Vector v a, PrimMonad m)
            => StreamConsumer a m (v a)
sinkVectorS (Stream step ms0) =
    Stream step' $ do
        s0 <- ms0
        mv0 <- VM.new initSize
        return (initSize, 0, mv0, s0)
  where
    initSize = 10
    -- State: (buffer capacity, next write index, buffer, upstream state).
    step' (maxSize, i, mv, s) = do
        res <- step s
        case res of
            Stop () -> liftM (Stop . V.slice 0 i) $ V.unsafeFreeze mv
            Skip s' -> return $ Skip (maxSize, i, mv, s')
            Emit s' x -> do
                VM.write mv i x
                let i' = i + 1
                if i' >= maxSize
                    then do
                        -- Buffer is now full: add maxSize more slots so the
                        -- next write is always in bounds.
                        let newMax = maxSize * 2
                        mv' <- VM.grow mv maxSize
                        return $ Skip (newMax, i', mv', s')
                    else return $ Skip (maxSize, i', mv, s')
{-# INLINE sinkVectorS #-}

-- | Collect at most the given number of elements into a vector.
--
-- The buffer is allocated with the maximum size up front.  Once that many
-- elements have been written the stream stops without stepping upstream
-- again; if upstream stops first, the result is sliced down to the number
-- of elements written.
sinkVectorNS :: (V.Vector v a, PrimMonad m)
             => Int -- ^ maximum allowed size
             -> StreamConsumer a m (v a)
sinkVectorNS maxSize (Stream step ms0) =
    Stream step' $ do
        s0 <- ms0
        mv0 <- VM.new maxSize
        return (0, mv0, s0)
  where
    -- State: (next write index, buffer, upstream state).
    step' (i, mv, _) | i >= maxSize = liftM Stop $ V.unsafeFreeze mv
    step' (i, mv, s) = do
        res <- step s
        case res of
            Stop () -> liftM (Stop . V.slice 0 i) $ V.unsafeFreeze mv
            Skip s' -> return $ Skip (i, mv, s')
            Emit s' x -> do
                VM.write mv i x
                let i' = i + 1
                return $ Skip (i', mv, s')
{-# INLINE sinkVectorNS #-}

-- | Combine every 'Builder' in the stream with 'mappend' and render the
-- result with 'toLazyByteString'.
sinkLazyBuilderS :: Monad m => StreamConsumer Builder m BL.ByteString
sinkLazyBuilderS = fmapS toLazyByteString (foldS mappend mempty)
{-# INLINE sinkLazyBuilderS #-}

-- | The last element emitted by the stream, or 'Nothing' when upstream
-- emits no elements.
lastS :: Monad m
      => StreamConsumer a m (Maybe a)
lastS (Stream step ms0) =
    Stream step' (liftM (Nothing,) ms0)
  where
    -- State: most recent element seen (if any) and the upstream state.
    step' (mlast, s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop mlast
            Skip s' -> Skip (mlast, s')
            Emit s' x -> Skip (Just x, s')
{-# INLINE lastS #-}

-- | The last element of the last non-null chunk in a chunked stream, or
-- 'Nothing' when no non-null chunk is emitted.  Null chunks are ignored.
lastES :: (Monad m, Seq.IsSequence seq)
       => StreamConsumer seq m (Maybe (Element seq))
lastES (Stream step ms0) =
    Stream step' (liftM (Nothing, ) ms0)
  where
    -- State: the most recent non-null chunk (as a NonNull) and the
    -- upstream state.  The element is only extracted at the very end.
    step' (mlast, s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop (fmap NonNull.last mlast)
            Skip s' -> Skip (mlast, s')
            Emit s' (NonNull.fromNullable -> mlast'@(Just _)) -> Skip (mlast', s')
            -- A null chunk: keep whatever was remembered before.
            Emit s' _ -> Skip (mlast, s')
{-# INLINE lastES #-}

-- | The first element satisfying the predicate, or 'Nothing' when upstream
-- stops without emitting one.  The stream stops as soon as a match is
-- found.
findS :: Monad m
      => (a -> Bool) -> StreamConsumer a m (Maybe a)
findS f (Stream step ms0) =
    Stream step' ms0
  where
    step' s = do
        res <- step s
        return $ case res of
            Stop () -> Stop Nothing
            Skip s' -> Skip s'
            Emit s' x ->
                if f x
                    then Stop (Just x)
                    else Skip s'
{-# INLINE findS #-}

-- | Apply the function to each upstream element and emit every element of
-- the resulting container, in 'otoList' order.
concatMapS :: (Monad m, MonoFoldable mono)
           => (a -> mono)
           -> StreamConduit a m (Element mono)
concatMapS f (Stream step ms0) =
    Stream step' (liftM ([], ) ms0)
  where
    -- State: elements still pending from the current container, plus the
    -- upstream state.  Upstream is only stepped when nothing is pending.
    step' ([], s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip ([], s')
            Emit s' x -> Skip (otoList (f x), s')
    step' ((x:xs), s) = return (Emit (xs, s) x)
{-# INLINE concatMapS #-}

-- | Monadic variant of 'concatMapS': run the action on each upstream
-- element and emit every element of the container it returns.
concatMapMS :: (Monad m, MonoFoldable mono)
             => (a -> m mono)
             -> StreamConduit a m (Element mono)
concatMapMS f (Stream step ms0) =
    Stream step' (liftM ([], ) ms0)
  where
    -- Same state layout as concatMapS.
    step' ([], s) = do
        res <- step s
        case res of
            Stop () -> return $ Stop ()
            Skip s' -> return $ Skip ([], s')
            Emit s' x -> do
                o <- f x
                return $ Skip (otoList o, s')
    step' ((x:xs), s) = return (Emit (xs, s) x)
{-# INLINE concatMapMS #-}

-- | Flatten a stream of containers into a stream of their elements.
concatS :: (Monad m, MonoFoldable mono)
         => StreamConduit mono m (Element mono)
concatS = concatMapS id
{-# INLINE concatS #-}

-- | Stream state shared by 'scanlS' and 'scanlMS'.
data ScanState a s
    -- Upstream has stopped and the final accumulator has been emitted.
    = ScanEnded
    -- Current accumulator and upstream state.
    | ScanContinues a s

-- | Left scan.  For every upstream element the accumulator value from
-- /before/ that element is emitted; once upstream stops, the final
-- accumulator is emitted as well.  The updated accumulator is forced with
-- a bang pattern.
scanlS :: Monad m => (a -> b -> a) -> a -> StreamConduit b m a
scanlS f seed0 (Stream step ms0) =
    Stream step' (liftM (ScanContinues seed0) ms0)
  where
    step' ScanEnded = return $ Stop ()
    step' (ScanContinues seed s) = do
        res <- step s
        return $ case res of
            Stop () -> Emit ScanEnded seed
            Skip s' -> Skip (ScanContinues seed s')
            Emit s' x -> Emit (ScanContinues seed' s') seed
              where
                !seed' = f seed x
{-# INLINE scanlS #-}

-- | Monadic variant of 'scanlS': the accumulator is updated by running the
-- supplied action, and its result is forced with a bang pattern.
scanlMS :: Monad m => (a -> b -> m a) -> a -> StreamConduit b m a
scanlMS f seed0 (Stream step ms0) =
    Stream step' (liftM (ScanContinues seed0) ms0)
  where
    step' ScanEnded = return $ Stop ()
    step' (ScanContinues seed s) = do
        res <- step s
        case res of
            Stop () -> return $ Emit ScanEnded seed
            Skip s' -> return $ Skip (ScanContinues seed s')
            Emit s' x -> do
                !seed' <- f seed x
                return $ Emit (ScanContinues seed' s') seed
{-# INLINE scanlMS #-}

-- | Map over the stream while threading an accumulator.
--
-- For each upstream element the function receives the element and the
-- current accumulator.  A 'Right' result updates the accumulator and emits
-- a value; a 'Left' result stops the stream with the given accumulator.
-- When upstream stops first, the current accumulator is the result.
mapAccumWhileS :: Monad m =>
    (a -> s -> Either s (s, b)) -> s -> StreamConduitT a b m s
mapAccumWhileS f initial (Stream step ms0) =
    Stream step' (liftM (initial, ) ms0)
  where
    -- State: accumulator (kept evaluated by the bang patterns) and the
    -- upstream state.
    step' (!accum, s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop accum
            Skip s' -> Skip (accum, s')
            Emit s' x -> case f x accum of
                Right (!accum', r) -> Emit (accum', s') r
                Left   !accum'     -> Stop accum'
{-# INLINE mapAccumWhileS #-}

-- | Monadic variant of 'mapAccumWhileS': the decision for each element is
-- produced by running an action.
mapAccumWhileMS :: Monad m =>
    (a -> s -> m (Either s (s, b))) -> s -> StreamConduitT a b m s
mapAccumWhileMS f initial (Stream step ms0) =
    Stream step' (liftM (initial, ) ms0)
  where
    step' (!accum, s) = do
        res <- step s
        case res of
            Stop () -> return $ Stop accum
            Skip s' -> return $ Skip (accum, s')
            Emit s' x -> do
                lr <- f x accum
                return $ case lr of
                    Right (!accum', r) -> Emit (accum', s') r
                    Left   !accum'     -> Stop accum'
{-# INLINE mapAccumWhileMS #-}

-- | Stream state for 'intersperseS'.
data IntersperseState a s
    -- No upstream element has been seen yet.
    = IFirstValue s
    -- The carried value has already been emitted; waiting to learn whether
    -- another element follows.
    | IGotValue s a
    -- A separator was just emitted; the carried value must be emitted next.
    | IEmitValue s a

-- | Insert the given separator between consecutive upstream elements.  No
-- separator is emitted before the first element or after the last one.
intersperseS :: Monad m => a -> StreamConduit a m a
intersperseS sep (Stream step ms0) =
    Stream step' (liftM IFirstValue ms0)
  where
    step' (IFirstValue s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip (IFirstValue s')
            Emit s' x -> Emit (IGotValue s' x) x
    -- Emit the separator once we know it's not the end of the list.
    step' (IGotValue s x) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip (IGotValue s' x)
            Emit s' x' -> Emit (IEmitValue s' x') sep
    -- We emitted a separator, now emit the value that comes after.
    step' (IEmitValue s x) = return $ Emit (IGotValue s x) x
{-# INLINE intersperseS #-}

-- | Stream state for 'slidingWindowS'.
data SlidingWindowState seq s
    -- Still filling the first window: elements still needed, the elements
    -- collected so far, and the upstream state.
    = SWInitial Int seq s
    -- The previous window with its first element dropped, and the upstream
    -- state.
    | SWSliding seq s
    -- Upstream stopped before the first window was full.
    | SWEarlyExit

-- | Emit sliding windows of the given size over the upstream elements.
--
-- The size is clamped to at least 1.  If upstream stops before the first
-- window has been filled, whatever was collected so far (possibly an empty
-- sequence) is emitted as a single window.
slidingWindowS :: (Monad m, Seq.IsSequence seq, Element seq ~ a) => Int -> StreamConduit a m seq
slidingWindowS sz (Stream step ms0) =
    Stream step' (liftM (SWInitial (max 1 sz) mempty) ms0)
  where
    step' (SWInitial n st s) = do
        res <- step s
        return $ case res of
            Stop () -> Emit SWEarlyExit st
            Skip s' -> Skip (SWInitial n st s')
            Emit s' x ->
                if n == 1
                    then Emit (SWSliding (Seq.unsafeTail st') s') st'
                    else Skip (SWInitial (n - 1) st' s')
              where
                st' = Seq.snoc st x
    -- After collecting the initial window, each upstream element
    -- causes an additional window to be yielded.
    step' (SWSliding st s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip (SWSliding st s')
            Emit s' x -> Emit (SWSliding (Seq.unsafeTail st') s') st'
              where
                st' = Seq.snoc st x
    step' SWEarlyExit = return $ Stop ()

{-# INLINE slidingWindowS #-}

-- | Keep only the elements for which the monadic predicate returns 'True'.
filterMS :: Monad m
         => (a -> m Bool)
         -> StreamConduit a m a
filterMS f (Stream step ms0) =
    Stream step' ms0
  where
    step' s = do
        res <- step s
        case res of
            Stop () -> return $ Stop ()
            Skip s' -> return $ Skip s'
            Emit s' x -> do
                r <- f x
                return $
                    if r
                        then Emit s' x
                        else Skip s'
{-# INLINE filterMS #-}

-- | Stream state for 'splitOnUnboundedES'.
data SplitState seq s
    = SplitDone
    -- When no element of seq passes the predicate.  This allows
    -- 'splitOnUnboundedES' to not run 'Seq.break' multiple times due
    -- to 'Skip's being sent by the upstream.
    | SplitNoSep seq s
    -- Buffered input that still has to be searched with 'Seq.break', and
    -- the upstream state.
    | SplitState seq s

-- | Split a chunked stream on the elements satisfying the predicate.
--
-- Incoming chunks are appended to a buffer.  Each time 'Seq.break' leaves a
-- non-null remainder, the part before it is emitted and the first element
-- of the remainder (the separator) is dropped.  When upstream stops, any
-- non-null leftover buffer is emitted as a final piece.  The buffer grows
-- without bound while no separator is seen.
splitOnUnboundedES :: (Monad m, Seq.IsSequence seq)
                   => (Element seq -> Bool) -> StreamConduit seq m seq
splitOnUnboundedES f (Stream step ms0) =
    Stream step' (liftM (SplitState mempty) ms0)
  where
    step' SplitDone = return $ Stop ()
    -- The buffer is already known to contain no separator, so go straight
    -- to upstream without searching it again.
    step' (SplitNoSep t s) = do
        res <- step s
        return $ case res of
            Stop () | not (onull t) -> Emit SplitDone t
                    | otherwise -> Stop ()
            Skip s' -> Skip (SplitNoSep t s')
            Emit s' t' -> Skip (SplitState (t `mappend` t') s')
    step' (SplitState t s) =
        if onull y
            then do
                -- Nothing remains after the break, so more input is needed.
                res <- step s
                return $ case res of
                    Stop () | not (onull t) -> Emit SplitDone t
                            | otherwise -> Stop ()
                    Skip s' -> Skip (SplitNoSep t s')
                    Emit s' t' -> Skip (SplitState (t `mappend` t') s')
            else return $ Emit (SplitState (Seq.drop 1 y) s) x
      where
        (x, y) = Seq.break f t
{-# INLINE splitOnUnboundedES #-}

-- | Streaming versions of @Data.Conduit.Combinators.Internal.initReplicate@
--
-- The seed action is run once to build the initial state; the element
-- action is then run on that seed the given number of times, emitting each
-- result.  A count of zero or less emits nothing.
initReplicateS :: Monad m => m seed -> (seed -> m a) -> Int -> StreamProducer m a
initReplicateS mseed f cnt _ =
    Stream step (liftM (cnt, ) mseed)
  where
    -- State: number of elements still to produce, and the seed.
    step (ix, _) | ix <= 0 = return $ Stop ()
    step (ix, seed) = do
        x <- f seed
        return $ Emit (ix - 1, seed) x
{-# INLINE initReplicateS #-}

-- | Streaming versions of @Data.Conduit.Combinators.Internal.initRepeat@
--
-- The seed action is run once to build the initial state; the element
-- action is then run on that seed on every step.  This stream never
-- produces 'Stop'.
initRepeatS :: Monad m => m seed -> (seed -> m a) -> StreamProducer m a
initRepeatS mseed f _ =
    Stream step mseed
  where
    step seed = do
        x <- f seed
        return $ Emit seed x
{-# INLINE initRepeatS #-}

-- | Utility function
--
-- Map a function over the final result of a stream transformer, leaving
-- its state and emitted values untouched.
fmapS :: Monad m
      => (a -> b)
      -> StreamConduitT i o m a
      -> StreamConduitT i o m b
fmapS f s inp =
    case s inp of
        Stream step ms0 -> Stream (fmap (liftM (fmap f)) step) ms0
{-# INLINE fmapS #-}