{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE Trustworthy #-}
-- | Stream-based implementations of list-like combinators, built on the
-- @Stream@ and @Step@ types imported from "Data.Conduit.Internal.Fusion".
--
-- Every combinator here has the shape \"upstream stream in, new stream out\".
-- Producers ignore their upstream argument entirely; consumers never emit and
-- deliver their answer through the final @Stop@ value.
module Data.Conduit.Internal.List.Stream where

import Control.Monad (liftM)
import Data.Conduit.Internal.Fusion
import qualified Data.Foldable as F

--FIXME: Should streamSource / streamSourcePure be used for sources?

-- | Produce values by repeatedly applying a pure unfolding function to a seed.
--
-- Each @Just (x, s')@ emits @x@ and continues from @s'@; @Nothing@ ends the
-- stream. The upstream stream is ignored.
unfoldS :: Monad m
        => (b -> Maybe (a, b))
        -> b
        -> StreamProducer m a
unfoldS f s0 _ =
    Stream step (return s0)
  where
    step s = return $
        case f s of
            Nothing -> Stop ()
            Just (x, s') -> Emit s' x
{-# INLINE unfoldS #-}

-- | Like 'unfoldS', but the unfolding function finishes with @Left r@, and
-- @r@ becomes the result of the stream.
unfoldEitherS :: Monad m
              => (b -> Either r (a, b))
              -> b
              -> StreamConduitT i a m r
unfoldEitherS f s0 _ =
    Stream step (return s0)
  where
    step s = return $
        case f s of
            Left r -> Stop r
            Right (x, s') -> Emit s' x
{-# INLINE unfoldEitherS #-}

-- | Monadic variant of 'unfoldS': the unfolding function runs in @m@.
unfoldMS :: Monad m
         => (b -> m (Maybe (a, b)))
         -> b
         -> StreamProducer m a
unfoldMS f s0 _ =
    Stream step (return s0)
  where
    step s = do
        ms' <- f s
        return $ case ms' of
            Nothing -> Stop ()
            Just (x, s') -> Emit s' x
{-# INLINE unfoldMS #-}

-- | Monadic variant of 'unfoldEitherS': the unfolding function runs in @m@
-- and a @Left r@ ends the stream with result @r@.
unfoldEitherMS :: Monad m
               => (b -> m (Either r (a, b)))
               -> b
               -> StreamConduitT i a m r
unfoldEitherMS f s0 _ =
    Stream step (return s0)
  where
    step s = do
        ms' <- f s
        return $ case ms' of
            Left r -> Stop r
            Right (x, s') -> Emit s' x
{-# INLINE unfoldEitherMS #-}

-- | Emit the elements of a list in order, then stop. The remaining list is
-- the stream state. The upstream stream is ignored.
sourceListS :: Monad m => [a] -> StreamProducer m a
sourceListS xs0 _ =
    Stream (return . step) (return xs0)
  where
    step [] = Stop ()
    step (x:xs) = Emit xs x
{-# INLINE sourceListS #-}

-- | Emit every value from @x0@ up to and including @y@, advancing with
-- @succ@. Emits nothing when @x0 > y@.
--
-- The state is @Just next@ while values remain and @Nothing@ once the upper
-- bound has been emitted. The successor of @y@ is never constructed: for a
-- bounded type with @y@ at its last value, computing @succ y@ would raise a
-- runtime error when the next step compared it against @y@.
enumFromToS :: (Enum a, Prelude.Ord a, Monad m)
            => a
            -> a
            -> StreamProducer m a
enumFromToS x0 y _ =
    Stream step (return (Just x0))
  where
    step s = return $
        case s of
            Nothing -> Stop ()
            Just x
                | x Prelude.> y -> Stop ()
                -- Upper bound reached: emit it and finish without calling succ.
                | x == y -> Emit Nothing x
                | otherwise -> Emit (Just (Prelude.succ x)) x
{-# INLINE [0] enumFromToS #-}

-- | Variant of 'enumFromToS' for 'Prelude.Integral' types that advances with
-- @+ 1@ and forces both bounds before building the stream.
--
-- NOTE(review): for a fixed-width type, if @y@ is the largest representable
-- value then @x + 1@ cannot exceed it, so the @x <= y@ guard looks like it
-- would never fail -- confirm whether callers can pass such a bound.
enumFromToS_int :: (Prelude.Integral a, Monad m)
                => a
                -> a
                -> StreamProducer m a
enumFromToS_int x0 y _ = x0 `seq` y `seq` Stream step (return x0)
  where
    step x | x <= y = return $ Emit (x Prelude.+ 1) x
           | otherwise = return $ Stop ()
{-# INLINE enumFromToS_int #-}

-- Rewrite 'enumFromToS' to 'enumFromToS_int' when the element type is Int.
-- 'enumFromToS' is only inlined in phase 0, which leaves the earlier phases
-- for this rule to match.
{-# RULES "conduit: enumFromTo<Int>" forall f t.
      enumFromToS f t = enumFromToS_int f t :: Monad m => StreamProducer m Int
  #-}

-- | Emit @x0@, @f x0@, @f (f x0)@, ... This stream never produces @Stop@.
-- The upstream stream is ignored.
iterateS :: Monad m => (a -> a) -> a -> StreamProducer m a
iterateS f x0 _ =
    Stream (return . step) (return x0)
  where
    step x = Emit x' x
      where
        x' = f x
{-# INLINE iterateS #-}

-- | Emit the given value @cnt0@ times; emits nothing when @cnt0 <= 0@.
-- The state is the number of values still to emit.
replicateS :: Monad m => Int -> a -> StreamProducer m a
replicateS cnt0 a _ =
    Stream step (return cnt0)
  where
    step cnt
        | cnt <= 0  = return $ Stop ()
        | otherwise = return $ Emit (cnt - 1) a
{-# INLINE replicateS #-}

-- | Run the given action @cnt0@ times, emitting each result; the action is
-- not run at all when @cnt0 <= 0@.
replicateMS :: Monad m => Int -> m a -> StreamProducer m a
replicateMS cnt0 ma _ =
    Stream step (return cnt0)
  where
    step cnt
        | cnt <= 0  = return $ Stop ()
        | otherwise = Emit (cnt - 1) `liftM` ma
{-# INLINE replicateMS #-}

-- | Left fold over the upstream values with a pure function. The final
-- accumulator is returned as the stream result; nothing is emitted.
foldS :: Monad m => (b -> a -> b) -> b -> StreamConsumer a m b
foldS f b0 (Stream step ms0) =
    Stream step' (liftM (b0, ) ms0)
  where
    -- The bang pattern forces the accumulator (to WHNF) on every step.
    step' (!b, s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop b
            Skip s' -> Skip (b, s')
            Emit s' a -> Skip (f b a, s')
{-# INLINE foldS #-}

-- | Left fold over the upstream values with a monadic function. The final
-- accumulator is returned as the stream result; nothing is emitted.
foldMS :: Monad m => (b -> a -> m b) -> b -> StreamConsumer a m b
foldMS f b0 (Stream step ms0) =
    Stream step' (liftM (b0, ) ms0)
  where
    -- The bang pattern forces the accumulator (to WHNF) on every step.
    step' (!b, s) = do
        res <- step s
        case res of
            Stop () -> return $ Stop b
            Skip s' -> return $ Skip (b, s')
            Emit s' a -> do
                b' <- f b a
                return $ Skip (b', s')
{-# INLINE foldMS #-}

-- | Run an action on every upstream value, discarding the values.
mapM_S :: Monad m
       => (a -> m ())
       -> StreamConsumer a m ()
mapM_S f (Stream step ms0) =
    Stream step' ms0
  where
    step' s = do
        res <- step s
        case res of
            Stop () -> return $ Stop ()
            Skip s' -> return $ Skip s'
            Emit s' x -> f x >> return (Skip s')
{-# INLINE [1] mapM_S #-}

-- | Consume and discard up to @n0@ upstream values, stopping early if the
-- upstream stops first. Stops without touching upstream when @n0 <= 0@.
dropS :: Monad m
      => Int
      -> StreamConsumer a m ()
dropS n0 (Stream step ms0) =
    Stream step' (liftM (, n0) ms0)
  where
    step' (_, n) | n <= 0 = return $ Stop ()
    step' (s, n) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip (s', n)
            -- Only emitted values count against the budget.
            Emit s' _ -> Skip (s', n - 1)
{-# INLINE dropS #-}

-- | Collect up to @n0@ upstream values into a list, in arrival order.
-- Returns fewer if the upstream stops first, and @[]@ when @n0 <= 0@.
takeS :: Monad m
      => Int
      -> StreamConsumer a m [a]
takeS n0 (Stream step s0) =
    Stream step' (liftM (id, n0,) s0)
  where
    -- 'output' is a difference list: a function that prepends the values
    -- collected so far, so appending is a function composition.
    step' (output, n, _) | n <= 0 = return $ Stop (output [])
    step' (output, n, s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop (output [])
            Skip s' -> Skip (output, n, s')
            Emit s' x -> Skip (output . (x:), n - 1, s')
{-# INLINE takeS #-}

-- | Return the first upstream value, or @Nothing@ if the upstream stops
-- before emitting one.
headS :: Monad m => StreamConsumer a m (Maybe a)
headS (Stream step s0) =
    Stream step' s0
  where
    step' s = do
        res <- step s
        return $ case res of
            Stop () -> Stop Nothing
            Skip s' -> Skip s'
            Emit _ x -> Stop (Just x)
{-# INLINE headS #-}

-- | Apply a pure function to every emitted value.
mapS :: Monad m => (a -> b) -> StreamConduit a m b
mapS f (Stream step ms0) =
    Stream step' ms0
  where
    step' s = do
        res <- step s
        return $ case res of
            Stop r -> Stop r
            Emit s' a -> Emit s' (f a)
            Skip s' -> Skip s'
{-# INLINE mapS #-}

-- | Apply a monadic function to every emitted value.
mapMS :: Monad m => (a -> m b) -> StreamConduit a m b
mapMS f (Stream step ms0) =
    Stream step' ms0
  where
    step' s = do
        res <- step s
        case res of
            Stop r -> return $ Stop r
            Emit s' a -> Emit s' `liftM` f a
            Skip s' -> return $ Skip s'
{-# INLINE mapMS #-}

-- | Run an action on every value and then pass the value through unchanged.
iterMS :: Monad m => (a -> m ()) -> StreamConduit a m a
iterMS f (Stream step ms0) =
    Stream step' ms0
  where
    step' s = do
        res <- step s
        case res of
            Stop () -> return $ Stop ()
            Skip s' -> return $ Skip s'
            Emit s' x -> f x >> return (Emit s' x)
{-# INLINE iterMS #-}

-- | Apply a function to every value, emitting the @Just@ results and
-- dropping the @Nothing@s.
mapMaybeS :: Monad m => (a -> Maybe b) -> StreamConduit a m b
mapMaybeS f (Stream step ms0) =
    Stream step' ms0
  where
    step' s = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip s'
            Emit s' x ->
                case f x of
                    Just y -> Emit s' y
                    Nothing -> Skip s'
{-# INLINE mapMaybeS #-}

-- | Monadic variant of 'mapMaybeS'.
mapMaybeMS :: Monad m => (a -> m (Maybe b)) -> StreamConduit a m b
mapMaybeMS f (Stream step ms0) =
    Stream step' ms0
  where
    step' s = do
        res <- step s
        case res of
            Stop () -> return $ Stop ()
            Skip s' -> return $ Skip s'
            Emit s' x -> do
                my <- f x
                case my of
                    Just y -> return $ Emit s' y
                    Nothing -> return $ Skip s'
{-# INLINE mapMaybeMS #-}

-- | Emit the contents of every @Just@ and drop every @Nothing@.
catMaybesS :: Monad m => StreamConduit (Maybe a) m a
catMaybesS (Stream step ms0) =
    Stream step' ms0
  where
    step' s = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip s'
            Emit s' Nothing -> Skip s'
            Emit s' (Just x) -> Emit s' x
{-# INLINE catMaybesS #-}

-- | Flatten a stream of 'F.Foldable' containers into a stream of their
-- elements.
--
-- The state pairs a buffer of pending elements with the upstream state; the
-- upstream is only stepped when the buffer is empty.
concatS :: (Monad m, F.Foldable f) => StreamConduit (f a) m a
concatS (Stream step ms0) =
    Stream step' (liftM ([], ) ms0)
  where
    step' ([], s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip ([], s')
            Emit s' x -> Skip (F.toList x, s')
    step' ((x:xs), s) = return (Emit (xs, s) x)
{-# INLINE concatS #-}

-- | Map every value to a list and emit the list elements in order.
-- Buffers pending elements the same way as 'concatS'.
concatMapS :: Monad m => (a -> [b]) -> StreamConduit a m b
concatMapS f (Stream step ms0) =
    Stream step' (liftM ([], ) ms0)
  where
    step' ([], s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip ([], s')
            Emit s' x -> Skip (f x, s')
    step' ((x:xs), s) = return (Emit (xs, s) x)
{-# INLINE concatMapS #-}

-- | Monadic variant of 'concatMapS'.
concatMapMS :: Monad m => (a -> m [b]) -> StreamConduit a m b
concatMapMS f (Stream step ms0) =
    Stream step' (liftM ([], ) ms0)
  where
    step' ([], s) = do
        res <- step s
        case res of
            Stop () -> return $ Stop ()
            Skip s' -> return $ Skip ([], s')
            Emit s' x -> do
                xs <- f x
                return $ Skip (xs, s')
    step' ((x:xs), s) = return (Emit (xs, s) x)
{-# INLINE concatMapMS #-}

-- | Like 'concatMapS', but threads an accumulator through the function.
-- The final accumulator is discarded.
concatMapAccumS :: Monad m => (a -> accum -> (accum, [b])) -> accum -> StreamConduit a m b
concatMapAccumS f  initial (Stream step ms0) =
    Stream step' (liftM (initial, [], ) ms0)
  where
    step' (accum, [], s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip (accum, [], s')
            Emit s' x ->
                let (accum', xs) = f x accum
                in Skip (accum', xs, s')
    step' (accum, (x:xs), s) = return (Emit (accum, xs, s) x)
{-# INLINE concatMapAccumS #-}

-- | Map every value while threading an accumulator. One output is emitted
-- per input, and the final accumulator is returned as the stream result.
mapAccumS :: Monad m => (a -> s -> (s, b)) -> s -> StreamConduitT a b m s
mapAccumS f initial (Stream step ms0) =
    Stream step' (liftM (initial, ) ms0)
  where
    step' (accum, s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop accum
            Skip s' -> Skip (accum, s')
            Emit s' x ->
                let (accum', r) = f x accum
                in Emit (accum', s') r
{-# INLINE mapAccumS #-}

-- | Monadic variant of 'mapAccumS'.
mapAccumMS :: Monad m => (a -> s -> m (s, b)) -> s -> StreamConduitT a b m s
mapAccumMS f initial (Stream step ms0) =
    Stream step' (liftM (initial, ) ms0)
  where
    step' (accum, s) = do
        res <- step s
        case res of
            Stop () -> return $ Stop accum
            Skip s' -> return $ Skip (accum, s')
            Emit s' x -> do
                (accum', r) <- f x accum
                return $ Emit (accum', s') r
{-# INLINE mapAccumMS #-}

-- | Monadic variant of 'concatMapAccumS'.
concatMapAccumMS :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> StreamConduit a m b
concatMapAccumMS f  initial (Stream step ms0) =
    Stream step' (liftM (initial, [], ) ms0)
  where
    step' (accum, [], s) = do
        res <- step s
        case res of
            Stop () -> return $ Stop ()
            Skip s' -> return $ Skip (accum, [], s')
            Emit s' x -> do
                (accum', xs) <- f x accum
                return $ Skip (accum', xs, s')
    step' (accum, (x:xs), s) = return (Emit (accum, xs, s) x)
{-# INLINE concatMapAccumMS #-}

-- | Map every value to a 'F.Foldable' container and emit its elements in
-- order. Buffers pending elements the same way as 'concatS'.
mapFoldableS :: (Monad m, F.Foldable f) => (a -> f b) -> StreamConduit a m b
mapFoldableS f (Stream step ms0) =
    Stream step' (liftM ([], ) ms0)
  where
    step' ([], s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip ([], s')
            Emit s' x -> Skip (F.toList (f x), s')
    step' ((x:xs), s) = return (Emit (xs, s) x)
{-# INLINE mapFoldableS #-}

-- | Monadic variant of 'mapFoldableS'.
mapFoldableMS :: (Monad m, F.Foldable f) => (a -> m (f b)) -> StreamConduit a m b
mapFoldableMS f (Stream step ms0) =
    Stream step' (liftM ([], ) ms0)
  where
    step' ([], s) = do
        res <- step s
        case res of
            Stop () -> return $ Stop ()
            Skip s' -> return $ Skip ([], s')
            Emit s' x -> do
                y <- f x
                return $ Skip (F.toList y, s')
    step' ((x:xs), s) = return (Emit (xs, s) x)
{-# INLINE mapFoldableMS #-}

-- | Collect every upstream value into a list, in arrival order.
consumeS :: Monad m => StreamConsumer a m [a]
consumeS (Stream step ms0) =
    Stream step' (liftM (id,) ms0)
  where
    -- 'front' is a difference list of the values seen so far.
    step' (front, s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop (front [])
            Skip s' -> Skip (front, s')
            Emit s' a -> Skip (front . (a:), s')
{-# INLINE consumeS #-}

-- | Group consecutive values into lists. A value joins the current group
-- when the predicate holds between the group's first value and it.
groupByS :: Monad m => (a -> a -> Bool) -> StreamConduit a m [a]
groupByS f = mapS (Prelude.uncurry (:)) . groupBy1S id f
{-# INLINE groupByS #-}

-- | Group consecutive values whose projection under @f@ equals that of the
-- group's first value. Each group is emitted as (first value, the rest).
groupOn1S :: (Monad m, Eq b) => (a -> b) -> StreamConduit a m (a, [a])
groupOn1S f = groupBy1S f (==)
{-# INLINE groupOn1S #-}

-- | Stream state used by 'groupBy1S'.
data GroupByState a b s
     = GBStart s
       -- ^ No group is open yet; holds the upstream state.
     | GBLoop ([a] -> [a]) a b s
       -- ^ A group is open: difference list of its non-first members, its
       -- first value, that value's key, and the upstream state.
     | GBDone
       -- ^ The last group has been emitted; the next step stops.

-- | Group consecutive values by a key function and a key comparison.
--
-- Every value's key is compared against the key of the first value of the
-- current group. Each group is emitted as (first value, the rest), so
-- emitted groups are never empty.
groupBy1S :: Monad m => (a -> b) -> (b -> b -> Bool) -> StreamConduit a m (a, [a])
groupBy1S f eq (Stream step ms0) =
    Stream step' (liftM GBStart ms0)
  where
    step' (GBStart s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip (GBStart s')
            Emit s' x0 -> Skip (GBLoop id x0 (f x0) s')
    step' (GBLoop rest x0 fx0 s) = do
        res <- step s
        return $ case res of
            -- Upstream is finished: flush the open group, then stop next step.
            Stop () -> Emit GBDone (x0, rest [])
            Skip s' -> Skip (GBLoop rest x0 fx0 s')
            Emit s' x
                | fx0 `eq` f x -> Skip (GBLoop (rest . (x:)) x0 fx0 s')
                -- Key changed: emit the finished group and open a new one at x.
                | otherwise -> Emit (GBLoop id x (f x) s') (x0, rest [])
    step' GBDone = return $ Stop ()
{-# INLINE groupBy1S #-}

-- | Pass through at most @count@ values, then stop. Stops without touching
-- upstream when @count <= 0@.
isolateS :: Monad m => Int -> StreamConduit a m a
isolateS count (Stream step ms0) =
    Stream step' (liftM (count,) ms0)
  where
    step' (n, _) | n <= 0 = return $ Stop ()
    step' (n, s) = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip (n, s')
            Emit s' x -> Emit (n - 1, s') x
{-# INLINE isolateS #-}

-- | Pass through only the values that satisfy the predicate.
filterS :: Monad m => (a -> Bool) -> StreamConduit a m a
filterS f (Stream step ms0) =
    Stream step' ms0
  where
    step' s = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip s'
            Emit s' x
                | f x -> Emit s' x
                | otherwise -> Skip s'
{-# INLINE filterS #-}

-- | Consume and discard every upstream value.
sinkNullS :: Monad m => StreamConsumer a m ()
sinkNullS (Stream step ms0) =
    Stream step' ms0
  where
    step' s = do
        res <- step s
        return $ case res of
            Stop () -> Stop ()
            Skip s' -> Skip s'
            Emit s' _ -> Skip s'
{-# INLINE sinkNullS #-}

-- | A producer that stops immediately without emitting anything.
-- The upstream stream is ignored.
sourceNullS :: Monad m => StreamProducer m a
sourceNullS _ = Stream (\_ -> return (Stop ())) (return ())
{-# INLINE sourceNullS #-}