1-- | Generic stream manipulations
2
3{-# LANGUAGE BangPatterns       #-}
4{-# LANGUAGE DeriveDataTypeable #-}
5{-# LANGUAGE RankNTypes         #-}
6
7module System.IO.Streams.Combinators
8 ( -- * Folds
9   inputFoldM
10 , outputFoldM
11 , fold
12 , foldM
13 , fold_
14 , foldM_
15 , any
16 , all
17 , maximum
18 , minimum
19
20   -- * Unfolds
21 , unfoldM
22
23   -- * Maps
24 , map
25 , mapM
26 , mapM_
27 , mapMaybe
28 , contramap
29 , contramapM
30 , contramapM_
31 , contramapMaybe
32
33   -- * Filter
34 , filter
35 , filterM
36 , filterOutput
37 , filterOutputM
38
39   -- * Takes and drops
40 , give
41 , take
42 , drop
43 , ignore
44
45   -- * Zip and unzip
46 , zip
47 , zipWith
48 , zipWithM
49 , unzip
50
51   -- * Utility
52 , intersperse
53 , skipToEof
54 , ignoreEof
55 , atEndOfInput
56 , atEndOfOutput
57 ) where
58
59------------------------------------------------------------------------------
60import           Control.Concurrent.MVar    (newMVar, withMVar)
61import           Control.Monad              (liftM, void, when)
62import           Control.Monad.IO.Class     (liftIO)
63import           Data.Int                   (Int64)
64import           Data.IORef                 (IORef, atomicModifyIORef, modifyIORef, newIORef, readIORef, writeIORef)
65import           Data.Maybe                 (isJust)
66import           Prelude                    hiding (all, any, drop, filter, map, mapM, mapM_, maximum, minimum, read, take, unzip, zip, zipWith)
67------------------------------------------------------------------------------
68import           System.IO.Streams.Internal (InputStream (..), OutputStream (..), fromGenerator, makeInputStream, makeOutputStream, read, unRead, write, yield)
69
70
71------------------------------------------------------------------------------
72-- | A side-effecting fold over an 'OutputStream', as a stream transformer.
73--
74-- The IO action returned by 'outputFoldM' can be used to fetch and reset the updated
75-- seed value. Example:
76--
77-- @
78-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3::Int]
79-- ghci> (os, getList) <- Streams.'System.IO.Streams.List.listOutputStream'
80-- ghci> (os', getSeed) \<- Streams.'outputFoldM' (\\x y -> return (x+y)) 0 os
81-- ghci> Streams.'System.IO.Streams.connect' is os'
82-- ghci> getList
83-- [1,2,3]
84-- ghci> getSeed
85-- 6
86-- @
87outputFoldM :: (a -> b -> IO a)           -- ^ fold function
88            -> a                          -- ^ initial seed
89            -> OutputStream b             -- ^ output stream
90            -> IO (OutputStream b, IO a)  -- ^ returns a new stream as well as
91                                          -- an IO action to fetch and reset the
92                                          --  updated seed value.
93outputFoldM f initial stream = do
94    ref <- newIORef initial
95    os  <- makeOutputStream (wr ref)
96    return (os, fetch ref)
97
98  where
99    wr _ Nothing       = write Nothing stream
100    wr ref mb@(Just x) = do
101        !z  <- readIORef ref
102        !z' <- f z x
103        writeIORef ref z'
104        write mb stream
105
106    fetch ref = atomicModifyIORef ref $ \x -> (initial, x)
107
108
109------------------------------------------------------------------------------
110-- | A side-effecting fold over an 'InputStream', as a stream transformer.
111--
112-- The IO action returned by 'inputFoldM' can be used to fetch and reset the updated seed
113-- value. Example:
114--
115-- @
116-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3::Int]
117-- ghci> (is', getSeed) \<- Streams.'inputFoldM' (\\x y -> return (x+y)) 0 is
118-- ghci> Streams.'System.IO.Streams.List.toList' is'
119-- [1,2,3]
120-- ghci> getSeed
121-- 6
122-- @
123inputFoldM :: (a -> b -> IO a)          -- ^ fold function
124           -> a                         -- ^ initial seed
125           -> InputStream b             -- ^ input stream
126           -> IO (InputStream b, IO a)  -- ^ returns a new stream as well as an
127                                        -- IO action to fetch and reset the
128                                        -- updated seed value.
129inputFoldM f initial stream = do
130    ref <- newIORef initial
131    is  <- makeInputStream (rd ref)
132    return (is, fetch ref)
133
134  where
135    twiddle _ Nothing = return Nothing
136
137    twiddle ref mb@(Just x) = do
138        !z  <- readIORef ref
139        !z' <- f z x
140        writeIORef ref z'
141        return mb
142
143    rd ref = read stream >>= twiddle ref
144
145    fetch ref = atomicModifyIORef ref $ \x -> (initial, x)
146
147
148------------------------------------------------------------------------------
149-- | A left fold over an input stream. The input stream is fully consumed. See
150-- 'Prelude.foldl'.
151--
152-- Example:
153--
154-- @
155-- ghci> Streams.'System.IO.Streams.fromList' [1..10] >>= Streams.'fold' (+) 0
156-- 55
157-- @
158fold :: (s -> a -> s)       -- ^ fold function
159     -> s                   -- ^ initial seed
160     -> InputStream a       -- ^ input stream
161     -> IO s
162fold f seed stream = go seed
163  where
164    go !s = read stream >>= maybe (return s) (go . f s)
165
166
167------------------------------------------------------------------------------
168-- | A side-effecting left fold over an input stream. The input stream is fully
169-- consumed. See 'Prelude.foldl'.
170--
171-- Example:
172--
173-- @
174-- ghci> Streams.'System.IO.Streams.fromList' [1..10] >>= Streams.'foldM' (\x y -> 'return' (x + y)) 0
175-- 55
176-- @
177foldM :: (s -> a -> IO s)       -- ^ fold function
178      -> s                      -- ^ initial seed
179      -> InputStream a          -- ^ input stream
180      -> IO s
181foldM f seed stream = go seed
182  where
183    go !s = read stream >>= maybe (return s) ((go =<<) . f s)
184
185
186------------------------------------------------------------------------------
187-- | A variant of 'System.IO.Streams.fold' suitable for use with composable folds
188-- from \'beautiful folding\' libraries like
189-- <http://hackage.haskell.org/package/foldl the foldl library>.
190-- The input stream is fully consumed.
191--
192-- Example:
193--
194-- @
195-- ghci> let folds = liftA3 (,,) Foldl.length Foldl.mean Foldl.maximum
196-- ghci> Streams.'System.IO.Streams.fromList' [1..10::Double] >>= Foldl.purely Streams.'System.IO.Streams.fold_' folds is
197-- ghci> (10,5.5,Just 10.0)
198-- @
199--
200-- /Since 1.3.6.0/
201--
202fold_ :: (x -> a -> x)    -- ^ accumulator update function
203      -> x                -- ^ initial seed
204      -> (x -> s)         -- ^ recover folded value
205      -> InputStream a    -- ^ input stream
206      -> IO s
207fold_ op seed done stream = liftM done (go seed)
208   where
209     go !s = read stream >>= maybe (return s) (go . op s)
210
211
212------------------------------------------------------------------------------
213-- | A variant of 'System.IO.Streams.foldM' suitable for use with composable folds
214-- from \'beautiful folding\' libraries like
215-- <http://hackage.haskell.org/package/foldl the foldl library>.
216-- The input stream is fully consumed.
217--
218-- Example:
219--
220-- @
221-- ghci> let folds = Foldl.mapM_ print *> Foldl.generalize (liftA2 (,) Foldl.sum Foldl.mean)
222-- ghci> Streams.'System.IO.Streams.fromList' [1..3::Double] >>= Foldl.impurely Streams.'System.IO.Streams.foldM_' folds
223-- 1.0
224-- 2.0
225-- 3.0
226-- (6.0,2.0)
227-- @
228--
229-- /Since 1.3.6.0/
230--
231foldM_ :: (x -> a -> IO x)   -- ^ accumulator update action
232       -> IO x               -- ^ initial seed
233       -> (x -> IO s)        -- ^ recover folded value
234       -> InputStream a      -- ^ input stream
235       -> IO s
236foldM_ f seed done stream = seed >>= go
237  where
238    go !x = read stream >>= maybe (done x) ((go =<<) . f x)
239
240
241------------------------------------------------------------------------------
242-- | @any predicate stream@ returns 'True' if any element in @stream@ matches
243-- the predicate.
244--
245-- 'any' consumes as few elements as possible, ending consumption if an element
246-- satisfies the predicate.
247--
248-- @
249-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
250-- ghci> Streams.'System.IO.Streams.Combinators.any' (> 0) is    -- Consumes one element
251-- True
252-- ghci> Streams.'System.IO.Streams.read' is
253-- Just 2
254-- ghci> Streams.'System.IO.Streams.Combinators.any' even is     -- Only 3 remains
255-- False
256-- @
257any :: (a -> Bool) -> InputStream a -> IO Bool
258any predicate stream = go
259  where
260    go = do
261        mElem <- read stream
262        case mElem of
263            Nothing -> return False
264            Just e  -> if predicate e then return True else go
265
266
267------------------------------------------------------------------------------
268-- | @all predicate stream@ returns 'True' if every element in @stream@ matches
269-- the predicate.
270--
271-- 'all' consumes as few elements as possible, ending consumption if any element
272-- fails the predicate.
273--
274-- @
275-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
276-- ghci> Streams.'System.IO.Streams.Combinators.all' (< 0) is    -- Consumes one element
277-- False
278-- ghci> Streams.'System.IO.Streams.read' is
279-- Just 2
280-- ghci> Streams.'System.IO.Streams.Combinators.all' odd is      -- Only 3 remains
281-- True
282-- @
283all :: (a -> Bool) -> InputStream a -> IO Bool
284all predicate stream = go
285  where
286    go = do
287        mElem <- read stream
288        case mElem of
289            Nothing -> return True
290            Just e  -> if predicate e then go else return False
291
292
293------------------------------------------------------------------------------
294-- | @maximum stream@ returns the greatest element in @stream@ or 'Nothing' if
295-- the stream is empty.
296--
297-- 'maximum' consumes the entire stream.
298--
299-- @
300-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
301-- ghci> Streams.'System.IO.Streams.Combinators.maximum' is
302-- 3
303-- ghci> Streams.'System.IO.Streams.read' is     -- The stream is now empty
304-- Nothing
305-- @
306maximum :: (Ord a) => InputStream a -> IO (Maybe a)
307maximum stream = do
308    mElem0 <- read stream
309    case mElem0 of
310        Nothing -> return Nothing
311        Just e  -> go e
312  where
313    go oldElem = do
314        mElem <- read stream
315        case mElem of
316            Nothing      -> return (Just oldElem)
317            Just newElem -> go (max oldElem newElem)
318
319
320------------------------------------------------------------------------------
321-- | @minimum stream@ returns the greatest element in @stream@
322--
323-- 'minimum' consumes the entire stream.
324--
325-- @
326-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
327-- ghci> Streams.'System.IO.Streams.Combinators.minimum' is
328-- 1
329-- ghci> Streams.'System.IO.Streams.read' is    -- The stream is now empty
330-- Nothing
331-- @
332minimum :: (Ord a) => InputStream a -> IO (Maybe a)
333minimum stream = do
334    mElem0 <- read stream
335    case mElem0 of
336        Nothing -> return Nothing
337        Just e  -> go e
338  where
339    go oldElem = do
340        mElem <- read stream
341        case mElem of
342            Nothing      -> return (Just oldElem)
343            Just newElem -> go (min oldElem newElem)
344
345
346------------------------------------------------------------------------------
347-- | @unfoldM f seed@ builds an 'InputStream' from successively applying @f@ to
348-- the @seed@ value, continuing if @f@ produces 'Just' and halting on
349-- 'Nothing'.
350--
351-- @
352-- ghci> is \<- Streams.'System.IO.Streams.Combinators.unfoldM' (\n -> return $ if n < 3 then Just (n, n + 1) else Nothing) 0
353-- ghci> Streams.'System.IO.Streams.List.toList' is
354-- [0,1,2]
355-- @
356unfoldM :: (b -> IO (Maybe (a, b))) -> b -> IO (InputStream a)
357unfoldM f seed = fromGenerator (go seed)
358  where
359    go oldSeed = do
360       m <- liftIO (f oldSeed)
361       case m of
362           Nothing           -> return $! ()
363           Just (a, newSeed) -> do
364               yield a
365               go newSeed
366
367------------------------------------------------------------------------------
368-- | Maps a pure function over an 'InputStream'.
369--
370-- @map f s@ passes all output from @s@ through the function @f@.
371--
372-- Satisfies the following laws:
373--
374-- @
375-- Streams.'map' (g . f) === Streams.'map' f >=> Streams.'map' g
376-- Streams.'map' 'id' === Streams.'makeInputStream' . Streams.'read'
377-- @
378map :: (a -> b) -> InputStream a -> IO (InputStream b)
379map f s = makeInputStream g
380  where
381    g = read s >>= return . fmap f
382
383
384------------------------------------------------------------------------------
385-- | Maps an impure function over an 'InputStream'.
386--
387-- @mapM f s@ passes all output from @s@ through the IO action @f@.
388--
389-- Satisfies the following laws:
390--
391-- @
392-- Streams.'mapM' (f >=> g) === Streams.'mapM' f >=> Streams.'mapM' g
393-- Streams.'mapM' 'return' === Streams.'makeInputStream' . Streams.'read'
394-- @
395--
396mapM :: (a -> IO b) -> InputStream a -> IO (InputStream b)
397mapM f s = makeInputStream g
398  where
399    g = do
400        mb <- read s >>= maybe (return Nothing)
401                               (\x -> liftM Just $ f x)
402
403        return mb
404
405
406------------------------------------------------------------------------------
407-- | Maps a side effect over an 'InputStream'.
408--
409-- @mapM_ f s@ produces a new input stream that passes all output from @s@
410-- through the side-effecting IO action @f@.
411--
412-- Example:
413--
414-- @
415-- ghci> Streams.'System.IO.Streams.fromList' [1,2,3] >>=
416--       Streams.'mapM_' ('putStrLn' . 'show' . (*2)) >>=
417--       Streams.'System.IO.Streams.toList'
418-- 2
419-- 4
420-- 6
421-- [1,2,3]
422-- @
423--
424mapM_ :: (a -> IO b) -> InputStream a -> IO (InputStream a)
425mapM_ f s = makeInputStream $ do
426    mb <- read s
427    _  <- maybe (return $! ()) (void . f) mb
428    return mb
429
430
431------------------------------------------------------------------------------
432-- | A version of map that discards elements
433--
434-- @mapMaybe f s@ passes all output from @s@ through the function @f@ and
435-- discards elements for which @f s@ evaluates to 'Nothing'.
436--
437-- Example:
438--
439-- @
440-- ghci> Streams.'System.IO.Streams.fromList' [Just 1, None, Just 3] >>=
441--       Streams.'mapMaybe' 'id' >>=
442--       Streams.'System.IO.Streams.toList'
443-- [1,3]
444-- @
445--
446-- /Since: 1.2.1.0/
447mapMaybe :: (a -> Maybe b) -> InputStream a -> IO (InputStream b)
448mapMaybe f src = makeInputStream g
449  where
450    g = do
451      s <- read src
452      case s of
453        Nothing -> return Nothing
454        Just x ->
455          case f x of
456            Nothing -> g
457            y -> return y
458------------------------------------------------------------------------------
459-- | Contravariant counterpart to 'map'.
460--
461-- @contramap f s@ passes all input to @s@ through the function @f@.
462--
463-- Satisfies the following laws:
464--
465-- @
466-- Streams.'contramap' (g . f) === Streams.'contramap' g >=> Streams.'contramap' f
467-- Streams.'contramap' 'id' === 'return'
468-- @
469contramap :: (a -> b) -> OutputStream b -> IO (OutputStream a)
470contramap f s = makeOutputStream $ flip write s . fmap f
471
472
473------------------------------------------------------------------------------
474-- | Contravariant counterpart to 'mapM'.
475--
476-- @contramapM f s@ passes all input to @s@ through the IO action @f@
477--
478-- Satisfies the following laws:
479--
480-- @
481-- Streams.'contramapM' (f >=> g) = Streams.'contramapM' g >=> Streams.'contramapM' f
482-- Streams.'contramapM' 'return' = 'return'
483-- @
484contramapM :: (a -> IO b) -> OutputStream b -> IO (OutputStream a)
485contramapM f s = makeOutputStream g
486  where
487    g Nothing = write Nothing s
488
489    g (Just x) = do
490        !y <- f x
491        write (Just y) s
492
493
494------------------------------------------------------------------------------
495-- | Equivalent to 'mapM_' for output.
496--
497-- @contramapM f s@ passes all input to @s@ through the side-effecting IO
498-- action @f@.
499--
500contramapM_ :: (a -> IO b) -> OutputStream a -> IO (OutputStream a)
501contramapM_ f s = makeOutputStream $ \mb -> do
502    _ <- maybe (return $! ()) (void . f) mb
503    write mb s
504
505
506------------------------------------------------------------------------------
507-- | Contravariant counterpart to 'contramapMaybe'.
508--
509-- @contramap f s@ passes all input to @s@ through the function @f@.
510-- Discards all the elements for which @f@ returns 'Nothing'.
511--
512-- /Since: 1.2.1.0/
513--
514contramapMaybe :: (a -> Maybe b) -> OutputStream b -> IO (OutputStream a)
515contramapMaybe f s = makeOutputStream $ g
516    where
517      g Nothing = write Nothing s
518      g (Just a) =
519        case f a of
520          Nothing -> return ()
521          x -> write x s
522
523
524------------------------------------------------------------------------------
525-- | Drives an 'InputStream' to end-of-stream, discarding all of the yielded
526-- values.
527skipToEof :: InputStream a -> IO ()
528skipToEof str = go
529  where
530    go = read str >>= maybe (return $! ()) (const go)
531{-# INLINE skipToEof #-}
532
533
534------------------------------------------------------------------------------
535-- | Drops chunks from an input stream if they fail to match a given filter
536-- predicate. See 'Prelude.filter'.
537--
538-- Items pushed back to the returned stream are propagated back upstream.
539--
540-- Example:
541--
542-- @
543-- ghci> Streams.'System.IO.Streams.fromList' [\"the\", \"quick\", \"brown\", \"fox\"] >>=
544--       Streams.'filterM' ('return' . (/= \"brown\")) >>= Streams.'System.IO.Streams.toList'
545-- [\"the\",\"quick\",\"fox\"]
546-- @
547filterM :: (a -> IO Bool)
548        -> InputStream a
549        -> IO (InputStream a)
550filterM p src = return $! InputStream prod pb
551  where
552    prod = read src >>= maybe eof chunk
553
554    chunk s = do
555        b <- p s
556        if b then return $! Just s
557             else prod
558
559    eof = return Nothing
560
561    pb s = unRead s src
562
563
564------------------------------------------------------------------------------
565-- | Drops chunks from an input stream if they fail to match a given filter
566-- predicate. See 'Prelude.filter'.
567--
568-- Items pushed back to the returned stream are propagated back upstream.
569--
570-- Example:
571--
572-- @
573-- ghci> Streams.'System.IO.Streams.fromList' [\"the\", \"quick\", \"brown\", \"fox\"] >>=
574--       Streams.'filter' (/= \"brown\") >>= Streams.'System.IO.Streams.toList'
575-- [\"the\",\"quick\",\"fox\"]
576-- @
577filter :: (a -> Bool)
578       -> InputStream a
579       -> IO (InputStream a)
580filter p src = return $! InputStream prod pb
581  where
582    prod = read src >>= maybe eof chunk
583
584    chunk s = do
585        let b = p s
586        if b then return $! Just s
587             else prod
588
589    eof  = return Nothing
590    pb s = unRead s src
591
592
593------------------------------------------------------------------------------
594-- | The function @intersperse v s@ wraps the 'OutputStream' @s@, creating a
595-- new output stream that writes its input to @s@ interspersed with the
596-- provided value @v@. See 'Data.List.intersperse'.
597--
598-- Example:
599--
600-- @
601-- ghci> import Control.Monad ((>=>))
602-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [\"nom\", \"nom\", \"nom\"::'ByteString']
603-- ghci> Streams.'System.IO.Streams.List.outputToList' (Streams.'intersperse' \"burp!\" >=> Streams.'System.IO.Streams.connect' is)
604-- [\"nom\",\"burp!\",\"nom\",\"burp!\",\"nom\"]
605-- @
606intersperse :: a -> OutputStream a -> IO (OutputStream a)
607intersperse sep os = newIORef False >>= makeOutputStream . f
608  where
609    f _ Nothing = write Nothing os
610    f sendRef s    = do
611        b <- readIORef sendRef
612        writeIORef sendRef True
613        when b $ write (Just sep) os
614        write s os
615
616
617------------------------------------------------------------------------------
618-- | Combines two input streams. Continues yielding elements from both input
619-- streams until one of them finishes.
620zip :: InputStream a -> InputStream b -> IO (InputStream (a, b))
621zip src1 src2 = makeInputStream src
622  where
623    src = read src1 >>= (maybe (return Nothing) $ \a ->
624            read src2 >>= (maybe (unRead a src1 >> return Nothing) $ \b ->
625              return $! Just $! (a, b)))
626
627
628------------------------------------------------------------------------------
629-- | Combines two input streams using the supplied function. Continues yielding
630-- elements from both input streams until one of them finishes.
631zipWith :: (a -> b -> c)
632        -> InputStream a
633        -> InputStream b
634        -> IO (InputStream c)
635zipWith f src1 src2 = makeInputStream src
636  where
637    src = read src1 >>= (maybe (return Nothing) $ \a ->
638            read src2 >>= (maybe (unRead a src1 >> return Nothing) $ \b ->
639              return $! Just $! f a b ) )
640
641
642------------------------------------------------------------------------------
643-- | Combines two input streams using the supplied monadic function. Continues
644-- yielding elements from both input streams until one of them finishes.
645zipWithM :: (a -> b -> IO c)
646         -> InputStream a
647         -> InputStream b
648         -> IO (InputStream c)
649zipWithM f src1 src2 = makeInputStream src
650  where
651    src = read src1 >>= (maybe (return Nothing) $ \a ->
652            read src2 >>= (maybe (unRead a src1 >> return Nothing) $ \b ->
653              f a b >>= \c -> return $! Just $! c ) )
654
655
656------------------------------------------------------------------------------
657-- | Filters output to be sent to the given 'OutputStream' using a pure
658-- function. See 'filter'.
659--
660-- Example:
661--
662-- @
663-- ghci> import qualified "Data.ByteString.Char8" as S
664-- ghci> os1 \<- Streams.'System.IO.Streams.stdout' >>= Streams.'System.IO.Streams.unlines
665-- ghci> os2 \<- os1 >>= Streams.'contramap' (S.pack . show) >>= Streams.'filterOutput' even
666-- ghci> Streams.'write' (Just 3) os2
667-- ghci> Streams.'write' (Just 4) os2
668-- 4
669-- @
670{- Note: The example is a lie, because unlines has weird behavior -}
671filterOutput :: (a -> Bool) -> OutputStream a -> IO (OutputStream a)
672filterOutput p output = makeOutputStream chunk
673  where
674    chunk Nothing  = write Nothing output
675    chunk ch@(Just x) = when (p x) $ write ch output
676
677
678------------------------------------------------------------------------------
679-- | Filters output to be sent to the given 'OutputStream' using a predicate
680-- function in IO. See 'filterM'.
681--
682-- Example:
683--
684-- @
685-- ghci> let check a = putStrLn a ("Allow " ++ show a ++ "?") >> readLn :: IO Bool
686-- ghci> import qualified Data.ByteString.Char8 as S
687-- ghci> os1 <- Streams.'System.IO.Streams.unlines' Streams.'System.IO.Streams.stdout'
688-- ghci> os2 \<- os1 >>= Streams.'contramap' (S.pack . show) >>= Streams.'filterOutputM' check
689-- ghci> Streams.'System.IO.Streams.write' (Just 3) os2
690-- Allow 3?
691-- False\<Enter>
692-- ghci> Streams.'System.IO.Streams.write' (Just 4) os2
693-- Allow 4?
694-- True\<Enter>
695-- 4
696-- @
697filterOutputM :: (a -> IO Bool) -> OutputStream a -> IO (OutputStream a)
698filterOutputM p output = makeOutputStream chunk
699  where
700    chunk Nothing  = write Nothing output
701    chunk ch@(Just x) = do
702        b <- p x
703        if b then write ch output else return $! ()
704
705
706------------------------------------------------------------------------------
707-- | Takes apart a stream of pairs, producing a pair of input streams. Reading
708-- from either of the produced streams will cause a pair of values to be pulled
709-- from the original stream if necessary. Note that reading @n@ values from one
710-- of the returned streams will cause @n@ values to be buffered at the other
711-- stream.
712--
713-- Access to the original stream is thread safe, i.e. guarded by a lock.
714unzip :: forall a b . InputStream (a, b) -> IO (InputStream a, InputStream b)
715unzip os = do
716    lock <- newMVar $! ()
717    buf1 <- newIORef id
718    buf2 <- newIORef id
719
720    is1  <- makeInputStream $ src1 lock buf1 buf2
721    is2  <- makeInputStream $ src2 lock buf1 buf2
722
723    return (is1, is2)
724
725  where
726    twist (a,b) = (b,a)
727
728    src1 lock aBuf bBuf = withMVar lock $ const $ do
729        dl <- readIORef aBuf
730        case dl [] of
731          []     -> more os id bBuf
732          (x:xs) -> writeIORef aBuf (xs++) >> (return $! Just x)
733
734    src2 lock aBuf bBuf = withMVar lock $ const $ do
735        dl <- readIORef bBuf
736        case dl [] of
737          []     -> more os twist aBuf
738          (y:ys) -> writeIORef bBuf (ys++) >> (return $! Just y)
739
740    more :: forall a b x y .
741            InputStream (a,b)
742         -> ((a,b) -> (x,y))
743         -> IORef ([y] -> [y])
744         -> IO (Maybe x)
745    more origs proj buf = read origs >>=
746                          maybe (return Nothing)
747                                (\x -> do
748                                    let (a, b) = proj x
749                                    modifyIORef buf (. (b:))
750                                    return $! Just a)
751
752
753------------------------------------------------------------------------------
754-- | Wraps an 'InputStream', producing a new 'InputStream' that will produce at
755-- most @n@ items, subsequently yielding end-of-stream forever.
756--
757-- Items pushed back to the returned 'InputStream' will be propagated upstream,
758-- modifying the count of taken items accordingly.
759--
760-- Example:
761--
762-- @
763-- ghci> is <- Streams.'fromList' [1..9::Int]
764-- ghci> is' <- Streams.'take' 1 is
765-- ghci> Streams.'read' is'
766-- Just 1
767-- ghci> Streams.'read' is'
768-- Nothing
769-- ghci> Streams.'System.IO.Streams.peek' is
770-- Just 2
771-- ghci> Streams.'unRead' 11 is'
772-- ghci> Streams.'System.IO.Streams.peek' is
773-- Just 11
774-- ghci> Streams.'System.IO.Streams.peek' is'
775-- Just 11
776-- ghci> Streams.'read' is'
777-- Just 11
778-- ghci> Streams.'read' is'
779-- Nothing
780-- ghci> Streams.'read' is
781-- Just 2
782-- ghci> Streams.'toList' is
783-- [3,4,5,6,7,8,9]
784-- @
785--
786take :: Int64 -> InputStream a -> IO (InputStream a)
787take k0 input = do
788    kref <- newIORef k0
789    return $! InputStream (prod kref) (pb kref)
790  where
791    prod kref = do
792        !k <- readIORef kref
793        if k <= 0
794          then return Nothing
795          else do
796              m <- read input
797              when (isJust m) $ modifyIORef kref $ \x -> x - 1
798              return m
799
800    pb kref !s = do
801       unRead s input
802       modifyIORef kref (+1)
803
804
805------------------------------------------------------------------------------
806-- | Wraps an 'InputStream', producing a new 'InputStream' that will drop the
807-- first @n@ items produced by the wrapped stream. See 'Prelude.drop'.
808--
809-- Items pushed back to the returned 'InputStream' will be propagated upstream,
810-- modifying the count of dropped items accordingly.
811drop :: Int64 -> InputStream a -> IO (InputStream a)
812drop k0 input = do
813    kref <- newIORef k0
814    return $! InputStream (prod kref) (pb kref)
815  where
816    prod kref = do
817        !k <- readIORef kref
818        if k <= 0
819          then getInput kref
820          else discard kref
821
822    getInput kref = do
823        read input >>= maybe (return Nothing) (\c -> do
824            modifyIORef kref (\x -> x - 1)
825            return $! Just c)
826
827    discard kref = getInput kref >>= maybe (return Nothing) (const $ prod kref)
828
829    pb kref s = do
830        unRead s input
831        modifyIORef kref (+1)
832
833
834------------------------------------------------------------------------------
835-- | Wraps an 'OutputStream', producing a new 'OutputStream' that will pass at
836-- most @n@ items on to the wrapped stream, subsequently ignoring the rest of
837-- the input.
838--
839give :: Int64 -> OutputStream a -> IO (OutputStream a)
840give k output = newIORef k >>= makeOutputStream . chunk
841  where
842    chunk ref = maybe (return $! ()) $ \x -> do
843                    !n <- readIORef ref
844                    if n <= 0
845                      then return $! ()
846                      else do
847                          writeIORef ref $! n - 1
848                          write (Just x) output
849
850
851------------------------------------------------------------------------------
852-- | Wraps an 'OutputStream', producing a new 'OutputStream' that will ignore
853-- the first @n@ items received, subsequently passing the rest of the input on
854-- to the wrapped stream.
855--
856ignore :: Int64 -> OutputStream a -> IO (OutputStream a)
857ignore k output = newIORef k >>= makeOutputStream . chunk
858  where
859    chunk ref = maybe (return $! ()) $ \x -> do
860                    !n <- readIORef ref
861                    if n > 0
862                      then writeIORef ref $! n - 1
863                      else write (Just x) output
864
865
866------------------------------------------------------------------------------
867-- | Wraps an 'OutputStream', ignoring any end-of-stream 'Nothing' values
868-- written to the returned stream.
869--
870-- /Since: 1.0.1.0/
871--
872ignoreEof :: OutputStream a -> IO (OutputStream a)
873ignoreEof s = return $ OutputStream f
874  where
875    f Nothing  = return $! ()
876    f x        = write x s
877
878
879------------------------------------------------------------------------------
880-- | Wraps an 'InputStream', running the specified action when the stream
881-- yields end-of-file.
882--
883-- /Since: 1.0.2.0/
884--
885atEndOfInput :: IO b -> InputStream a -> IO (InputStream a)
886atEndOfInput m is = return $! InputStream prod pb
887  where
888    prod    = read is >>= maybe eof (return . Just)
889    eof     = void m >> return Nothing
890    pb s    = unRead s is
891
892
893------------------------------------------------------------------------------
894-- | Wraps an 'OutputStream', running the specified action when the stream
895-- receives end-of-file.
896--
897-- /Since: 1.0.2.0/
898--
899atEndOfOutput :: IO b -> OutputStream a -> IO (OutputStream a)
900atEndOfOutput m os = makeOutputStream f
901  where
902    f Nothing = write Nothing os >> void m
903    f x       = write x os
904