1{- Metered IO and actions
2 -
3 - Copyright 2012-2021 Joey Hess <id@joeyh.name>
4 -
5 - License: BSD-2-clause
6 -}
7
8{-# LANGUAGE TypeSynonymInstances, BangPatterns #-}
9
10module Utility.Metered (
11	MeterUpdate,
12	MeterState(..),
13	nullMeterUpdate,
14	combineMeterUpdate,
15	TotalSize(..),
16	BytesProcessed(..),
17	toBytesProcessed,
18	fromBytesProcessed,
19	addBytesProcessed,
20	zeroBytesProcessed,
21	withMeteredFile,
22	meteredWrite,
23	meteredWrite',
24	meteredWriteFile,
25	offsetMeterUpdate,
26	hGetContentsMetered,
27	hGetMetered,
28	defaultChunkSize,
29	watchFileSize,
30	OutputHandler(..),
31	ProgressParser,
32	commandMeter,
33	commandMeter',
34	commandMeterExitCode,
35	commandMeterExitCode',
36	demeterCommand,
37	demeterCommandEnv,
38	avoidProgress,
39	rateLimitMeterUpdate,
40	Meter,
41	mkMeter,
42	setMeterTotalSize,
43	updateMeter,
44	displayMeterHandle,
45	clearMeterHandle,
46	bandwidthMeter,
47) where
48
49import Common
50import Utility.Percentage
51import Utility.DataUnits
52import Utility.HumanTime
53import Utility.SimpleProtocol as Proto
54
55import qualified Data.ByteString.Lazy as L
56import qualified Data.ByteString as S
57import System.IO.Unsafe
58import Foreign.Storable (Storable(sizeOf))
59import System.Posix.Types
60import Data.Int
61import Control.Concurrent
62import Control.Concurrent.Async
63import Control.Monad.IO.Class (MonadIO)
64import Data.Time.Clock
65import Data.Time.Clock.POSIX
66
{- An action that can be run repeatedly, each time being told how many
 - bytes have been processed.
 -
 - Note that each call receives the total number of bytes processed so
 - far, *not* an incremental amount since the last call. -}
type MeterUpdate = (BytesProcessed -> IO ())
72
{- A MeterUpdate that ignores the byte count and does nothing. -}
nullMeterUpdate :: MeterUpdate
nullMeterUpdate = const (return ())
75
{- Makes a MeterUpdate that passes each byte count to the first
 - MeterUpdate, and then to the second. -}
combineMeterUpdate :: MeterUpdate -> MeterUpdate -> MeterUpdate
combineMeterUpdate a b n = do
	a n
	b n
78
{- Total number of bytes processed so far.
 -
 - The count is held as an Integer, so it is not limited to the range of
 - a fixed-width type. -}
newtype BytesProcessed = BytesProcessed Integer
	deriving (Eq, Ord, Show, Read)
82
{- Types that can be converted to and from a BytesProcessed count.
 -
 - Converting back to a fixed-width type goes through fromInteger, so a
 - count that does not fit in the target type is not preserved. -}
class AsBytesProcessed a where
	toBytesProcessed :: a -> BytesProcessed
	fromBytesProcessed :: BytesProcessed -> a

instance AsBytesProcessed BytesProcessed where
	toBytesProcessed b = b
	fromBytesProcessed b = b

instance AsBytesProcessed Integer where
	toBytesProcessed = BytesProcessed
	fromBytesProcessed (BytesProcessed n) = n

instance AsBytesProcessed Int where
	toBytesProcessed = BytesProcessed . toInteger
	fromBytesProcessed (BytesProcessed n) = fromInteger n

instance AsBytesProcessed Int64 where
	toBytesProcessed = BytesProcessed . toInteger
	fromBytesProcessed (BytesProcessed n) = fromInteger n

instance AsBytesProcessed FileOffset where
	toBytesProcessed = BytesProcessed . toInteger
	fromBytesProcessed (BytesProcessed n) = fromInteger n
106
{- Adds an amount, given as any AsBytesProcessed type, to a
 - BytesProcessed count. The sum is evaluated strictly. -}
addBytesProcessed :: AsBytesProcessed v => BytesProcessed -> v -> BytesProcessed
addBytesProcessed (BytesProcessed i) v = case toBytesProcessed v of
	BytesProcessed n -> BytesProcessed $! i + n
111
{- A count of zero bytes; the starting point before anything has been
 - processed. -}
zeroBytesProcessed :: BytesProcessed
zeroBytesProcessed = BytesProcessed 0
114
{- Opens a file for binary reading and passes its content, as a lazy
 - ByteString, to an action. The meter is updated as the content is
 - consumed. -}
withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a
withMeteredFile f meterupdate a = withBinaryFile f ReadMode consume
  where
	consume h = a =<< hGetContentsMetered h meterupdate
120
{- Feeds a lazy ByteString to the action one strict chunk at a time.
 - The meter is updated after each chunk has been processed. -}
meteredWrite :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO ()
meteredWrite meterupdate a b = void $ meteredWrite' meterupdate a b
125
{- Like meteredWrite, but returns the total number of bytes that were
 - passed to the action. -}
meteredWrite' :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO BytesProcessed
meteredWrite' meterupdate a = feed zeroBytesProcessed . L.toChunks
  where
	feed done chunks = case chunks of
		[] -> return done
		(chunk:rest) -> do
			a chunk
			-- Forced before the meter sees it, so no chain of
			-- additions builds up across chunks.
			let !done' = addBytesProcessed done (S.length chunk)
			meterupdate done'
			feed done' rest
135
{- Writes a lazy ByteString to a file, opened in binary WriteMode,
 - updating the meter after each chunk is written. -}
meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
meteredWriteFile meterupdate f b = withBinaryFile f WriteMode writeall
  where
	writeall h = meteredWrite meterupdate (S.hPut h) b
139
{- Makes a MeterUpdate that adds a fixed offset to every byte count
 - before passing it on to the base MeterUpdate.
 -
 - Useful when a sequence of actions, such as multiple meteredWriteFiles,
 - all update a common meter progressively. Or when resuming.
 -}
offsetMeterUpdate :: MeterUpdate -> BytesProcessed -> MeterUpdate
offsetMeterUpdate base offset = base . addBytesProcessed offset
146
{- Like L.hGetContents, except that the meter is updated, based on the
 - size of the chunk, after each chunk is read. Reads until EOF, with no
 - limit on the number of bytes.
 -
 - All the usual caveats about using unsafeInterleaveIO apply to the
 - meter updates, so use caution.
 -}
hGetContentsMetered :: Handle -> MeterUpdate -> IO L.ByteString
hGetContentsMetered h meterupdate = hGetMetered h nolimit meterupdate
  where
	nolimit = Nothing
155
{- Reads from the Handle, updating the meter after each chunk is read.
 -
 - The second parameter is the number of bytes wanted; Nothing means to
 - read until EOF.
 -
 - Stops at EOF, or when the requested number of bytes have been read.
 - Closes the Handle at EOF, but otherwise leaves it open.
 -
 - Note that the meter update is run in unsafeInterleaveIO, which means that
 - it can be run at any time. It's even possible for updates to run out
 - of order, as different parts of the ByteString are consumed.
 -}
hGetMetered :: Handle -> Maybe Integer -> MeterUpdate -> IO L.ByteString
hGetMetered h wantsize meterupdate = lazyRead zeroBytesProcessed
  where
	-- Each read is deferred until the part of the result it
	-- produces is demanded.
	lazyRead sofar = unsafeInterleaveIO $ loop sofar

	loop sofar = do
		c <- S.hGet h (nextchunksize (fromBytesProcessed sofar))
		if S.null c
			then do
				-- When zero bytes were wanted, the chunk size
				-- requested was 0, so an empty read does not
				-- indicate EOF, and the Handle is left open.
				when (wantsize /= Just 0) $
					hClose h
				return L.empty
			else do
				-- Forced before the meter sees it.
				let !sofar' = addBytesProcessed sofar (S.length c)
				meterupdate sofar'
				if keepgoing (fromBytesProcessed sofar')
					then do
						{- unsafeInterleaveIO causes this to be
						 - deferred until the data is read from the
						 - ByteString. -}
						cs <- lazyRead sofar'
						return $ L.append (L.fromChunks [c]) cs
					else return $ L.fromChunks [c]

	-- Whether more remains to be read, given the bytes read so far.
	keepgoing n = case wantsize of
		Nothing -> True
		Just sz -> n < sz

	-- Never asks for more than the bytes still wanted, so that a
	-- read does not go past the requested size.
	nextchunksize n = case wantsize of
		Nothing -> defaultChunkSize
		Just sz ->
			let togo = sz - n
			in if togo < toInteger defaultChunkSize
				then fromIntegral togo
				else defaultChunkSize
200
{- Same default chunk size Lazy ByteStrings use: 32 KiB, less a small
 - per-chunk overhead. -}
defaultChunkSize :: Int
defaultChunkSize = thirtytwok - overhead
  where
	thirtytwok = 32 * 1024
	-- The overhead is taken to be the size of two Ints. (GHC specific)
	overhead = 2 * sizeOf (1 :: Int)
207
{- Runs an action while a separate thread checks the size of a file
 - every half second, updating the meter each time the file is found
 - to have grown since the previous check. The thread is killed when the
 - action is done.
 -
 - The file may already exist, and the action could throw the original file
 - away and start over. To avoid reporting the original file size followed
 - by a smaller size in that case, the size the file has at the start is
 - not reported; the meter is first updated once the file starts growing.
 -}
watchFileSize :: (MonadIO m, MonadMask m) => FilePath -> MeterUpdate -> m a -> m a
watchFileSize f p a = bracket startwatcher stopwatcher (const a)
  where
	startwatcher = liftIO $ forkIO $ getsz >>= watcher
	stopwatcher t = liftIO $ void $ tryIO $ killThread t

	watcher oldsz = do
		threadDelay 500000 -- 0.5 seconds
		newsz <- getsz
		when (newsz > oldsz) $
			p newsz
		watcher newsz

	-- NOTE(review): catchDefaultIO presumably yields zero bytes when
	-- the file size cannot be read; confirm in its definition.
	getsz = catchDefaultIO zeroBytesProcessed $
		toBytesProcessed <$> getFileSize (toRawFilePath f)
230
{- Configures what is done with the output of a command that is run by
 - the functions below. -}
data OutputHandler = OutputHandler
	{ quietMode :: Bool
	-- ^ when True, the command's stdout is not displayed
	, stderrHandler :: String -> IO ()
	-- ^ is passed lines that the command outputs to stderr
	}
235
{- Parses the String looking for a command's progress output, and returns
 - Maybe the number of bytes done so far, optionally a total size,
 - and any remainder of the string that could be an incomplete
 - progress output. That remainder should be prepended to future output,
 - and fed back in. This interface allows the command's output to be read
 - in any desired size chunk, or even one character at a time.
 -}
type ProgressParser = String -> (Maybe BytesProcessed, Maybe TotalSize, String)
244
{- The total size of what a meter is measuring. It is compared with
 - BytesProcessed counts, so it is a number of bytes. -}
newtype TotalSize = TotalSize Integer
	deriving (Show, Eq)
247
{- Runs a command, feeding its output through a ProgressParser in order
 - to update a meter. Returns True only when the command exited
 - successfully.
 -
 - If the Meter is provided, the ProgressParser can report the total size,
 - which allows creating a Meter before the size is known.
 -}
commandMeter :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO Bool
commandMeter progressparser oh meter meterupdate cmd params =
	(== Just ExitSuccess)
		<$> commandMeterExitCode progressparser oh meter meterupdate cmd params
257
{- Like commandMeter, but the CreateProcess that is used to run the
 - command is first passed through the provided function. -}
commandMeter' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO Bool
commandMeter' progressparser oh meter meterupdate cmd params mkprocess =
	(== Just ExitSuccess)
		<$> commandMeterExitCode' progressparser oh meter meterupdate cmd params mkprocess
264
{- Like commandMeter, but returns what commandMeterExitCode' returns for
 - the command, rather than a Bool. -}
commandMeterExitCode :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO (Maybe ExitCode)
commandMeterExitCode progressparser oh meter meterupdate cmd params =
	commandMeterExitCode' progressparser oh meter meterupdate cmd params unmodified
  where
	-- The CreateProcess is used as is.
	unmodified = id
268
{- Runs a command, with the CreateProcess passed through the provided
 - function, and with the environment left as Nothing.
 -
 - The command's stdout is read in pieces of up to 80 bytes. Unless in
 - quiet mode, each piece is copied to stdout. The pieces are fed through
 - the ProgressParser, and the meter is updated whenever the parser
 - reports a number of bytes that differs from the previous one.
 -
 - The first total size that the parser reports is set in the Meter, when
 - one was provided; later ones are ignored.
 -
 - Each line of the command's stderr is passed to the stderrHandler.
 -}
commandMeterExitCode' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO (Maybe ExitCode)
commandMeterExitCode' progressparser oh mmeter meterupdate cmd params mkprocess =
	outputFilter cmd params mkprocess Nothing handlestdout handlestderr
  where
	handlestdout _ph = feedprogress mmeter zeroBytesProcessed []

	-- buf holds output that the parser has not been able to use yet.
	feedprogress sendtotalsize prev buf h = do
		b <- S.hGetSome h 80
		unless (S.null b) $ do
			unless (quietMode oh) $ do
				S.hPut stdout b
				hFlush stdout
			let (mbytes, mtotalsize, buf') = progressparser (buf ++ decodeBS b)
			-- After the total size has been set once, Nothing is
			-- carried forward, so it does not get set again.
			sendtotalsize' <- case (sendtotalsize, mtotalsize) of
				(Just meter, Just t) -> do
					setMeterTotalSize meter t
					return Nothing
				_ -> return sendtotalsize
			prev' <- case mbytes of
				Just bytes -> do
					when (bytes /= prev) $
						meterupdate bytes
					return bytes
				Nothing -> return prev
			feedprogress sendtotalsize' prev' buf' h

	handlestderr ph h = hGetLineUntilExitOrEOF ph h >>= \case
		Nothing -> return ()
		Just l -> do
			stderrHandler oh l
			handlestderr ph h
302
{- Runs a command that may display one or more progress meters on
 - either stdout or stderr, and prevents the meters from being displayed.
 - Returns True only when the command exited successfully.
 -
 - The other command output is handled as configured by the OutputHandler.
 -}
demeterCommand :: OutputHandler -> FilePath -> [CommandParam] -> IO Bool
demeterCommand oh cmd params = demeterCommandEnv oh cmd params noenviron
  where
	noenviron = Nothing
310
{- Like demeterCommand, but the environment to run the command in can
 - be provided. -}
demeterCommandEnv :: OutputHandler -> FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO Bool
demeterCommandEnv oh cmd params environ =
	(== Just ExitSuccess)
		<$> outputFilter cmd params id environ filterstdout filterstderr
  where
	filterstdout ph outh = avoidProgress True ph outh emitstdout
	filterstderr ph errh = avoidProgress True ph errh (stderrHandler oh)

	-- Lines from stdout are only displayed when not in quiet mode.
	emitstdout l
		| quietMode oh = return ()
		| otherwise = putStrLn l
323
{- Reads lines from the Handle until hGetLineUntilExitOrEOF returns
 - Nothing, passing them to the emitter.
 -
 - When the Bool is True, progress output is suppressed while other
 - messages are still emitted, by dropping lines that contain \r
 - (typically used to reset to the beginning of the line when updating
 - a progress display).
 -}
avoidProgress :: Bool -> ProcessHandle -> Handle -> (String -> IO ()) -> IO ()
avoidProgress doavoid ph h emitter = go
  where
	go = hGetLineUntilExitOrEOF ph h >>= maybe (return ()) handleline

	handleline s = do
		when (wanted s) $
			emitter s
		go

	wanted s = not doavoid || '\r' `notElem` s
335
{- Runs a command with its stdout and stderr each connected to a pipe.
 - A filter action is run on each pipe, each in its own thread. Once a
 - filter finishes, whatever tryIO made of it is discarded and the pipe's
 - Handle is closed.
 -
 - Waits for the command to exit, and then for both threads to finish,
 - before returning the command's exit code.
 -}
outputFilter
	:: FilePath
	-> [CommandParam]
	-> (CreateProcess -> CreateProcess)
	-> Maybe [(String, String)]
	-> (ProcessHandle -> Handle -> IO ())
	-> (ProcessHandle -> Handle -> IO ())
	-> IO (Maybe ExitCode)
outputFilter cmd params mkprocess environ outfilter errfilter =
	catchMaybeIO $ withCreateProcess p go
  where
	p = mkprocess (proc cmd (toCommand params))
		{ env = environ
		, std_out = CreatePipe
		, std_err = CreatePipe
		}

	go _ (Just outh) (Just errh) ph = do
		outt <- startfilter ph outfilter outh
		errt <- startfilter ph errfilter errh
		ret <- waitForProcess ph
		mapM_ wait [outt, errt]
		return ret
	-- Only reached when a Handle for stdout or stderr is missing.
	go _ _ _ _ = error "internal"

	startfilter ph f h = async $ tryIO (f ph h) >> hClose h
361
-- | Limit a meter to only update once per unit of time.
--
-- It's nice to display the final update to 100%, even if it comes soon
-- after a previous update. To make that happen, the Meter has to know
-- its total size. Once the bytes processed reach the Meter's total size,
-- updates are always passed through.
--
-- The returned MeterUpdate keeps the time of the last update that was
-- passed through in an MVar. That MVar is only changed using modifyMVar,
-- so an exception arriving in the middle of an update cannot leave it
-- empty, which would block every later update.
rateLimitMeterUpdate :: NominalDiffTime -> Meter -> MeterUpdate -> IO MeterUpdate
rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do
	-- The last update time starts out at the epoch.
	lastupdate <- newMVar (toEnum 0 :: POSIXTime)
	return $ mu lastupdate
  where
	mu lastupdate n@(BytesProcessed i) = readMVar totalsizev >>= \case
		Just (TotalSize t) | i >= t -> meterupdate n
		_ -> do
			now <- getPOSIXTime
			-- Decide whether enough time has passed, recording
			-- the new time if so. The meter update itself is run
			-- afterwards, outside of the MVar modification.
			due <- modifyMVar lastupdate $ \prev -> return $
				if now - prev >= delta
					then (now, True)
					else (prev, False)
			when due $
				meterupdate n
382
-- | A progress meter. Holds, in order: the total size, if known; the
-- most recent MeterState; a String that is made available to the
-- DisplayMeter (displayMeterHandle keeps the last rendered meter in it);
-- and the action used to display the meter.
data Meter = Meter (MVar (Maybe TotalSize)) (MVar MeterState) (MVar String) DisplayMeter
384
-- | The state of a meter at a point in time.
data MeterState = MeterState
	{ meterBytesProcessed :: BytesProcessed
	-- ^ total bytes processed as of this state
	, meterTimeStamp :: POSIXTime
	-- ^ when this state was recorded
	} deriving (Show)
389
-- | An action that displays a meter. It is passed the Meter's MVar String,
-- the total size if known, the previous MeterState, and the new MeterState.
type DisplayMeter = MVar String -> Maybe TotalSize -> MeterState -> MeterState -> IO ()
391
-- | Renders a meter to a String, given the total size if known, the
-- previous MeterState, and the new MeterState.
type RenderMeter = Maybe TotalSize -> MeterState -> MeterState -> String
393
-- | Make a meter. Pass the total size, if it's known.
--
-- The meter starts out with zero bytes processed, timestamped with the
-- current time, and with an empty string as its last display.
mkMeter :: Maybe TotalSize -> DisplayMeter -> IO Meter
mkMeter totalsize displaymeter = do
	now <- getPOSIXTime
	totalsizev <- newMVar totalsize
	statev <- newMVar (MeterState zeroBytesProcessed now)
	displayedv <- newMVar ""
	return (Meter totalsizev statev displayedv displaymeter)
403
-- | Sets the total size of a meter, replacing any previous total size.
setMeterTotalSize :: Meter -> TotalSize -> IO ()
setMeterTotalSize (Meter totalsizev _ _ _) t =
	void $ swapMVar totalsizev (Just t)
406
-- | Updates the meter, displaying it if necessary.
--
-- The new state, timestamped with the current time, always replaces the
-- old one. The meter is only displayed when the number of bytes processed
-- differs from the old state's.
updateMeter :: Meter -> MeterUpdate
updateMeter (Meter totalsizev sv bv displaymeter) new = do
	curms <- MeterState new <$> getPOSIXTime
	oldms <- swapMVar sv curms
	unless (meterBytesProcessed oldms == new) $
		readMVar totalsizev >>= \totalsize ->
			displaymeter bv totalsize oldms curms
416
-- | Display meter to a Handle.
--
-- The rendered meter is stored in the MVar. Nothing is written when it
-- is the same as what was stored there before. Otherwise, it is written
-- after a carriage return, followed by enough spaces to cover up a
-- longer previous meter.
displayMeterHandle :: Handle -> RenderMeter -> DisplayMeter
displayMeterHandle h rendermeter v msize old new = do
	previous <- swapMVar v rendered
	unless (previous == rendered) $ do
		hPutStr h ('\r' : rendered ++ padding previous)
		hFlush h
  where
	rendered = rendermeter msize old new
	padding previous = replicate (length previous - length rendered) ' '
427
-- | Clear meter displayed by displayMeterHandle. May be called before
-- outputting something else, followed by more calls to displayMeterHandle.
--
-- Overwrites the last displayed meter with spaces, and leaves the cursor
-- at the start of the line.
clearMeterHandle :: Meter -> Handle -> IO ()
clearMeterHandle (Meter _ _ v _) h = do
	width <- length <$> readMVar v
	hPutStr h ("\r" ++ replicate width ' ' ++ "\r")
	hFlush h
435
-- | Display meter in the form:
--   10%  1.3MiB  300 KiB/s 16m40s
-- or when total size is not known:
--   1.3 MiB      300 KiB/s
--
-- The rate is calculated from the bytes transferred and the time elapsed
-- between the old and the new MeterState. The estimated completion time
-- is only included when the total size is known and the rate is more
-- than zero.
--
-- When more bytes have been processed than the total size, the
-- percentage is calculated as if exactly the total size had been
-- processed, and the estimated completion is zero rather than a
-- negative duration.
bandwidthMeter :: RenderMeter
bandwidthMeter mtotalsize (MeterState (BytesProcessed old) before) (MeterState (BytesProcessed new) now) =
	unwords $ catMaybes
		[ Just percentamount
		-- Pad enough for max width: "100%  xxxx.xx KiB  xxxx KiB/s"
		, Just $ replicate (29 - length percentamount - length rate) ' '
		, Just rate
		, estimatedcompletion
		]
  where
	amount = roughSize' memoryUnits True 2 new
	percentamount = case mtotalsize of
		Just (TotalSize totalsize) ->
			let p = showPercentage 0 $
				percentage totalsize (min new totalsize)
			in p ++ replicate (6 - length p) ' ' ++ amount
		Nothing -> amount
	rate = roughSize' memoryUnits True 0 bytespersecond ++ "/s"
	-- When no time has elapsed, the bytes transferred are used as
	-- the rate, which avoids dividing by zero.
	bytespersecond
		| duration == 0 = fromIntegral transferred
		| otherwise = floor $ fromIntegral transferred / duration
	-- Both are clamped, in case the new state has fewer bytes or an
	-- earlier timestamp than the old state.
	transferred = max 0 (new - old)
	duration = max 0 (now - before)
	estimatedcompletion = case mtotalsize of
		Just (TotalSize totalsize)
			| bytespersecond > 0 ->
				Just $ fromDuration $ Duration $
					remaining totalsize `div` bytespersecond
		_ -> Nothing
	-- Clamped to agree with the percentage, which treats processing
	-- more than the total size as being complete.
	remaining totalsize = max 0 (totalsize - new)
469
{- A BytesProcessed is serialized as its Integer count, as formatted
 - by show. -}
instance Proto.Serializable BytesProcessed where
	serialize (BytesProcessed n) = show n
	-- NOTE(review): readish presumably yields Nothing when the input
	-- is not a number, rather than throwing; confirm in its definition.
	deserialize = BytesProcessed <$$> readish
473