1-- | Generic stream manipulations 2 3{-# LANGUAGE BangPatterns #-} 4{-# LANGUAGE DeriveDataTypeable #-} 5{-# LANGUAGE RankNTypes #-} 6 7module System.IO.Streams.Combinators 8 ( -- * Folds 9 inputFoldM 10 , outputFoldM 11 , fold 12 , foldM 13 , fold_ 14 , foldM_ 15 , any 16 , all 17 , maximum 18 , minimum 19 20 -- * Unfolds 21 , unfoldM 22 23 -- * Maps 24 , map 25 , mapM 26 , mapM_ 27 , mapMaybe 28 , contramap 29 , contramapM 30 , contramapM_ 31 , contramapMaybe 32 33 -- * Filter 34 , filter 35 , filterM 36 , filterOutput 37 , filterOutputM 38 39 -- * Takes and drops 40 , give 41 , take 42 , drop 43 , ignore 44 45 -- * Zip and unzip 46 , zip 47 , zipWith 48 , zipWithM 49 , unzip 50 , contraunzip 51 52 -- * Utility 53 , intersperse 54 , skipToEof 55 , ignoreEof 56 , atEndOfInput 57 , atEndOfOutput 58 ) where 59 60------------------------------------------------------------------------------ 61import Control.Concurrent.MVar (newMVar, withMVar) 62import Control.Monad (liftM, void, when) 63import Control.Monad.IO.Class (liftIO) 64import Data.Int (Int64) 65import Data.IORef (IORef, atomicModifyIORef, modifyIORef, newIORef, readIORef, writeIORef) 66import Data.Maybe (isJust) 67import Prelude hiding (all, any, drop, filter, map, mapM, mapM_, maximum, minimum, read, take, unzip, zip, zipWith) 68------------------------------------------------------------------------------ 69import System.IO.Streams.Internal (InputStream (..), OutputStream (..), fromGenerator, makeInputStream, makeOutputStream, read, unRead, write, yield) 70 71 72------------------------------------------------------------------------------ 73-- | A side-effecting fold over an 'OutputStream', as a stream transformer. 74-- 75-- The IO action returned by 'outputFoldM' can be used to fetch and reset the updated 76-- seed value. 
-- Example:
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3::Int]
-- ghci> (os, getList) <- Streams.'System.IO.Streams.List.listOutputStream'
-- ghci> (os', getSeed) \<- Streams.'outputFoldM' (\\x y -> return (x+y)) 0 os
-- ghci> Streams.'System.IO.Streams.connect' is os'
-- ghci> getList
-- [1,2,3]
-- ghci> getSeed
-- 6
-- @
outputFoldM :: (a -> b -> IO a)           -- ^ fold function
            -> a                          -- ^ initial seed
            -> OutputStream b             -- ^ output stream
            -> IO (OutputStream b, IO a)  -- ^ returns a new stream as well as
                                          -- an IO action to fetch and reset the
                                          -- updated seed value.
outputFoldM f initial stream = do
    accRef <- newIORef initial
    folded <- makeOutputStream (sink accRef)
    return (folded, fetch accRef)
  where
    -- End-of-stream is passed through untouched; every real item first
    -- updates the accumulator and is then forwarded downstream.
    sink accRef item = case item of
        Nothing -> write Nothing stream
        Just x  -> do
            !acc  <- readIORef accRef
            !acc' <- f acc x
            writeIORef accRef acc'
            write item stream

    -- Hands back the current accumulator and resets it to the initial seed.
    fetch accRef = atomicModifyIORef accRef (\acc -> (initial, acc))


------------------------------------------------------------------------------
-- | A side-effecting fold over an 'InputStream', as a stream transformer.
--
-- The IO action returned by 'inputFoldM' can be used to fetch and reset the updated seed
-- value. Example:
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3::Int]
-- ghci> (is', getSeed) \<- Streams.'inputFoldM' (\\x y -> return (x+y)) 0 is
-- ghci> Streams.'System.IO.Streams.List.toList' is'
-- [1,2,3]
-- ghci> getSeed
-- 6
-- @
inputFoldM :: (a -> b -> IO a)          -- ^ fold function
           -> a                         -- ^ initial seed
           -> InputStream b             -- ^ input stream
           -> IO (InputStream b, IO a)  -- ^ returns a new stream as well as an
                                        -- IO action to fetch and reset the
                                        -- updated seed value.
inputFoldM f initial stream = do
    accRef <- newIORef initial
    folded <- makeInputStream (source accRef)
    return (folded, fetch accRef)
  where
    -- Every item read from upstream updates the accumulator before being
    -- handed to the consumer; end-of-stream leaves the accumulator alone.
    source accRef = do
        item <- read stream
        case item of
            Nothing -> return Nothing
            Just x  -> do
                !acc  <- readIORef accRef
                !acc' <- f acc x
                writeIORef accRef acc'
                return item

    -- Hands back the current accumulator and resets it to the initial seed.
    fetch accRef = atomicModifyIORef accRef (\acc -> (initial, acc))


------------------------------------------------------------------------------
-- | A strict left fold over an input stream. The input stream is fully
-- consumed. See 'Prelude.foldl'.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [1..10] >>= Streams.'fold' (+) 0
-- 55
-- @
fold :: (s -> a -> s)       -- ^ fold function
     -> s                   -- ^ initial seed
     -> InputStream a       -- ^ input stream
     -> IO s
fold f seed stream = loop seed
  where
    -- The accumulator is forced on every iteration.
    loop !acc = do
        item <- read stream
        case item of
            Nothing -> return acc
            Just x  -> loop (f acc x)


------------------------------------------------------------------------------
-- | A side-effecting strict left fold over an input stream. The input stream
-- is fully consumed. See 'Prelude.foldl'.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [1..10] >>= Streams.'foldM' (\\x y -> 'return' (x + y)) 0
-- 55
-- @
foldM :: (s -> a -> IO s)       -- ^ fold function
      -> s                      -- ^ initial seed
      -> InputStream a          -- ^ input stream
      -> IO s
foldM f seed stream = loop seed
  where
    -- The accumulator is forced on every iteration.
    loop !acc = do
        item <- read stream
        case item of
            Nothing -> return acc
            Just x  -> f acc x >>= loop


------------------------------------------------------------------------------
-- | A variant of 'System.IO.Streams.fold' suitable for use with composable folds
-- from \'beautiful folding\' libraries like
-- <http://hackage.haskell.org/package/foldl the foldl library>.
-- The input stream is fully consumed.
--
-- Example:
--
-- @
-- ghci> let folds = liftA3 (,,) Foldl.length Foldl.mean Foldl.maximum
-- ghci> Streams.'System.IO.Streams.fromList' [1..10::Double] >>= Foldl.purely Streams.'System.IO.Streams.fold_' folds
-- (10,5.5,Just 10.0)
-- @
--
-- /Since 1.3.6.0/
--
fold_ :: (x -> a -> x)    -- ^ accumulator update function
      -> x                -- ^ initial seed
      -> (x -> s)         -- ^ recover folded value
      -> InputStream a    -- ^ input stream
      -> IO s
fold_ op seed done stream = fmap done (loop seed)
  where
    -- Strict accumulation loop; 'done' is applied once to the final state.
    loop !acc = do
        item <- read stream
        case item of
            Nothing -> return acc
            Just x  -> loop (op acc x)


------------------------------------------------------------------------------
-- | A variant of 'System.IO.Streams.foldM' suitable for use with composable folds
-- from \'beautiful folding\' libraries like
-- <http://hackage.haskell.org/package/foldl the foldl library>.
-- The input stream is fully consumed.
--
-- Example:
--
-- @
-- ghci> let folds = Foldl.mapM_ print *> Foldl.generalize (liftA2 (,) Foldl.sum Foldl.mean)
-- ghci> Streams.'System.IO.Streams.fromList' [1..3::Double] >>= Foldl.impurely Streams.'System.IO.Streams.foldM_' folds
-- 1.0
-- 2.0
-- 3.0
-- (6.0,2.0)
-- @
--
-- /Since 1.3.6.0/
--
foldM_ :: (x -> a -> IO x)   -- ^ accumulator update action
       -> IO x               -- ^ initial seed
       -> (x -> IO s)        -- ^ recover folded value
       -> InputStream a      -- ^ input stream
       -> IO s
foldM_ f seed done stream = seed >>= loop
  where
    -- Strict accumulation loop; 'done' runs once, at end-of-stream.
    loop !acc = do
        item <- read stream
        case item of
            Nothing -> done acc
            Just x  -> f acc x >>= loop


------------------------------------------------------------------------------
-- | @any predicate stream@ returns 'True' if any element in @stream@ matches
-- the predicate.
--
-- 'any' consumes as few elements as possible, ending consumption if an element
-- satisfies the predicate.
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
-- ghci> Streams.'System.IO.Streams.Combinators.any' (> 0) is    -- Consumes one element
-- True
-- ghci> Streams.'System.IO.Streams.read' is
-- Just 2
-- ghci> Streams.'System.IO.Streams.Combinators.any' even is     -- Only 3 remains
-- False
-- @
any :: (a -> Bool) -> InputStream a -> IO Bool
any predicate stream = search
  where
    -- Keep reading until a match is found or the stream ends.
    search = read stream >>= maybe (return False) inspect

    inspect e
        | predicate e = return True
        | otherwise   = search


------------------------------------------------------------------------------
-- | @all predicate stream@ returns 'True' if every element in @stream@ matches
-- the predicate.
--
-- 'all' consumes as few elements as possible, ending consumption if any element
-- fails the predicate.
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
-- ghci> Streams.'System.IO.Streams.Combinators.all' (< 0) is    -- Consumes one element
-- False
-- ghci> Streams.'System.IO.Streams.read' is
-- Just 2
-- ghci> Streams.'System.IO.Streams.Combinators.all' odd is      -- Only 3 remains
-- True
-- @
all :: (a -> Bool) -> InputStream a -> IO Bool
all predicate stream = verify
  where
    -- Keep reading until an element fails or the stream ends.
    verify = read stream >>= maybe (return True) inspect

    inspect e
        | predicate e = verify
        | otherwise   = return False


------------------------------------------------------------------------------
-- | @maximum stream@ returns the greatest element in @stream@ or 'Nothing' if
-- the stream is empty.
--
-- 'maximum' consumes the entire stream.
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
-- ghci> Streams.'System.IO.Streams.Combinators.maximum' is
-- Just 3
-- ghci> Streams.'System.IO.Streams.read' is     -- The stream is now empty
-- Nothing
-- @
maximum :: (Ord a) => InputStream a -> IO (Maybe a)
maximum stream = do
    mElem0 <- read stream
    case mElem0 of
        Nothing -> return Nothing
        Just e  -> go e
  where
    go oldElem = do
        mElem <- read stream
        case mElem of
            Nothing      -> return (Just oldElem)
            -- Force the running maximum before recursing so that a long
            -- stream does not accumulate a chain of unevaluated 'max' calls.
            Just newElem -> go $! max oldElem newElem


------------------------------------------------------------------------------
-- | @minimum stream@ returns the smallest element in @stream@ or 'Nothing' if
-- the stream is empty.
--
-- 'minimum' consumes the entire stream.
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
-- ghci> Streams.'System.IO.Streams.Combinators.minimum' is
-- Just 1
-- ghci> Streams.'System.IO.Streams.read' is    -- The stream is now empty
-- Nothing
-- @
minimum :: (Ord a) => InputStream a -> IO (Maybe a)
minimum stream = do
    mElem0 <- read stream
    case mElem0 of
        Nothing -> return Nothing
        Just e  -> go e
  where
    go oldElem = do
        mElem <- read stream
        case mElem of
            Nothing      -> return (Just oldElem)
            -- Force the running minimum before recursing so that a long
            -- stream does not accumulate a chain of unevaluated 'min' calls.
            Just newElem -> go $! min oldElem newElem


------------------------------------------------------------------------------
-- | @unfoldM f seed@ builds an 'InputStream' from successively applying @f@ to
-- the @seed@ value, continuing if @f@ produces 'Just' and halting on
-- 'Nothing'.
--
-- @
-- ghci> is \<- Streams.'System.IO.Streams.Combinators.unfoldM' (\\n -> return $ if n < 3 then Just (n, n + 1) else Nothing) 0
-- ghci> Streams.'System.IO.Streams.List.toList' is
-- [0,1,2]
-- @
unfoldM :: (b -> IO (Maybe (a, b))) -> b -> IO (InputStream a)
unfoldM f seed = fromGenerator (unfold seed)
  where
    -- Run the step action; stop on 'Nothing', otherwise emit the element and
    -- continue from the new seed.
    unfold s = liftIO (f s) >>= maybe (return $! ()) emit

    emit (a, s') = yield a >> unfold s'

------------------------------------------------------------------------------
-- | Maps a pure function over an 'InputStream'.
--
-- @map f s@ passes all output from @s@ through the function @f@.
--
-- Satisfies the following laws:
--
-- @
-- Streams.'map' (g . f) === Streams.'map' f >=> Streams.'map' g
-- Streams.'map' 'id' === Streams.'makeInputStream' . Streams.'read'
-- @
map :: (a -> b) -> InputStream a -> IO (InputStream b)
map f s = makeInputStream (liftM (fmap f) (read s))


------------------------------------------------------------------------------
-- | Maps an impure function over an 'InputStream'.
--
-- @mapM f s@ passes all output from @s@ through the IO action @f@.
--
-- Satisfies the following laws:
--
-- @
-- Streams.'mapM' (f >=> g) === Streams.'mapM' f >=> Streams.'mapM' g
-- Streams.'mapM' 'return' === Streams.'makeInputStream' . Streams.'read'
-- @
--
mapM :: (a -> IO b) -> InputStream a -> IO (InputStream b)
mapM f s = makeInputStream transformed
  where
    -- End-of-stream is passed through; every other item is run through @f@.
    transformed = read s >>= maybe (return Nothing) (liftM Just . f)


------------------------------------------------------------------------------
-- | Maps a side effect over an 'InputStream'.
--
-- @mapM_ f s@ produces a new input stream that passes all output from @s@
-- through the side-effecting IO action @f@.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [1,2,3] >>=
--       Streams.'mapM_' ('putStrLn' . 'show' . (*2)) >>=
--       Streams.'System.IO.Streams.toList'
-- 2
-- 4
-- 6
-- [1,2,3]
-- @
--
mapM_ :: (a -> IO b) -> InputStream a -> IO (InputStream a)
mapM_ f s = makeInputStream observe
  where
    -- Run the side effect on each real item, then hand the item on unchanged.
    observe = do
        item <- read s
        case item of
            Nothing -> return ()
            Just x  -> void (f x)
        return item


------------------------------------------------------------------------------
-- | A version of map that discards elements
--
-- @mapMaybe f s@ passes all output from @s@ through the function @f@ and
-- discards elements for which @f x@ evaluates to 'Nothing'.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [Just 1, Nothing, Just 3] >>=
--       Streams.'mapMaybe' 'id' >>=
--       Streams.'System.IO.Streams.toList'
-- [1,3]
-- @
--
-- /Since: 1.2.1.0/
mapMaybe :: (a -> Maybe b) -> InputStream a -> IO (InputStream b)
mapMaybe f src = makeInputStream next
  where
    -- Skip over rejected elements until one is accepted or the source ends.
    next = read src >>= maybe (return Nothing) accept

    accept x = maybe next (return . Just) (f x)
------------------------------------------------------------------------------
-- | Contravariant counterpart to 'map'.
--
-- @contramap f s@ passes all input to @s@ through the function @f@.
--
-- Satisfies the following laws:
--
-- @
-- Streams.'contramap' (g . f) === Streams.'contramap' g >=> Streams.'contramap' f
-- Streams.'contramap' 'id' === 'return'
-- @
contramap :: (a -> b) -> OutputStream b -> IO (OutputStream a)
contramap f s = makeOutputStream (\item -> write (fmap f item) s)


------------------------------------------------------------------------------
-- | Contravariant counterpart to 'mapM'.
--
-- @contramapM f s@ passes all input to @s@ through the IO action @f@
--
-- Satisfies the following laws:
--
-- @
-- Streams.'contramapM' (f >=> g) = Streams.'contramapM' g >=> Streams.'contramapM' f
-- Streams.'contramapM' 'return' = 'return'
-- @
contramapM :: (a -> IO b) -> OutputStream b -> IO (OutputStream a)
contramapM f s = makeOutputStream forward
  where
    forward item = case item of
        Nothing -> write Nothing s
        Just x  -> do
            -- The transformed value is forced before it is written.
            !y <- f x
            write (Just y) s


------------------------------------------------------------------------------
-- | Equivalent to 'mapM_' for output.
--
-- @contramapM_ f s@ passes all input to @s@ through the side-effecting IO
-- action @f@.
--
contramapM_ :: (a -> IO b) -> OutputStream a -> IO (OutputStream a)
contramapM_ f s = makeOutputStream observe
  where
    -- Run the side effect on each real item, then forward the item unchanged.
    observe item = do
        case item of
            Nothing -> return ()
            Just x  -> void (f x)
        write item s


------------------------------------------------------------------------------
-- | Contravariant counterpart to 'mapMaybe'.
--
-- @contramapMaybe f s@ passes all input to @s@ through the function @f@.
-- Discards all the elements for which @f@ returns 'Nothing'.
--
-- /Since: 1.2.1.0/
--
contramapMaybe :: (a -> Maybe b) -> OutputStream b -> IO (OutputStream a)
contramapMaybe f s = makeOutputStream forward
  where
    forward Nothing  = write Nothing s
    -- Rejected elements are silently dropped; accepted ones are written on.
    forward (Just a) = maybe (return ()) (\b -> write (Just b) s) (f a)


------------------------------------------------------------------------------
-- | Drives an 'InputStream' to end-of-stream, discarding all of the yielded
-- values.
skipToEof :: InputStream a -> IO ()
skipToEof str = drain
  where
    drain = do
        item <- read str
        case item of
            Nothing -> return $! ()
            Just _  -> drain
{-# INLINE skipToEof #-}


------------------------------------------------------------------------------
-- | Drops chunks from an input stream if they fail to match a given filter
-- predicate.
-- See 'Prelude.filter'.
--
-- Items pushed back to the returned stream are propagated back upstream.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [\"the\", \"quick\", \"brown\", \"fox\"] >>=
--       Streams.'filterM' ('return' . (/= \"brown\")) >>= Streams.'System.IO.Streams.toList'
-- [\"the\",\"quick\",\"fox\"]
-- @
filterM :: (a -> IO Bool)
        -> InputStream a
        -> IO (InputStream a)
filterM p src = return $! InputStream next pushBack
  where
    -- Read from the source until an element passes the predicate or the
    -- source is exhausted.
    next = do
        item <- read src
        case item of
            Nothing -> return Nothing
            Just x  -> do
                keep <- p x
                if keep
                  then return $! Just x
                  else next

    pushBack x = unRead x src


------------------------------------------------------------------------------
-- | Drops chunks from an input stream if they fail to match a given filter
-- predicate. See 'Prelude.filter'.
--
-- Items pushed back to the returned stream are propagated back upstream.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [\"the\", \"quick\", \"brown\", \"fox\"] >>=
--       Streams.'filter' (/= \"brown\") >>= Streams.'System.IO.Streams.toList'
-- [\"the\",\"quick\",\"fox\"]
-- @
filter :: (a -> Bool)
       -> InputStream a
       -> IO (InputStream a)
filter p src = return $! InputStream next pushBack
  where
    -- Read from the source until an element passes the predicate or the
    -- source is exhausted.
    next = do
        item <- read src
        case item of
            Nothing -> return Nothing
            Just x
                | p x       -> return $! Just x
                | otherwise -> next

    pushBack x = unRead x src


------------------------------------------------------------------------------
-- | The function @intersperse v s@ wraps the 'OutputStream' @s@, creating a
-- new output stream that writes its input to @s@ interspersed with the
-- provided value @v@. See 'Data.List.intersperse'.
--
-- Example:
--
-- @
-- ghci> import Control.Monad ((>=>))
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [\"nom\", \"nom\", \"nom\"::'ByteString']
-- ghci> Streams.'System.IO.Streams.List.outputToList' (Streams.'intersperse' \"burp!\" >=> Streams.'System.IO.Streams.connect' is)
-- [\"nom\",\"burp!\",\"nom\",\"burp!\",\"nom\"]
-- @
intersperse :: a -> OutputStream a -> IO (OutputStream a)
intersperse sep os = do
    startedRef <- newIORef False
    makeOutputStream (emit startedRef)
  where
    emit _ Nothing = write Nothing os
    -- The flag records whether anything has been written yet, so the
    -- separator goes out before every item except the first.
    emit startedRef item = do
        started <- readIORef startedRef
        writeIORef startedRef True
        when started $ write (Just sep) os
        write item os


------------------------------------------------------------------------------
-- | Combines two input streams. Continues yielding elements from both input
-- streams until one of them finishes.
zip :: InputStream a -> InputStream b -> IO (InputStream (a, b))
zip = zipWith (,)


------------------------------------------------------------------------------
-- | Combines two input streams using the supplied function. Continues yielding
-- elements from both input streams until one of them finishes.
zipWith :: (a -> b -> c)
        -> InputStream a
        -> InputStream b
        -> IO (InputStream c)
zipWith f src1 src2 = makeInputStream pull
  where
    pull = do
        ma <- read src1
        case ma of
            Nothing -> return Nothing
            Just a  -> do
                mb <- read src2
                case mb of
                    -- The second stream ended first: give the element
                    -- already taken from the first stream back to it.
                    Nothing -> unRead a src1 >> return Nothing
                    Just b  -> return $! Just $! f a b


------------------------------------------------------------------------------
-- | Combines two input streams using the supplied monadic function. Continues
-- yielding elements from both input streams until one of them finishes.
zipWithM :: (a -> b -> IO c)
         -> InputStream a
         -> InputStream b
         -> IO (InputStream c)
zipWithM f src1 src2 = makeInputStream pull
  where
    pull = do
        ma <- read src1
        case ma of
            Nothing -> return Nothing
            Just a  -> do
                mb <- read src2
                case mb of
                    -- The second stream ended first: give the element
                    -- already taken from the first stream back to it.
                    Nothing -> unRead a src1 >> return Nothing
                    Just b  -> do
                        c <- f a b
                        return $! Just $! c


------------------------------------------------------------------------------
-- | Filters output to be sent to the given 'OutputStream' using a pure
-- function. See 'filter'.
--
-- Example:
--
-- @
-- ghci> import qualified "Data.ByteString.Char8" as S
-- ghci> os1 \<- Streams.'System.IO.Streams.unlines' Streams.'System.IO.Streams.stdout'
-- ghci> os2 \<- Streams.'contramap' (S.pack . show) os1 >>= Streams.'filterOutput' even
-- ghci> Streams.'write' (Just 3) os2
-- ghci> Streams.'write' (Just 4) os2
-- 4
-- @
{- Note: The example is a lie, because unlines has weird behavior -}
filterOutput :: (a -> Bool) -> OutputStream a -> IO (OutputStream a)
filterOutput p output = makeOutputStream forward
  where
    -- End-of-stream always goes through; items only when they pass @p@.
    forward item = case item of
        Nothing -> write Nothing output
        Just x
            | p x       -> write item output
            | otherwise -> return ()


------------------------------------------------------------------------------
-- | Filters output to be sent to the given 'OutputStream' using a predicate
-- function in IO. See 'filterM'.
--
-- Example:
--
-- @
-- ghci> let check a = putStrLn ("Allow " ++ show a ++ "?") >> readLn :: IO Bool
-- ghci> import qualified Data.ByteString.Char8 as S
-- ghci> os1 <- Streams.'System.IO.Streams.unlines' Streams.'System.IO.Streams.stdout'
-- ghci> os2 \<- Streams.'contramap' (S.pack . show) os1 >>= Streams.'filterOutputM' check
-- ghci> Streams.'System.IO.Streams.write' (Just 3) os2
-- Allow 3?
-- False\<Enter>
-- ghci> Streams.'System.IO.Streams.write' (Just 4) os2
-- Allow 4?
-- True\<Enter>
-- 4
-- @
filterOutputM :: (a -> IO Bool) -> OutputStream a -> IO (OutputStream a)
filterOutputM p output = makeOutputStream forward
  where
    forward Nothing       = write Nothing output
    -- Items are written on only when the IO predicate accepts them.
    forward item@(Just x) = p x >>= \keep -> when keep (write item output)


------------------------------------------------------------------------------
-- | Takes apart a stream of pairs, producing a pair of input streams. Reading
-- from either of the produced streams will cause a pair of values to be pulled
-- from the original stream if necessary. Note that reading @n@ values from one
-- of the returned streams will cause @n@ values to be buffered at the other
-- stream.
--
-- Access to the original stream is thread safe, i.e. guarded by a lock.
unzip :: forall a b . InputStream (a, b) -> IO (InputStream a, InputStream b)
unzip os = do
    lock  <- newMVar $! ()
    firstBuf  <- newIORef id
    secondBuf <- newIORef id

    firsts  <- makeInputStream $ readFirst lock firstBuf secondBuf
    seconds <- makeInputStream $ readSecond lock firstBuf secondBuf

    return (firsts, seconds)

  where
    flipPair (a, b) = (b, a)

    -- Serve from this side's buffer when it is non-empty; otherwise pull a
    -- fresh pair and park its other half in the opposite buffer.
    readFirst lock firstBuf secondBuf = withMVar lock $ \_ -> do
        pending <- readIORef firstBuf
        case pending [] of
            []     -> more os id secondBuf
            (x:xs) -> do
                writeIORef firstBuf (xs ++)
                return $! Just x

    readSecond lock firstBuf secondBuf = withMVar lock $ \_ -> do
        pending <- readIORef secondBuf
        case pending [] of
            []     -> more os flipPair firstBuf
            (y:ys) -> do
                writeIORef secondBuf (ys ++)
                return $! Just y

    -- Reads one pair from the original stream, returns the projected first
    -- component and appends the projected second component to @buf@.
    more :: forall a b x y .
            InputStream (a, b)
         -> ((a, b) -> (x, y))
         -> IORef ([y] -> [y])
         -> IO (Maybe x)
    more origs proj buf = do
        item <- read origs
        case item of
            Nothing   -> return Nothing
            Just pair -> do
                let (a, b) = proj pair
                modifyIORef buf (. (b :))
                return $! Just a


------------------------------------------------------------------------------
-- | Given two 'OutputStream's, returns a new stream that "unzips" the tuples
-- being written, writing the two elements to the corresponding given streams.
--
-- You can use this together with @'contramap' (\\ x -> (x, x))@ to "fork" a
-- stream into two.
--
-- /Since: 1.5.2.0/
contraunzip :: OutputStream a -> OutputStream b -> IO (OutputStream (a, b))
contraunzip sink1 sink2 = makeOutputStream split
  where
    -- End-of-stream ('Nothing') is forwarded to both sinks as well.
    split pair = do
        write (fmap fst pair) sink1
        write (fmap snd pair) sink2


------------------------------------------------------------------------------
-- | Wraps an 'InputStream', producing a new 'InputStream' that will produce at
-- most @n@ items, subsequently yielding end-of-stream forever.
--
-- Items pushed back to the returned 'InputStream' will be propagated upstream,
-- modifying the count of taken items accordingly.
--
-- Example:
--
-- @
-- ghci> is <- Streams.'fromList' [1..9::Int]
-- ghci> is' <- Streams.'take' 1 is
-- ghci> Streams.'read' is'
-- Just 1
-- ghci> Streams.'read' is'
-- Nothing
-- ghci> Streams.'System.IO.Streams.peek' is
-- Just 2
-- ghci> Streams.'unRead' 11 is'
-- ghci> Streams.'System.IO.Streams.peek' is
-- Just 11
-- ghci> Streams.'System.IO.Streams.peek' is'
-- Just 11
-- ghci> Streams.'read' is'
-- Just 11
-- ghci> Streams.'read' is'
-- Nothing
-- ghci> Streams.'read' is
-- Just 2
-- ghci> Streams.'toList' is
-- [3,4,5,6,7,8,9]
-- @
--
take :: Int64 -> InputStream a -> IO (InputStream a)
take k0 input = do
    remaining <- newIORef k0
    return $! InputStream (produce remaining) (pushBack remaining)
  where
    -- Only touch the wrapped stream while the budget is positive; the budget
    -- shrinks only when an item was actually obtained.
    produce remaining = do
        !k <- readIORef remaining
        if k > 0
          then do
              item <- read input
              when (isJust item) $ modifyIORef remaining (subtract 1)
              return item
          else return Nothing

    -- A pushed-back item goes upstream and restores one unit of budget.
    pushBack remaining !x = do
        unRead x input
        modifyIORef remaining (+ 1)


------------------------------------------------------------------------------
-- | Wraps an 'InputStream', producing a new 'InputStream' that will drop the
-- first @n@ items produced by the wrapped stream. See 'Prelude.drop'.
--
-- Items pushed back to the returned 'InputStream' will be propagated upstream,
-- modifying the count of dropped items accordingly.
drop :: Int64 -> InputStream a -> IO (InputStream a)
drop k0 input = do
    pending <- newIORef k0
    return $! InputStream (produce pending) (pushBack pending)
  where
    -- Reads one upstream item, decrementing the counter whenever an item was
    -- obtained (including after the counter has reached zero).
    fetch pending = do
        item <- read input
        case item of
            Nothing -> return Nothing
            Just c  -> do
                modifyIORef pending (subtract 1)
                return $! Just c

    -- While the counter is positive, fetched items are thrown away and the
    -- read is retried; afterwards they are passed through.
    produce pending = do
        !k <- readIORef pending
        if k > 0
          then fetch pending >>= maybe (return Nothing) (const (produce pending))
          else fetch pending

    pushBack pending x = do
        unRead x input
        modifyIORef pending (+ 1)


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', producing a new 'OutputStream' that will pass at
-- most @n@ items on to the wrapped stream, subsequently ignoring the rest of
-- the input.
--
give :: Int64 -> OutputStream a -> IO (OutputStream a)
give k output = do
    budget <- newIORef k
    makeOutputStream (forward budget)
  where
    -- End-of-stream is not forwarded to the wrapped stream.
    forward _      Nothing  = return ()
    forward budget (Just x) = do
        !n <- readIORef budget
        when (n > 0) $ do
            writeIORef budget $! n - 1
            write (Just x) output


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', producing a new 'OutputStream' that will ignore
-- the first @n@ items received, subsequently passing the rest of the input on
-- to the wrapped stream.
--
ignore :: Int64 -> OutputStream a -> IO (OutputStream a)
ignore k output = do
    toSkip <- newIORef k
    makeOutputStream (forward toSkip)
  where
    -- End-of-stream is not forwarded to the wrapped stream.
    forward _      Nothing  = return ()
    forward toSkip (Just x) = do
        !n <- readIORef toSkip
        if n > 0
          then writeIORef toSkip $! n - 1
          else write (Just x) output


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', ignoring any end-of-stream 'Nothing' values
-- written to the returned stream.
--
-- /Since: 1.0.1.0/
--
ignoreEof :: OutputStream a -> IO (OutputStream a)
ignoreEof s = return $ OutputStream forward
  where
    forward item = case item of
        Nothing -> return ()
        Just _  -> write item s


------------------------------------------------------------------------------
-- | Wraps an 'InputStream', running the specified action when the stream
-- yields end-of-file.
--
-- /Since: 1.0.2.0/
--
atEndOfInput :: IO b -> InputStream a -> IO (InputStream a)
atEndOfInput m is = return $! InputStream produce pushBack
  where
    -- The action runs each time the wrapped stream reports end-of-stream.
    produce = do
        item <- read is
        case item of
            Nothing -> void m >> return Nothing
            Just _  -> return item

    pushBack x = unRead x is


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', running the specified action when the stream
-- receives end-of-file.
--
-- /Since: 1.0.2.0/
--
atEndOfOutput :: IO b -> OutputStream a -> IO (OutputStream a)
atEndOfOutput m os = makeOutputStream forward
  where
    -- End-of-stream is forwarded first, then the action is run.
    forward item = case item of
        Nothing -> write Nothing os >> void m
        Just _  -> write item os