1-- | Generic stream manipulations 2 3{-# LANGUAGE BangPatterns #-} 4{-# LANGUAGE DeriveDataTypeable #-} 5{-# LANGUAGE RankNTypes #-} 6 7module System.IO.Streams.Combinators 8 ( -- * Folds 9 inputFoldM 10 , outputFoldM 11 , fold 12 , foldM 13 , fold_ 14 , foldM_ 15 , any 16 , all 17 , maximum 18 , minimum 19 20 -- * Unfolds 21 , unfoldM 22 23 -- * Maps 24 , map 25 , mapM 26 , mapM_ 27 , mapMaybe 28 , contramap 29 , contramapM 30 , contramapM_ 31 , contramapMaybe 32 33 -- * Filter 34 , filter 35 , filterM 36 , filterOutput 37 , filterOutputM 38 39 -- * Takes and drops 40 , give 41 , take 42 , drop 43 , ignore 44 45 -- * Zip and unzip 46 , zip 47 , zipWith 48 , zipWithM 49 , unzip 50 51 -- * Utility 52 , intersperse 53 , skipToEof 54 , ignoreEof 55 , atEndOfInput 56 , atEndOfOutput 57 ) where 58 59------------------------------------------------------------------------------ 60import Control.Concurrent.MVar (newMVar, withMVar) 61import Control.Monad (liftM, void, when) 62import Control.Monad.IO.Class (liftIO) 63import Data.Int (Int64) 64import Data.IORef (IORef, atomicModifyIORef, modifyIORef, newIORef, readIORef, writeIORef) 65import Data.Maybe (isJust) 66import Prelude hiding (all, any, drop, filter, map, mapM, mapM_, maximum, minimum, read, take, unzip, zip, zipWith) 67------------------------------------------------------------------------------ 68import System.IO.Streams.Internal (InputStream (..), OutputStream (..), fromGenerator, makeInputStream, makeOutputStream, read, unRead, write, yield) 69 70 71------------------------------------------------------------------------------ 72-- | A side-effecting fold over an 'OutputStream', as a stream transformer. 73-- 74-- The IO action returned by 'outputFoldM' can be used to fetch and reset the updated 75-- seed value. Example: 76-- 77-- @ 78-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3::Int] 79-- ghci> (os, getList) <- Streams.'System.IO.Streams.List.listOutputStream' 80-- ghci> (os', getSeed) \<- Streams.'outputFoldM' (\\x y -> return (x+y)) 0 os 81-- ghci> Streams.'System.IO.Streams.connect' is os' 82-- ghci> getList 83-- [1,2,3] 84-- ghci> getSeed 85-- 6 86-- @ 87outputFoldM :: (a -> b -> IO a) -- ^ fold function 88 -> a -- ^ initial seed 89 -> OutputStream b -- ^ output stream 90 -> IO (OutputStream b, IO a) -- ^ returns a new stream as well as 91 -- an IO action to fetch and reset the 92 -- updated seed value. 93outputFoldM f initial stream = do 94 ref <- newIORef initial 95 os <- makeOutputStream (wr ref) 96 return (os, fetch ref) 97 98 where 99 wr _ Nothing = write Nothing stream 100 wr ref mb@(Just x) = do 101 !z <- readIORef ref 102 !z' <- f z x 103 writeIORef ref z' 104 write mb stream 105 106 fetch ref = atomicModifyIORef ref $ \x -> (initial, x) 107 108 109------------------------------------------------------------------------------ 110-- | A side-effecting fold over an 'InputStream', as a stream transformer. 111-- 112-- The IO action returned by 'inputFoldM' can be used to fetch and reset the updated seed 113-- value. Example: 114-- 115-- @ 116-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3::Int] 117-- ghci> (is', getSeed) \<- Streams.'inputFoldM' (\\x y -> return (x+y)) 0 is 118-- ghci> Streams.'System.IO.Streams.List.toList' is' 119-- [1,2,3] 120-- ghci> getSeed 121-- 6 122-- @ 123inputFoldM :: (a -> b -> IO a) -- ^ fold function 124 -> a -- ^ initial seed 125 -> InputStream b -- ^ input stream 126 -> IO (InputStream b, IO a) -- ^ returns a new stream as well as an 127 -- IO action to fetch and reset the 128 -- updated seed value. 129inputFoldM f initial stream = do 130 ref <- newIORef initial 131 is <- makeInputStream (rd ref) 132 return (is, fetch ref) 133 134 where 135 twiddle _ Nothing = return Nothing 136 137 twiddle ref mb@(Just x) = do 138 !z <- readIORef ref 139 !z' <- f z x 140 writeIORef ref z' 141 return mb 142 143 rd ref = read stream >>= twiddle ref 144 145 fetch ref = atomicModifyIORef ref $ \x -> (initial, x) 146 147 148------------------------------------------------------------------------------ 149-- | A left fold over an input stream. The input stream is fully consumed. See 150-- 'Prelude.foldl'. 151-- 152-- Example: 153-- 154-- @ 155-- ghci> Streams.'System.IO.Streams.fromList' [1..10] >>= Streams.'fold' (+) 0 156-- 55 157-- @ 158fold :: (s -> a -> s) -- ^ fold function 159 -> s -- ^ initial seed 160 -> InputStream a -- ^ input stream 161 -> IO s 162fold f seed stream = go seed 163 where 164 go !s = read stream >>= maybe (return s) (go . f s) 165 166 167------------------------------------------------------------------------------ 168-- | A side-effecting left fold over an input stream. The input stream is fully 169-- consumed. See 'Prelude.foldl'. 170-- 171-- Example: 172-- 173-- @ 174-- ghci> Streams.'System.IO.Streams.fromList' [1..10] >>= Streams.'foldM' (\x y -> 'return' (x + y)) 0 175-- 55 176-- @ 177foldM :: (s -> a -> IO s) -- ^ fold function 178 -> s -- ^ initial seed 179 -> InputStream a -- ^ input stream 180 -> IO s 181foldM f seed stream = go seed 182 where 183 go !s = read stream >>= maybe (return s) ((go =<<) . f s) 184 185 186------------------------------------------------------------------------------ 187-- | A variant of 'System.IO.Streams.fold' suitable for use with composable folds 188-- from \'beautiful folding\' libraries like 189-- <http://hackage.haskell.org/package/foldl the foldl library>. 190-- The input stream is fully consumed. 191-- 192-- Example: 193-- 194-- @ 195-- ghci> let folds = liftA3 (,,) Foldl.length Foldl.mean Foldl.maximum 196-- ghci> Streams.'System.IO.Streams.fromList' [1..10::Double] >>= Foldl.purely Streams.'System.IO.Streams.fold_' folds is 197-- ghci> (10,5.5,Just 10.0) 198-- @ 199-- 200-- /Since 1.3.6.0/ 201-- 202fold_ :: (x -> a -> x) -- ^ accumulator update function 203 -> x -- ^ initial seed 204 -> (x -> s) -- ^ recover folded value 205 -> InputStream a -- ^ input stream 206 -> IO s 207fold_ op seed done stream = liftM done (go seed) 208 where 209 go !s = read stream >>= maybe (return s) (go . op s) 210 211 212------------------------------------------------------------------------------ 213-- | A variant of 'System.IO.Streams.foldM' suitable for use with composable folds 214-- from \'beautiful folding\' libraries like 215-- <http://hackage.haskell.org/package/foldl the foldl library>. 216-- The input stream is fully consumed. 217-- 218-- Example: 219-- 220-- @ 221-- ghci> let folds = Foldl.mapM_ print *> Foldl.generalize (liftA2 (,) Foldl.sum Foldl.mean) 222-- ghci> Streams.'System.IO.Streams.fromList' [1..3::Double] >>= Foldl.impurely Streams.'System.IO.Streams.foldM_' folds 223-- 1.0 224-- 2.0 225-- 3.0 226-- (6.0,2.0) 227-- @ 228-- 229-- /Since 1.3.6.0/ 230-- 231foldM_ :: (x -> a -> IO x) -- ^ accumulator update action 232 -> IO x -- ^ initial seed 233 -> (x -> IO s) -- ^ recover folded value 234 -> InputStream a -- ^ input stream 235 -> IO s 236foldM_ f seed done stream = seed >>= go 237 where 238 go !x = read stream >>= maybe (done x) ((go =<<) . f x) 239 240 241------------------------------------------------------------------------------ 242-- | @any predicate stream@ returns 'True' if any element in @stream@ matches 243-- the predicate. 244-- 245-- 'any' consumes as few elements as possible, ending consumption if an element 246-- satisfies the predicate. 247-- 248-- @ 249-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3] 250-- ghci> Streams.'System.IO.Streams.Combinators.any' (> 0) is -- Consumes one element 251-- True 252-- ghci> Streams.'System.IO.Streams.read' is 253-- Just 2 254-- ghci> Streams.'System.IO.Streams.Combinators.any' even is -- Only 3 remains 255-- False 256-- @ 257any :: (a -> Bool) -> InputStream a -> IO Bool 258any predicate stream = go 259 where 260 go = do 261 mElem <- read stream 262 case mElem of 263 Nothing -> return False 264 Just e -> if predicate e then return True else go 265 266 267------------------------------------------------------------------------------ 268-- | @all predicate stream@ returns 'True' if every element in @stream@ matches 269-- the predicate. 270-- 271-- 'all' consumes as few elements as possible, ending consumption if any element 272-- fails the predicate. 273-- 274-- @ 275-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3] 276-- ghci> Streams.'System.IO.Streams.Combinators.all' (< 0) is -- Consumes one element 277-- False 278-- ghci> Streams.'System.IO.Streams.read' is 279-- Just 2 280-- ghci> Streams.'System.IO.Streams.Combinators.all' odd is -- Only 3 remains 281-- True 282-- @ 283all :: (a -> Bool) -> InputStream a -> IO Bool 284all predicate stream = go 285 where 286 go = do 287 mElem <- read stream 288 case mElem of 289 Nothing -> return True 290 Just e -> if predicate e then go else return False 291 292 293------------------------------------------------------------------------------ 294-- | @maximum stream@ returns the greatest element in @stream@ or 'Nothing' if 295-- the stream is empty. 296-- 297-- 'maximum' consumes the entire stream. 298-- 299-- @ 300-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3] 301-- ghci> Streams.'System.IO.Streams.Combinators.maximum' is 302-- 3 303-- ghci> Streams.'System.IO.Streams.read' is -- The stream is now empty 304-- Nothing 305-- @ 306maximum :: (Ord a) => InputStream a -> IO (Maybe a) 307maximum stream = do 308 mElem0 <- read stream 309 case mElem0 of 310 Nothing -> return Nothing 311 Just e -> go e 312 where 313 go oldElem = do 314 mElem <- read stream 315 case mElem of 316 Nothing -> return (Just oldElem) 317 Just newElem -> go (max oldElem newElem) 318 319 320------------------------------------------------------------------------------ 321-- | @minimum stream@ returns the greatest element in @stream@ 322-- 323-- 'minimum' consumes the entire stream. 324-- 325-- @ 326-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3] 327-- ghci> Streams.'System.IO.Streams.Combinators.minimum' is 328-- 1 329-- ghci> Streams.'System.IO.Streams.read' is -- The stream is now empty 330-- Nothing 331-- @ 332minimum :: (Ord a) => InputStream a -> IO (Maybe a) 333minimum stream = do 334 mElem0 <- read stream 335 case mElem0 of 336 Nothing -> return Nothing 337 Just e -> go e 338 where 339 go oldElem = do 340 mElem <- read stream 341 case mElem of 342 Nothing -> return (Just oldElem) 343 Just newElem -> go (min oldElem newElem) 344 345 346------------------------------------------------------------------------------ 347-- | @unfoldM f seed@ builds an 'InputStream' from successively applying @f@ to 348-- the @seed@ value, continuing if @f@ produces 'Just' and halting on 349-- 'Nothing'. 350-- 351-- @ 352-- ghci> is \<- Streams.'System.IO.Streams.Combinators.unfoldM' (\n -> return $ if n < 3 then Just (n, n + 1) else Nothing) 0 353-- ghci> Streams.'System.IO.Streams.List.toList' is 354-- [0,1,2] 355-- @ 356unfoldM :: (b -> IO (Maybe (a, b))) -> b -> IO (InputStream a) 357unfoldM f seed = fromGenerator (go seed) 358 where 359 go oldSeed = do 360 m <- liftIO (f oldSeed) 361 case m of 362 Nothing -> return $! () 363 Just (a, newSeed) -> do 364 yield a 365 go newSeed 366 367------------------------------------------------------------------------------ 368-- | Maps a pure function over an 'InputStream'. 369-- 370-- @map f s@ passes all output from @s@ through the function @f@. 371-- 372-- Satisfies the following laws: 373-- 374-- @ 375-- Streams.'map' (g . f) === Streams.'map' f >=> Streams.'map' g 376-- Streams.'map' 'id' === Streams.'makeInputStream' . Streams.'read' 377-- @ 378map :: (a -> b) -> InputStream a -> IO (InputStream b) 379map f s = makeInputStream g 380 where 381 g = read s >>= return . fmap f 382 383 384------------------------------------------------------------------------------ 385-- | Maps an impure function over an 'InputStream'. 386-- 387-- @mapM f s@ passes all output from @s@ through the IO action @f@. 388-- 389-- Satisfies the following laws: 390-- 391-- @ 392-- Streams.'mapM' (f >=> g) === Streams.'mapM' f >=> Streams.'mapM' g 393-- Streams.'mapM' 'return' === Streams.'makeInputStream' . Streams.'read' 394-- @ 395-- 396mapM :: (a -> IO b) -> InputStream a -> IO (InputStream b) 397mapM f s = makeInputStream g 398 where 399 g = do 400 mb <- read s >>= maybe (return Nothing) 401 (\x -> liftM Just $ f x) 402 403 return mb 404 405 406------------------------------------------------------------------------------ 407-- | Maps a side effect over an 'InputStream'. 408-- 409-- @mapM_ f s@ produces a new input stream that passes all output from @s@ 410-- through the side-effecting IO action @f@. 411-- 412-- Example: 413-- 414-- @ 415-- ghci> Streams.'System.IO.Streams.fromList' [1,2,3] >>= 416-- Streams.'mapM_' ('putStrLn' . 'show' . (*2)) >>= 417-- Streams.'System.IO.Streams.toList' 418-- 2 419-- 4 420-- 6 421-- [1,2,3] 422-- @ 423-- 424mapM_ :: (a -> IO b) -> InputStream a -> IO (InputStream a) 425mapM_ f s = makeInputStream $ do 426 mb <- read s 427 _ <- maybe (return $! ()) (void . f) mb 428 return mb 429 430 431------------------------------------------------------------------------------ 432-- | A version of map that discards elements 433-- 434-- @mapMaybe f s@ passes all output from @s@ through the function @f@ and 435-- discards elements for which @f s@ evaluates to 'Nothing'. 436-- 437-- Example: 438-- 439-- @ 440-- ghci> Streams.'System.IO.Streams.fromList' [Just 1, None, Just 3] >>= 441-- Streams.'mapMaybe' 'id' >>= 442-- Streams.'System.IO.Streams.toList' 443-- [1,3] 444-- @ 445-- 446-- /Since: 1.2.1.0/ 447mapMaybe :: (a -> Maybe b) -> InputStream a -> IO (InputStream b) 448mapMaybe f src = makeInputStream g 449 where 450 g = do 451 s <- read src 452 case s of 453 Nothing -> return Nothing 454 Just x -> 455 case f x of 456 Nothing -> g 457 y -> return y 458------------------------------------------------------------------------------ 459-- | Contravariant counterpart to 'map'. 460-- 461-- @contramap f s@ passes all input to @s@ through the function @f@. 462-- 463-- Satisfies the following laws: 464-- 465-- @ 466-- Streams.'contramap' (g . f) === Streams.'contramap' g >=> Streams.'contramap' f 467-- Streams.'contramap' 'id' === 'return' 468-- @ 469contramap :: (a -> b) -> OutputStream b -> IO (OutputStream a) 470contramap f s = makeOutputStream $ flip write s . fmap f 471 472 473------------------------------------------------------------------------------ 474-- | Contravariant counterpart to 'mapM'. 475-- 476-- @contramapM f s@ passes all input to @s@ through the IO action @f@ 477-- 478-- Satisfies the following laws: 479-- 480-- @ 481-- Streams.'contramapM' (f >=> g) = Streams.'contramapM' g >=> Streams.'contramapM' f 482-- Streams.'contramapM' 'return' = 'return' 483-- @ 484contramapM :: (a -> IO b) -> OutputStream b -> IO (OutputStream a) 485contramapM f s = makeOutputStream g 486 where 487 g Nothing = write Nothing s 488 489 g (Just x) = do 490 !y <- f x 491 write (Just y) s 492 493 494------------------------------------------------------------------------------ 495-- | Equivalent to 'mapM_' for output. 496-- 497-- @contramapM f s@ passes all input to @s@ through the side-effecting IO 498-- action @f@. 499-- 500contramapM_ :: (a -> IO b) -> OutputStream a -> IO (OutputStream a) 501contramapM_ f s = makeOutputStream $ \mb -> do 502 _ <- maybe (return $! ()) (void . f) mb 503 write mb s 504 505 506------------------------------------------------------------------------------ 507-- | Contravariant counterpart to 'contramapMaybe'. 508-- 509-- @contramap f s@ passes all input to @s@ through the function @f@. 510-- Discards all the elements for which @f@ returns 'Nothing'. 511-- 512-- /Since: 1.2.1.0/ 513-- 514contramapMaybe :: (a -> Maybe b) -> OutputStream b -> IO (OutputStream a) 515contramapMaybe f s = makeOutputStream $ g 516 where 517 g Nothing = write Nothing s 518 g (Just a) = 519 case f a of 520 Nothing -> return () 521 x -> write x s 522 523 524------------------------------------------------------------------------------ 525-- | Drives an 'InputStream' to end-of-stream, discarding all of the yielded 526-- values. 527skipToEof :: InputStream a -> IO () 528skipToEof str = go 529 where 530 go = read str >>= maybe (return $! ()) (const go) 531{-# INLINE skipToEof #-} 532 533 534------------------------------------------------------------------------------ 535-- | Drops chunks from an input stream if they fail to match a given filter 536-- predicate. See 'Prelude.filter'. 537-- 538-- Items pushed back to the returned stream are propagated back upstream. 539-- 540-- Example: 541-- 542-- @ 543-- ghci> Streams.'System.IO.Streams.fromList' [\"the\", \"quick\", \"brown\", \"fox\"] >>= 544-- Streams.'filterM' ('return' . (/= \"brown\")) >>= Streams.'System.IO.Streams.toList' 545-- [\"the\",\"quick\",\"fox\"] 546-- @ 547filterM :: (a -> IO Bool) 548 -> InputStream a 549 -> IO (InputStream a) 550filterM p src = return $! InputStream prod pb 551 where 552 prod = read src >>= maybe eof chunk 553 554 chunk s = do 555 b <- p s 556 if b then return $! Just s 557 else prod 558 559 eof = return Nothing 560 561 pb s = unRead s src 562 563 564------------------------------------------------------------------------------ 565-- | Drops chunks from an input stream if they fail to match a given filter 566-- predicate. See 'Prelude.filter'. 567-- 568-- Items pushed back to the returned stream are propagated back upstream. 569-- 570-- Example: 571-- 572-- @ 573-- ghci> Streams.'System.IO.Streams.fromList' [\"the\", \"quick\", \"brown\", \"fox\"] >>= 574-- Streams.'filter' (/= \"brown\") >>= Streams.'System.IO.Streams.toList' 575-- [\"the\",\"quick\",\"fox\"] 576-- @ 577filter :: (a -> Bool) 578 -> InputStream a 579 -> IO (InputStream a) 580filter p src = return $! InputStream prod pb 581 where 582 prod = read src >>= maybe eof chunk 583 584 chunk s = do 585 let b = p s 586 if b then return $! Just s 587 else prod 588 589 eof = return Nothing 590 pb s = unRead s src 591 592 593------------------------------------------------------------------------------ 594-- | The function @intersperse v s@ wraps the 'OutputStream' @s@, creating a 595-- new output stream that writes its input to @s@ interspersed with the 596-- provided value @v@. See 'Data.List.intersperse'. 597-- 598-- Example: 599-- 600-- @ 601-- ghci> import Control.Monad ((>=>)) 602-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [\"nom\", \"nom\", \"nom\"::'ByteString'] 603-- ghci> Streams.'System.IO.Streams.List.outputToList' (Streams.'intersperse' \"burp!\" >=> Streams.'System.IO.Streams.connect' is) 604-- [\"nom\",\"burp!\",\"nom\",\"burp!\",\"nom\"] 605-- @ 606intersperse :: a -> OutputStream a -> IO (OutputStream a) 607intersperse sep os = newIORef False >>= makeOutputStream . f 608 where 609 f _ Nothing = write Nothing os 610 f sendRef s = do 611 b <- readIORef sendRef 612 writeIORef sendRef True 613 when b $ write (Just sep) os 614 write s os 615 616 617------------------------------------------------------------------------------ 618-- | Combines two input streams. Continues yielding elements from both input 619-- streams until one of them finishes. 620zip :: InputStream a -> InputStream b -> IO (InputStream (a, b)) 621zip src1 src2 = makeInputStream src 622 where 623 src = read src1 >>= (maybe (return Nothing) $ \a -> 624 read src2 >>= (maybe (unRead a src1 >> return Nothing) $ \b -> 625 return $! Just $! (a, b))) 626 627 628------------------------------------------------------------------------------ 629-- | Combines two input streams using the supplied function. Continues yielding 630-- elements from both input streams until one of them finishes. 631zipWith :: (a -> b -> c) 632 -> InputStream a 633 -> InputStream b 634 -> IO (InputStream c) 635zipWith f src1 src2 = makeInputStream src 636 where 637 src = read src1 >>= (maybe (return Nothing) $ \a -> 638 read src2 >>= (maybe (unRead a src1 >> return Nothing) $ \b -> 639 return $! Just $! f a b ) ) 640 641 642------------------------------------------------------------------------------ 643-- | Combines two input streams using the supplied monadic function. Continues 644-- yielding elements from both input streams until one of them finishes. 645zipWithM :: (a -> b -> IO c) 646 -> InputStream a 647 -> InputStream b 648 -> IO (InputStream c) 649zipWithM f src1 src2 = makeInputStream src 650 where 651 src = read src1 >>= (maybe (return Nothing) $ \a -> 652 read src2 >>= (maybe (unRead a src1 >> return Nothing) $ \b -> 653 f a b >>= \c -> return $! Just $! c ) ) 654 655 656------------------------------------------------------------------------------ 657-- | Filters output to be sent to the given 'OutputStream' using a pure 658-- function. See 'filter'. 659-- 660-- Example: 661-- 662-- @ 663-- ghci> import qualified "Data.ByteString.Char8" as S 664-- ghci> os1 \<- Streams.'System.IO.Streams.stdout' >>= Streams.'System.IO.Streams.unlines 665-- ghci> os2 \<- os1 >>= Streams.'contramap' (S.pack . show) >>= Streams.'filterOutput' even 666-- ghci> Streams.'write' (Just 3) os2 667-- ghci> Streams.'write' (Just 4) os2 668-- 4 669-- @ 670{- Note: The example is a lie, because unlines has weird behavior -} 671filterOutput :: (a -> Bool) -> OutputStream a -> IO (OutputStream a) 672filterOutput p output = makeOutputStream chunk 673 where 674 chunk Nothing = write Nothing output 675 chunk ch@(Just x) = when (p x) $ write ch output 676 677 678------------------------------------------------------------------------------ 679-- | Filters output to be sent to the given 'OutputStream' using a predicate 680-- function in IO. See 'filterM'. 681-- 682-- Example: 683-- 684-- @ 685-- ghci> let check a = putStrLn a ("Allow " ++ show a ++ "?") >> readLn :: IO Bool 686-- ghci> import qualified Data.ByteString.Char8 as S 687-- ghci> os1 <- Streams.'System.IO.Streams.unlines' Streams.'System.IO.Streams.stdout' 688-- ghci> os2 \<- os1 >>= Streams.'contramap' (S.pack . show) >>= Streams.'filterOutputM' check 689-- ghci> Streams.'System.IO.Streams.write' (Just 3) os2 690-- Allow 3? 691-- False\<Enter> 692-- ghci> Streams.'System.IO.Streams.write' (Just 4) os2 693-- Allow 4? 694-- True\<Enter> 695-- 4 696-- @ 697filterOutputM :: (a -> IO Bool) -> OutputStream a -> IO (OutputStream a) 698filterOutputM p output = makeOutputStream chunk 699 where 700 chunk Nothing = write Nothing output 701 chunk ch@(Just x) = do 702 b <- p x 703 if b then write ch output else return $! () 704 705 706------------------------------------------------------------------------------ 707-- | Takes apart a stream of pairs, producing a pair of input streams. Reading 708-- from either of the produced streams will cause a pair of values to be pulled 709-- from the original stream if necessary. Note that reading @n@ values from one 710-- of the returned streams will cause @n@ values to be buffered at the other 711-- stream. 712-- 713-- Access to the original stream is thread safe, i.e. guarded by a lock. 714unzip :: forall a b . InputStream (a, b) -> IO (InputStream a, InputStream b) 715unzip os = do 716 lock <- newMVar $! () 717 buf1 <- newIORef id 718 buf2 <- newIORef id 719 720 is1 <- makeInputStream $ src1 lock buf1 buf2 721 is2 <- makeInputStream $ src2 lock buf1 buf2 722 723 return (is1, is2) 724 725 where 726 twist (a,b) = (b,a) 727 728 src1 lock aBuf bBuf = withMVar lock $ const $ do 729 dl <- readIORef aBuf 730 case dl [] of 731 [] -> more os id bBuf 732 (x:xs) -> writeIORef aBuf (xs++) >> (return $! Just x) 733 734 src2 lock aBuf bBuf = withMVar lock $ const $ do 735 dl <- readIORef bBuf 736 case dl [] of 737 [] -> more os twist aBuf 738 (y:ys) -> writeIORef bBuf (ys++) >> (return $! Just y) 739 740 more :: forall a b x y . 741 InputStream (a,b) 742 -> ((a,b) -> (x,y)) 743 -> IORef ([y] -> [y]) 744 -> IO (Maybe x) 745 more origs proj buf = read origs >>= 746 maybe (return Nothing) 747 (\x -> do 748 let (a, b) = proj x 749 modifyIORef buf (. (b:)) 750 return $! Just a) 751 752 753------------------------------------------------------------------------------ 754-- | Wraps an 'InputStream', producing a new 'InputStream' that will produce at 755-- most @n@ items, subsequently yielding end-of-stream forever. 756-- 757-- Items pushed back to the returned 'InputStream' will be propagated upstream, 758-- modifying the count of taken items accordingly. 759-- 760-- Example: 761-- 762-- @ 763-- ghci> is <- Streams.'fromList' [1..9::Int] 764-- ghci> is' <- Streams.'take' 1 is 765-- ghci> Streams.'read' is' 766-- Just 1 767-- ghci> Streams.'read' is' 768-- Nothing 769-- ghci> Streams.'System.IO.Streams.peek' is 770-- Just 2 771-- ghci> Streams.'unRead' 11 is' 772-- ghci> Streams.'System.IO.Streams.peek' is 773-- Just 11 774-- ghci> Streams.'System.IO.Streams.peek' is' 775-- Just 11 776-- ghci> Streams.'read' is' 777-- Just 11 778-- ghci> Streams.'read' is' 779-- Nothing 780-- ghci> Streams.'read' is 781-- Just 2 782-- ghci> Streams.'toList' is 783-- [3,4,5,6,7,8,9] 784-- @ 785-- 786take :: Int64 -> InputStream a -> IO (InputStream a) 787take k0 input = do 788 kref <- newIORef k0 789 return $! InputStream (prod kref) (pb kref) 790 where 791 prod kref = do 792 !k <- readIORef kref 793 if k <= 0 794 then return Nothing 795 else do 796 m <- read input 797 when (isJust m) $ modifyIORef kref $ \x -> x - 1 798 return m 799 800 pb kref !s = do 801 unRead s input 802 modifyIORef kref (+1) 803 804 805------------------------------------------------------------------------------ 806-- | Wraps an 'InputStream', producing a new 'InputStream' that will drop the 807-- first @n@ items produced by the wrapped stream. See 'Prelude.drop'. 808-- 809-- Items pushed back to the returned 'InputStream' will be propagated upstream, 810-- modifying the count of dropped items accordingly. 811drop :: Int64 -> InputStream a -> IO (InputStream a) 812drop k0 input = do 813 kref <- newIORef k0 814 return $! InputStream (prod kref) (pb kref) 815 where 816 prod kref = do 817 !k <- readIORef kref 818 if k <= 0 819 then getInput kref 820 else discard kref 821 822 getInput kref = do 823 read input >>= maybe (return Nothing) (\c -> do 824 modifyIORef kref (\x -> x - 1) 825 return $! Just c) 826 827 discard kref = getInput kref >>= maybe (return Nothing) (const $ prod kref) 828 829 pb kref s = do 830 unRead s input 831 modifyIORef kref (+1) 832 833 834------------------------------------------------------------------------------ 835-- | Wraps an 'OutputStream', producing a new 'OutputStream' that will pass at 836-- most @n@ items on to the wrapped stream, subsequently ignoring the rest of 837-- the input. 838-- 839give :: Int64 -> OutputStream a -> IO (OutputStream a) 840give k output = newIORef k >>= makeOutputStream . chunk 841 where 842 chunk ref = maybe (return $! ()) $ \x -> do 843 !n <- readIORef ref 844 if n <= 0 845 then return $! () 846 else do 847 writeIORef ref $! n - 1 848 write (Just x) output 849 850 851------------------------------------------------------------------------------ 852-- | Wraps an 'OutputStream', producing a new 'OutputStream' that will ignore 853-- the first @n@ items received, subsequently passing the rest of the input on 854-- to the wrapped stream. 855-- 856ignore :: Int64 -> OutputStream a -> IO (OutputStream a) 857ignore k output = newIORef k >>= makeOutputStream . chunk 858 where 859 chunk ref = maybe (return $! ()) $ \x -> do 860 !n <- readIORef ref 861 if n > 0 862 then writeIORef ref $! n - 1 863 else write (Just x) output 864 865 866------------------------------------------------------------------------------ 867-- | Wraps an 'OutputStream', ignoring any end-of-stream 'Nothing' values 868-- written to the returned stream. 869-- 870-- /Since: 1.0.1.0/ 871-- 872ignoreEof :: OutputStream a -> IO (OutputStream a) 873ignoreEof s = return $ OutputStream f 874 where 875 f Nothing = return $! () 876 f x = write x s 877 878 879------------------------------------------------------------------------------ 880-- | Wraps an 'InputStream', running the specified action when the stream 881-- yields end-of-file. 882-- 883-- /Since: 1.0.2.0/ 884-- 885atEndOfInput :: IO b -> InputStream a -> IO (InputStream a) 886atEndOfInput m is = return $! InputStream prod pb 887 where 888 prod = read is >>= maybe eof (return . Just) 889 eof = void m >> return Nothing 890 pb s = unRead s is 891 892 893------------------------------------------------------------------------------ 894-- | Wraps an 'OutputStream', running the specified action when the stream 895-- receives end-of-file. 896-- 897-- /Since: 1.0.2.0/ 898-- 899atEndOfOutput :: IO b -> OutputStream a -> IO (OutputStream a) 900atEndOfOutput m os = makeOutputStream f 901 where 902 f Nothing = write Nothing os >> void m 903 f x = write x os 904