1-- | Generic stream manipulations
2
3{-# LANGUAGE BangPatterns       #-}
4{-# LANGUAGE DeriveDataTypeable #-}
5{-# LANGUAGE RankNTypes         #-}
6
7module System.IO.Streams.Combinators
8 ( -- * Folds
9   inputFoldM
10 , outputFoldM
11 , fold
12 , foldM
13 , fold_
14 , foldM_
15 , any
16 , all
17 , maximum
18 , minimum
19
20   -- * Unfolds
21 , unfoldM
22
23   -- * Maps
24 , map
25 , mapM
26 , mapM_
27 , mapMaybe
28 , contramap
29 , contramapM
30 , contramapM_
31 , contramapMaybe
32
33   -- * Filter
34 , filter
35 , filterM
36 , filterOutput
37 , filterOutputM
38
39   -- * Takes and drops
40 , give
41 , take
42 , drop
43 , ignore
44
45   -- * Zip and unzip
46 , zip
47 , zipWith
48 , zipWithM
49 , unzip
50 , contraunzip
51
52   -- * Utility
53 , intersperse
54 , skipToEof
55 , ignoreEof
56 , atEndOfInput
57 , atEndOfOutput
58 ) where
59
60------------------------------------------------------------------------------
61import           Control.Concurrent.MVar    (newMVar, withMVar)
62import           Control.Monad              (liftM, void, when)
63import           Control.Monad.IO.Class     (liftIO)
64import           Data.Int                   (Int64)
65import           Data.IORef                 (IORef, atomicModifyIORef, modifyIORef, newIORef, readIORef, writeIORef)
66import           Data.Maybe                 (isJust)
67import           Prelude                    hiding (all, any, drop, filter, map, mapM, mapM_, maximum, minimum, read, take, unzip, zip, zipWith)
68------------------------------------------------------------------------------
69import           System.IO.Streams.Internal (InputStream (..), OutputStream (..), fromGenerator, makeInputStream, makeOutputStream, read, unRead, write, yield)
70
71
72------------------------------------------------------------------------------
73-- | A side-effecting fold over an 'OutputStream', as a stream transformer.
74--
75-- The IO action returned by 'outputFoldM' can be used to fetch and reset the updated
76-- seed value. Example:
77--
78-- @
79-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3::Int]
80-- ghci> (os, getList) <- Streams.'System.IO.Streams.List.listOutputStream'
81-- ghci> (os', getSeed) \<- Streams.'outputFoldM' (\\x y -> return (x+y)) 0 os
82-- ghci> Streams.'System.IO.Streams.connect' is os'
83-- ghci> getList
84-- [1,2,3]
85-- ghci> getSeed
86-- 6
87-- @
88outputFoldM :: (a -> b -> IO a)           -- ^ fold function
89            -> a                          -- ^ initial seed
90            -> OutputStream b             -- ^ output stream
91            -> IO (OutputStream b, IO a)  -- ^ returns a new stream as well as
92                                          -- an IO action to fetch and reset the
93                                          --  updated seed value.
94outputFoldM f initial stream = do
95    ref <- newIORef initial
96    os  <- makeOutputStream (wr ref)
97    return (os, fetch ref)
98
99  where
100    wr _ Nothing       = write Nothing stream
101    wr ref mb@(Just x) = do
102        !z  <- readIORef ref
103        !z' <- f z x
104        writeIORef ref z'
105        write mb stream
106
107    fetch ref = atomicModifyIORef ref $ \x -> (initial, x)
108
109
110------------------------------------------------------------------------------
111-- | A side-effecting fold over an 'InputStream', as a stream transformer.
112--
113-- The IO action returned by 'inputFoldM' can be used to fetch and reset the updated seed
114-- value. Example:
115--
116-- @
117-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3::Int]
118-- ghci> (is', getSeed) \<- Streams.'inputFoldM' (\\x y -> return (x+y)) 0 is
119-- ghci> Streams.'System.IO.Streams.List.toList' is'
120-- [1,2,3]
121-- ghci> getSeed
122-- 6
123-- @
124inputFoldM :: (a -> b -> IO a)          -- ^ fold function
125           -> a                         -- ^ initial seed
126           -> InputStream b             -- ^ input stream
127           -> IO (InputStream b, IO a)  -- ^ returns a new stream as well as an
128                                        -- IO action to fetch and reset the
129                                        -- updated seed value.
130inputFoldM f initial stream = do
131    ref <- newIORef initial
132    is  <- makeInputStream (rd ref)
133    return (is, fetch ref)
134
135  where
136    twiddle _ Nothing = return Nothing
137
138    twiddle ref mb@(Just x) = do
139        !z  <- readIORef ref
140        !z' <- f z x
141        writeIORef ref z'
142        return mb
143
144    rd ref = read stream >>= twiddle ref
145
146    fetch ref = atomicModifyIORef ref $ \x -> (initial, x)
147
148
149------------------------------------------------------------------------------
150-- | A left fold over an input stream. The input stream is fully consumed. See
151-- 'Prelude.foldl'.
152--
153-- Example:
154--
155-- @
156-- ghci> Streams.'System.IO.Streams.fromList' [1..10] >>= Streams.'fold' (+) 0
157-- 55
158-- @
159fold :: (s -> a -> s)       -- ^ fold function
160     -> s                   -- ^ initial seed
161     -> InputStream a       -- ^ input stream
162     -> IO s
163fold f seed stream = go seed
164  where
165    go !s = read stream >>= maybe (return s) (go . f s)
166
167
168------------------------------------------------------------------------------
169-- | A side-effecting left fold over an input stream. The input stream is fully
170-- consumed. See 'Prelude.foldl'.
171--
172-- Example:
173--
174-- @
175-- ghci> Streams.'System.IO.Streams.fromList' [1..10] >>= Streams.'foldM' (\x y -> 'return' (x + y)) 0
176-- 55
177-- @
178foldM :: (s -> a -> IO s)       -- ^ fold function
179      -> s                      -- ^ initial seed
180      -> InputStream a          -- ^ input stream
181      -> IO s
182foldM f seed stream = go seed
183  where
184    go !s = read stream >>= maybe (return s) ((go =<<) . f s)
185
186
187------------------------------------------------------------------------------
188-- | A variant of 'System.IO.Streams.fold' suitable for use with composable folds
189-- from \'beautiful folding\' libraries like
190-- <http://hackage.haskell.org/package/foldl the foldl library>.
191-- The input stream is fully consumed.
192--
193-- Example:
194--
195-- @
196-- ghci> let folds = liftA3 (,,) Foldl.length Foldl.mean Foldl.maximum
197-- ghci> Streams.'System.IO.Streams.fromList' [1..10::Double] >>= Foldl.purely Streams.'System.IO.Streams.fold_' folds is
198-- ghci> (10,5.5,Just 10.0)
199-- @
200--
201-- /Since 1.3.6.0/
202--
203fold_ :: (x -> a -> x)    -- ^ accumulator update function
204      -> x                -- ^ initial seed
205      -> (x -> s)         -- ^ recover folded value
206      -> InputStream a    -- ^ input stream
207      -> IO s
208fold_ op seed done stream = liftM done (go seed)
209   where
210     go !s = read stream >>= maybe (return s) (go . op s)
211
212
213------------------------------------------------------------------------------
214-- | A variant of 'System.IO.Streams.foldM' suitable for use with composable folds
215-- from \'beautiful folding\' libraries like
216-- <http://hackage.haskell.org/package/foldl the foldl library>.
217-- The input stream is fully consumed.
218--
219-- Example:
220--
221-- @
222-- ghci> let folds = Foldl.mapM_ print *> Foldl.generalize (liftA2 (,) Foldl.sum Foldl.mean)
223-- ghci> Streams.'System.IO.Streams.fromList' [1..3::Double] >>= Foldl.impurely Streams.'System.IO.Streams.foldM_' folds
224-- 1.0
225-- 2.0
226-- 3.0
227-- (6.0,2.0)
228-- @
229--
230-- /Since 1.3.6.0/
231--
232foldM_ :: (x -> a -> IO x)   -- ^ accumulator update action
233       -> IO x               -- ^ initial seed
234       -> (x -> IO s)        -- ^ recover folded value
235       -> InputStream a      -- ^ input stream
236       -> IO s
237foldM_ f seed done stream = seed >>= go
238  where
239    go !x = read stream >>= maybe (done x) ((go =<<) . f x)
240
241
242------------------------------------------------------------------------------
243-- | @any predicate stream@ returns 'True' if any element in @stream@ matches
244-- the predicate.
245--
246-- 'any' consumes as few elements as possible, ending consumption if an element
247-- satisfies the predicate.
248--
249-- @
250-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
251-- ghci> Streams.'System.IO.Streams.Combinators.any' (> 0) is    -- Consumes one element
252-- True
253-- ghci> Streams.'System.IO.Streams.read' is
254-- Just 2
255-- ghci> Streams.'System.IO.Streams.Combinators.any' even is     -- Only 3 remains
256-- False
257-- @
258any :: (a -> Bool) -> InputStream a -> IO Bool
259any predicate stream = go
260  where
261    go = do
262        mElem <- read stream
263        case mElem of
264            Nothing -> return False
265            Just e  -> if predicate e then return True else go
266
267
268------------------------------------------------------------------------------
269-- | @all predicate stream@ returns 'True' if every element in @stream@ matches
270-- the predicate.
271--
272-- 'all' consumes as few elements as possible, ending consumption if any element
273-- fails the predicate.
274--
275-- @
276-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
277-- ghci> Streams.'System.IO.Streams.Combinators.all' (< 0) is    -- Consumes one element
278-- False
279-- ghci> Streams.'System.IO.Streams.read' is
280-- Just 2
281-- ghci> Streams.'System.IO.Streams.Combinators.all' odd is      -- Only 3 remains
282-- True
283-- @
284all :: (a -> Bool) -> InputStream a -> IO Bool
285all predicate stream = go
286  where
287    go = do
288        mElem <- read stream
289        case mElem of
290            Nothing -> return True
291            Just e  -> if predicate e then go else return False
292
293
294------------------------------------------------------------------------------
295-- | @maximum stream@ returns the greatest element in @stream@ or 'Nothing' if
296-- the stream is empty.
297--
298-- 'maximum' consumes the entire stream.
299--
300-- @
301-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
302-- ghci> Streams.'System.IO.Streams.Combinators.maximum' is
303-- 3
304-- ghci> Streams.'System.IO.Streams.read' is     -- The stream is now empty
305-- Nothing
306-- @
307maximum :: (Ord a) => InputStream a -> IO (Maybe a)
308maximum stream = do
309    mElem0 <- read stream
310    case mElem0 of
311        Nothing -> return Nothing
312        Just e  -> go e
313  where
314    go oldElem = do
315        mElem <- read stream
316        case mElem of
317            Nothing      -> return (Just oldElem)
318            Just newElem -> go (max oldElem newElem)
319
320
321------------------------------------------------------------------------------
322-- | @minimum stream@ returns the greatest element in @stream@
323--
324-- 'minimum' consumes the entire stream.
325--
326-- @
327-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
328-- ghci> Streams.'System.IO.Streams.Combinators.minimum' is
329-- 1
330-- ghci> Streams.'System.IO.Streams.read' is    -- The stream is now empty
331-- Nothing
332-- @
333minimum :: (Ord a) => InputStream a -> IO (Maybe a)
334minimum stream = do
335    mElem0 <- read stream
336    case mElem0 of
337        Nothing -> return Nothing
338        Just e  -> go e
339  where
340    go oldElem = do
341        mElem <- read stream
342        case mElem of
343            Nothing      -> return (Just oldElem)
344            Just newElem -> go (min oldElem newElem)
345
346
347------------------------------------------------------------------------------
348-- | @unfoldM f seed@ builds an 'InputStream' from successively applying @f@ to
349-- the @seed@ value, continuing if @f@ produces 'Just' and halting on
350-- 'Nothing'.
351--
352-- @
353-- ghci> is \<- Streams.'System.IO.Streams.Combinators.unfoldM' (\n -> return $ if n < 3 then Just (n, n + 1) else Nothing) 0
354-- ghci> Streams.'System.IO.Streams.List.toList' is
355-- [0,1,2]
356-- @
357unfoldM :: (b -> IO (Maybe (a, b))) -> b -> IO (InputStream a)
358unfoldM f seed = fromGenerator (go seed)
359  where
360    go oldSeed = do
361       m <- liftIO (f oldSeed)
362       case m of
363           Nothing           -> return $! ()
364           Just (a, newSeed) -> do
365               yield a
366               go newSeed
367
368------------------------------------------------------------------------------
369-- | Maps a pure function over an 'InputStream'.
370--
371-- @map f s@ passes all output from @s@ through the function @f@.
372--
373-- Satisfies the following laws:
374--
375-- @
376-- Streams.'map' (g . f) === Streams.'map' f >=> Streams.'map' g
377-- Streams.'map' 'id' === Streams.'makeInputStream' . Streams.'read'
378-- @
379map :: (a -> b) -> InputStream a -> IO (InputStream b)
380map f s = makeInputStream g
381  where
382    g = read s >>= return . fmap f
383
384
385------------------------------------------------------------------------------
386-- | Maps an impure function over an 'InputStream'.
387--
388-- @mapM f s@ passes all output from @s@ through the IO action @f@.
389--
390-- Satisfies the following laws:
391--
392-- @
393-- Streams.'mapM' (f >=> g) === Streams.'mapM' f >=> Streams.'mapM' g
394-- Streams.'mapM' 'return' === Streams.'makeInputStream' . Streams.'read'
395-- @
396--
397mapM :: (a -> IO b) -> InputStream a -> IO (InputStream b)
398mapM f s = makeInputStream g
399  where
400    g = do
401        mb <- read s >>= maybe (return Nothing)
402                               (\x -> liftM Just $ f x)
403
404        return mb
405
406
407------------------------------------------------------------------------------
408-- | Maps a side effect over an 'InputStream'.
409--
410-- @mapM_ f s@ produces a new input stream that passes all output from @s@
411-- through the side-effecting IO action @f@.
412--
413-- Example:
414--
415-- @
416-- ghci> Streams.'System.IO.Streams.fromList' [1,2,3] >>=
417--       Streams.'mapM_' ('putStrLn' . 'show' . (*2)) >>=
418--       Streams.'System.IO.Streams.toList'
419-- 2
420-- 4
421-- 6
422-- [1,2,3]
423-- @
424--
425mapM_ :: (a -> IO b) -> InputStream a -> IO (InputStream a)
426mapM_ f s = makeInputStream $ do
427    mb <- read s
428    _  <- maybe (return $! ()) (void . f) mb
429    return mb
430
431
432------------------------------------------------------------------------------
433-- | A version of map that discards elements
434--
435-- @mapMaybe f s@ passes all output from @s@ through the function @f@ and
436-- discards elements for which @f s@ evaluates to 'Nothing'.
437--
438-- Example:
439--
440-- @
441-- ghci> Streams.'System.IO.Streams.fromList' [Just 1, None, Just 3] >>=
442--       Streams.'mapMaybe' 'id' >>=
443--       Streams.'System.IO.Streams.toList'
444-- [1,3]
445-- @
446--
447-- /Since: 1.2.1.0/
448mapMaybe :: (a -> Maybe b) -> InputStream a -> IO (InputStream b)
449mapMaybe f src = makeInputStream g
450  where
451    g = do
452      s <- read src
453      case s of
454        Nothing -> return Nothing
455        Just x ->
456          case f x of
457            Nothing -> g
458            y -> return y
459------------------------------------------------------------------------------
460-- | Contravariant counterpart to 'map'.
461--
462-- @contramap f s@ passes all input to @s@ through the function @f@.
463--
464-- Satisfies the following laws:
465--
466-- @
467-- Streams.'contramap' (g . f) === Streams.'contramap' g >=> Streams.'contramap' f
468-- Streams.'contramap' 'id' === 'return'
469-- @
470contramap :: (a -> b) -> OutputStream b -> IO (OutputStream a)
471contramap f s = makeOutputStream $ flip write s . fmap f
472
473
474------------------------------------------------------------------------------
475-- | Contravariant counterpart to 'mapM'.
476--
477-- @contramapM f s@ passes all input to @s@ through the IO action @f@
478--
479-- Satisfies the following laws:
480--
481-- @
482-- Streams.'contramapM' (f >=> g) = Streams.'contramapM' g >=> Streams.'contramapM' f
483-- Streams.'contramapM' 'return' = 'return'
484-- @
485contramapM :: (a -> IO b) -> OutputStream b -> IO (OutputStream a)
486contramapM f s = makeOutputStream g
487  where
488    g Nothing = write Nothing s
489
490    g (Just x) = do
491        !y <- f x
492        write (Just y) s
493
494
495------------------------------------------------------------------------------
496-- | Equivalent to 'mapM_' for output.
497--
498-- @contramapM f s@ passes all input to @s@ through the side-effecting IO
499-- action @f@.
500--
501contramapM_ :: (a -> IO b) -> OutputStream a -> IO (OutputStream a)
502contramapM_ f s = makeOutputStream $ \mb -> do
503    _ <- maybe (return $! ()) (void . f) mb
504    write mb s
505
506
507------------------------------------------------------------------------------
508-- | Contravariant counterpart to 'contramapMaybe'.
509--
510-- @contramap f s@ passes all input to @s@ through the function @f@.
511-- Discards all the elements for which @f@ returns 'Nothing'.
512--
513-- /Since: 1.2.1.0/
514--
515contramapMaybe :: (a -> Maybe b) -> OutputStream b -> IO (OutputStream a)
516contramapMaybe f s = makeOutputStream $ g
517    where
518      g Nothing = write Nothing s
519      g (Just a) =
520        case f a of
521          Nothing -> return ()
522          x -> write x s
523
524
525------------------------------------------------------------------------------
526-- | Drives an 'InputStream' to end-of-stream, discarding all of the yielded
527-- values.
528skipToEof :: InputStream a -> IO ()
529skipToEof str = go
530  where
531    go = read str >>= maybe (return $! ()) (const go)
532{-# INLINE skipToEof #-}
533
534
535------------------------------------------------------------------------------
536-- | Drops chunks from an input stream if they fail to match a given filter
537-- predicate. See 'Prelude.filter'.
538--
539-- Items pushed back to the returned stream are propagated back upstream.
540--
541-- Example:
542--
543-- @
544-- ghci> Streams.'System.IO.Streams.fromList' [\"the\", \"quick\", \"brown\", \"fox\"] >>=
545--       Streams.'filterM' ('return' . (/= \"brown\")) >>= Streams.'System.IO.Streams.toList'
546-- [\"the\",\"quick\",\"fox\"]
547-- @
548filterM :: (a -> IO Bool)
549        -> InputStream a
550        -> IO (InputStream a)
551filterM p src = return $! InputStream prod pb
552  where
553    prod = read src >>= maybe eof chunk
554
555    chunk s = do
556        b <- p s
557        if b then return $! Just s
558             else prod
559
560    eof = return Nothing
561
562    pb s = unRead s src
563
564
565------------------------------------------------------------------------------
566-- | Drops chunks from an input stream if they fail to match a given filter
567-- predicate. See 'Prelude.filter'.
568--
569-- Items pushed back to the returned stream are propagated back upstream.
570--
571-- Example:
572--
573-- @
574-- ghci> Streams.'System.IO.Streams.fromList' [\"the\", \"quick\", \"brown\", \"fox\"] >>=
575--       Streams.'filter' (/= \"brown\") >>= Streams.'System.IO.Streams.toList'
576-- [\"the\",\"quick\",\"fox\"]
577-- @
578filter :: (a -> Bool)
579       -> InputStream a
580       -> IO (InputStream a)
581filter p src = return $! InputStream prod pb
582  where
583    prod = read src >>= maybe eof chunk
584
585    chunk s = do
586        let b = p s
587        if b then return $! Just s
588             else prod
589
590    eof  = return Nothing
591    pb s = unRead s src
592
593
594------------------------------------------------------------------------------
595-- | The function @intersperse v s@ wraps the 'OutputStream' @s@, creating a
596-- new output stream that writes its input to @s@ interspersed with the
597-- provided value @v@. See 'Data.List.intersperse'.
598--
599-- Example:
600--
601-- @
602-- ghci> import Control.Monad ((>=>))
603-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [\"nom\", \"nom\", \"nom\"::'ByteString']
604-- ghci> Streams.'System.IO.Streams.List.outputToList' (Streams.'intersperse' \"burp!\" >=> Streams.'System.IO.Streams.connect' is)
605-- [\"nom\",\"burp!\",\"nom\",\"burp!\",\"nom\"]
606-- @
607intersperse :: a -> OutputStream a -> IO (OutputStream a)
608intersperse sep os = newIORef False >>= makeOutputStream . f
609  where
610    f _ Nothing = write Nothing os
611    f sendRef s    = do
612        b <- readIORef sendRef
613        writeIORef sendRef True
614        when b $ write (Just sep) os
615        write s os
616
617
618------------------------------------------------------------------------------
619-- | Combines two input streams. Continues yielding elements from both input
620-- streams until one of them finishes.
621zip :: InputStream a -> InputStream b -> IO (InputStream (a, b))
622zip src1 src2 = makeInputStream src
623  where
624    src = read src1 >>= (maybe (return Nothing) $ \a ->
625            read src2 >>= (maybe (unRead a src1 >> return Nothing) $ \b ->
626              return $! Just $! (a, b)))
627
628
629------------------------------------------------------------------------------
630-- | Combines two input streams using the supplied function. Continues yielding
631-- elements from both input streams until one of them finishes.
632zipWith :: (a -> b -> c)
633        -> InputStream a
634        -> InputStream b
635        -> IO (InputStream c)
636zipWith f src1 src2 = makeInputStream src
637  where
638    src = read src1 >>= (maybe (return Nothing) $ \a ->
639            read src2 >>= (maybe (unRead a src1 >> return Nothing) $ \b ->
640              return $! Just $! f a b ) )
641
642
643------------------------------------------------------------------------------
644-- | Combines two input streams using the supplied monadic function. Continues
645-- yielding elements from both input streams until one of them finishes.
646zipWithM :: (a -> b -> IO c)
647         -> InputStream a
648         -> InputStream b
649         -> IO (InputStream c)
650zipWithM f src1 src2 = makeInputStream src
651  where
652    src = read src1 >>= (maybe (return Nothing) $ \a ->
653            read src2 >>= (maybe (unRead a src1 >> return Nothing) $ \b ->
654              f a b >>= \c -> return $! Just $! c ) )
655
656
657------------------------------------------------------------------------------
658-- | Filters output to be sent to the given 'OutputStream' using a pure
659-- function. See 'filter'.
660--
661-- Example:
662--
663-- @
664-- ghci> import qualified "Data.ByteString.Char8" as S
665-- ghci> os1 \<- Streams.'System.IO.Streams.stdout' >>= Streams.'System.IO.Streams.unlines
666-- ghci> os2 \<- os1 >>= Streams.'contramap' (S.pack . show) >>= Streams.'filterOutput' even
667-- ghci> Streams.'write' (Just 3) os2
668-- ghci> Streams.'write' (Just 4) os2
669-- 4
670-- @
671{- Note: The example is a lie, because unlines has weird behavior -}
672filterOutput :: (a -> Bool) -> OutputStream a -> IO (OutputStream a)
673filterOutput p output = makeOutputStream chunk
674  where
675    chunk Nothing  = write Nothing output
676    chunk ch@(Just x) = when (p x) $ write ch output
677
678
679------------------------------------------------------------------------------
680-- | Filters output to be sent to the given 'OutputStream' using a predicate
681-- function in IO. See 'filterM'.
682--
683-- Example:
684--
685-- @
686-- ghci> let check a = putStrLn a ("Allow " ++ show a ++ "?") >> readLn :: IO Bool
687-- ghci> import qualified Data.ByteString.Char8 as S
688-- ghci> os1 <- Streams.'System.IO.Streams.unlines' Streams.'System.IO.Streams.stdout'
689-- ghci> os2 \<- os1 >>= Streams.'contramap' (S.pack . show) >>= Streams.'filterOutputM' check
690-- ghci> Streams.'System.IO.Streams.write' (Just 3) os2
691-- Allow 3?
692-- False\<Enter>
693-- ghci> Streams.'System.IO.Streams.write' (Just 4) os2
694-- Allow 4?
695-- True\<Enter>
696-- 4
697-- @
698filterOutputM :: (a -> IO Bool) -> OutputStream a -> IO (OutputStream a)
699filterOutputM p output = makeOutputStream chunk
700  where
701    chunk Nothing  = write Nothing output
702    chunk ch@(Just x) = do
703        b <- p x
704        if b then write ch output else return $! ()
705
706
707------------------------------------------------------------------------------
708-- | Takes apart a stream of pairs, producing a pair of input streams. Reading
709-- from either of the produced streams will cause a pair of values to be pulled
710-- from the original stream if necessary. Note that reading @n@ values from one
711-- of the returned streams will cause @n@ values to be buffered at the other
712-- stream.
713--
714-- Access to the original stream is thread safe, i.e. guarded by a lock.
715unzip :: forall a b . InputStream (a, b) -> IO (InputStream a, InputStream b)
716unzip os = do
717    lock <- newMVar $! ()
718    buf1 <- newIORef id
719    buf2 <- newIORef id
720
721    is1  <- makeInputStream $ src1 lock buf1 buf2
722    is2  <- makeInputStream $ src2 lock buf1 buf2
723
724    return (is1, is2)
725
726  where
727    twist (a,b) = (b,a)
728
729    src1 lock aBuf bBuf = withMVar lock $ const $ do
730        dl <- readIORef aBuf
731        case dl [] of
732          []     -> more os id bBuf
733          (x:xs) -> writeIORef aBuf (xs++) >> (return $! Just x)
734
735    src2 lock aBuf bBuf = withMVar lock $ const $ do
736        dl <- readIORef bBuf
737        case dl [] of
738          []     -> more os twist aBuf
739          (y:ys) -> writeIORef bBuf (ys++) >> (return $! Just y)
740
741    more :: forall a b x y .
742            InputStream (a,b)
743         -> ((a,b) -> (x,y))
744         -> IORef ([y] -> [y])
745         -> IO (Maybe x)
746    more origs proj buf = read origs >>=
747                          maybe (return Nothing)
748                                (\x -> do
749                                    let (a, b) = proj x
750                                    modifyIORef buf (. (b:))
751                                    return $! Just a)
752
753
754------------------------------------------------------------------------------
755-- | Given two 'OutputStream's, returns a new stream that "unzips" the tuples
756-- being written, writing the two elements to the corresponding given streams.
757--
758-- You can use this together with @'contramap' (\\ x -> (x, x))@ to "fork" a
759-- stream into two.
760--
761-- /Since: 1.5.2.0/
762contraunzip :: OutputStream a -> OutputStream b -> IO (OutputStream (a, b))
763contraunzip sink1 sink2 = makeOutputStream $ \ tuple -> do
764    write (fmap fst tuple) sink1
765    write (fmap snd tuple) sink2
766
767
768------------------------------------------------------------------------------
769-- | Wraps an 'InputStream', producing a new 'InputStream' that will produce at
770-- most @n@ items, subsequently yielding end-of-stream forever.
771--
772-- Items pushed back to the returned 'InputStream' will be propagated upstream,
773-- modifying the count of taken items accordingly.
774--
775-- Example:
776--
777-- @
778-- ghci> is <- Streams.'fromList' [1..9::Int]
779-- ghci> is' <- Streams.'take' 1 is
780-- ghci> Streams.'read' is'
781-- Just 1
782-- ghci> Streams.'read' is'
783-- Nothing
784-- ghci> Streams.'System.IO.Streams.peek' is
785-- Just 2
786-- ghci> Streams.'unRead' 11 is'
787-- ghci> Streams.'System.IO.Streams.peek' is
788-- Just 11
789-- ghci> Streams.'System.IO.Streams.peek' is'
790-- Just 11
791-- ghci> Streams.'read' is'
792-- Just 11
793-- ghci> Streams.'read' is'
794-- Nothing
795-- ghci> Streams.'read' is
796-- Just 2
797-- ghci> Streams.'toList' is
798-- [3,4,5,6,7,8,9]
799-- @
800--
801take :: Int64 -> InputStream a -> IO (InputStream a)
802take k0 input = do
803    kref <- newIORef k0
804    return $! InputStream (prod kref) (pb kref)
805  where
806    prod kref = do
807        !k <- readIORef kref
808        if k <= 0
809          then return Nothing
810          else do
811              m <- read input
812              when (isJust m) $ modifyIORef kref $ \x -> x - 1
813              return m
814
815    pb kref !s = do
816       unRead s input
817       modifyIORef kref (+1)
818
819
820------------------------------------------------------------------------------
821-- | Wraps an 'InputStream', producing a new 'InputStream' that will drop the
822-- first @n@ items produced by the wrapped stream. See 'Prelude.drop'.
823--
824-- Items pushed back to the returned 'InputStream' will be propagated upstream,
825-- modifying the count of dropped items accordingly.
826drop :: Int64 -> InputStream a -> IO (InputStream a)
827drop k0 input = do
828    kref <- newIORef k0
829    return $! InputStream (prod kref) (pb kref)
830  where
831    prod kref = do
832        !k <- readIORef kref
833        if k <= 0
834          then getInput kref
835          else discard kref
836
837    getInput kref = do
838        read input >>= maybe (return Nothing) (\c -> do
839            modifyIORef kref (\x -> x - 1)
840            return $! Just c)
841
842    discard kref = getInput kref >>= maybe (return Nothing) (const $ prod kref)
843
844    pb kref s = do
845        unRead s input
846        modifyIORef kref (+1)
847
848
849------------------------------------------------------------------------------
850-- | Wraps an 'OutputStream', producing a new 'OutputStream' that will pass at
851-- most @n@ items on to the wrapped stream, subsequently ignoring the rest of
852-- the input.
853--
854give :: Int64 -> OutputStream a -> IO (OutputStream a)
855give k output = newIORef k >>= makeOutputStream . chunk
856  where
857    chunk ref = maybe (return $! ()) $ \x -> do
858                    !n <- readIORef ref
859                    if n <= 0
860                      then return $! ()
861                      else do
862                          writeIORef ref $! n - 1
863                          write (Just x) output
864
865
866------------------------------------------------------------------------------
867-- | Wraps an 'OutputStream', producing a new 'OutputStream' that will ignore
868-- the first @n@ items received, subsequently passing the rest of the input on
869-- to the wrapped stream.
870--
871ignore :: Int64 -> OutputStream a -> IO (OutputStream a)
872ignore k output = newIORef k >>= makeOutputStream . chunk
873  where
874    chunk ref = maybe (return $! ()) $ \x -> do
875                    !n <- readIORef ref
876                    if n > 0
877                      then writeIORef ref $! n - 1
878                      else write (Just x) output
879
880
881------------------------------------------------------------------------------
882-- | Wraps an 'OutputStream', ignoring any end-of-stream 'Nothing' values
883-- written to the returned stream.
884--
885-- /Since: 1.0.1.0/
886--
887ignoreEof :: OutputStream a -> IO (OutputStream a)
888ignoreEof s = return $ OutputStream f
889  where
890    f Nothing  = return $! ()
891    f x        = write x s
892
893
894------------------------------------------------------------------------------
895-- | Wraps an 'InputStream', running the specified action when the stream
896-- yields end-of-file.
897--
898-- /Since: 1.0.2.0/
899--
900atEndOfInput :: IO b -> InputStream a -> IO (InputStream a)
901atEndOfInput m is = return $! InputStream prod pb
902  where
903    prod    = read is >>= maybe eof (return . Just)
904    eof     = void m >> return Nothing
905    pb s    = unRead s is
906
907
908------------------------------------------------------------------------------
909-- | Wraps an 'OutputStream', running the specified action when the stream
910-- receives end-of-file.
911--
912-- /Since: 1.0.2.0/
913--
914atEndOfOutput :: IO b -> OutputStream a -> IO (OutputStream a)
915atEndOfOutput m os = makeOutputStream f
916  where
917    f Nothing = write Nothing os >> void m
918    f x       = write x os
919