{- Metered IO and actions
 -
 - Copyright 2012-2021 Joey Hess <id@joeyh.name>
 -
 - License: BSD-2-clause
 -}

{-# LANGUAGE TypeSynonymInstances, BangPatterns #-}

module Utility.Metered (
    MeterUpdate,
    MeterState(..),
    nullMeterUpdate,
    combineMeterUpdate,
    TotalSize(..),
    BytesProcessed(..),
    toBytesProcessed,
    fromBytesProcessed,
    addBytesProcessed,
    zeroBytesProcessed,
    withMeteredFile,
    meteredWrite,
    meteredWrite',
    meteredWriteFile,
    offsetMeterUpdate,
    hGetContentsMetered,
    hGetMetered,
    defaultChunkSize,
    watchFileSize,
    OutputHandler(..),
    ProgressParser,
    commandMeter,
    commandMeter',
    commandMeterExitCode,
    commandMeterExitCode',
    demeterCommand,
    demeterCommandEnv,
    avoidProgress,
    rateLimitMeterUpdate,
    Meter,
    mkMeter,
    setMeterTotalSize,
    updateMeter,
    displayMeterHandle,
    clearMeterHandle,
    bandwidthMeter,
) where

import Common
import Utility.Percentage
import Utility.DataUnits
import Utility.HumanTime
import Utility.SimpleProtocol as Proto

import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString as S
import System.IO.Unsafe
import Foreign.Storable (Storable(sizeOf))
import System.Posix.Types
import Data.Int
import Control.Concurrent
import Control.Concurrent.Async
import Control.Monad.IO.Class (MonadIO)
import Data.Time.Clock
import Data.Time.Clock.POSIX

{- A callback that is run repeatedly as bytes get processed.
 -
 - Every call is passed the running total of bytes processed so far,
 - *not* the increment since the previous call. -}
type MeterUpdate = (BytesProcessed -> IO ())

{- A MeterUpdate that ignores its argument and does nothing. -}
nullMeterUpdate :: MeterUpdate
nullMeterUpdate = const (return ())

{- Builds a MeterUpdate that runs the first update and then the second,
 - passing the same value to both. -}
combineMeterUpdate :: MeterUpdate -> MeterUpdate -> MeterUpdate
combineMeterUpdate a b processed = do
    a processed
    b processed

{- Total number of bytes processed so far. -}
newtype BytesProcessed = BytesProcessed Integer
    deriving (Eq, Ord, Show, Read)

{- Numeric types that convert to and from a BytesProcessed count. -}
class AsBytesProcessed a where
    toBytesProcessed :: a -> BytesProcessed
    fromBytesProcessed :: BytesProcessed -> a

instance AsBytesProcessed BytesProcessed where
    toBytesProcessed b = b
    fromBytesProcessed b = b

instance AsBytesProcessed Integer where
    toBytesProcessed = BytesProcessed
    fromBytesProcessed (BytesProcessed n) = n

instance AsBytesProcessed Int where
    toBytesProcessed = BytesProcessed . toInteger
    fromBytesProcessed (BytesProcessed n) = fromInteger n

instance AsBytesProcessed Int64 where
    toBytesProcessed = BytesProcessed . toInteger
    fromBytesProcessed (BytesProcessed n) = fromInteger n

instance AsBytesProcessed FileOffset where
    toBytesProcessed = BytesProcessed . toInteger
    fromBytesProcessed (BytesProcessed n) = fromInteger n

{- Adds an amount to a BytesProcessed. The sum is forced before it is
 - wrapped, so repeated additions do not pile up unevaluated thunks. -}
addBytesProcessed :: AsBytesProcessed v => BytesProcessed -> v -> BytesProcessed
addBytesProcessed (BytesProcessed sofar) v = case toBytesProcessed v of
    BytesProcessed extra -> BytesProcessed $! sofar + extra

zeroBytesProcessed :: BytesProcessed
zeroBytesProcessed = BytesProcessed 0

{- Opens a file in binary mode and passes its lazily read content to an
 - action; the meter is updated as that content is consumed. -}
withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a
withMeteredFile f meterupdate a = withBinaryFile f ReadMode consume
  where
    consume h = do
        content <- hGetContentsMetered h meterupdate
        a content

{- Runs the action once for every chunk of the lazy ByteString, in order.
 - The meter is updated after each chunk has been handed to the action. -}
meteredWrite :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO ()
meteredWrite meterupdate a content =
    void $ meteredWrite' meterupdate a content

{- Like meteredWrite, but returns the total number of bytes that were
 - passed to the action. -}
meteredWrite' :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO BytesProcessed
meteredWrite' meterupdate a content =
    writeChunks zeroBytesProcessed (L.toChunks content)
  where
    writeChunks sofar [] = return sofar
    writeChunks sofar (c:cs) = do
        a c
        let !sofar' = sofar `addBytesProcessed` S.length c
        meterupdate sofar'
        writeChunks sofar' cs

{- Writes the lazy ByteString to a file opened in binary WriteMode,
 - updating the meter after each chunk. -}
meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
meteredWriteFile meterupdate f b = withBinaryFile f WriteMode writeall
  where
    writeall h = meteredWrite meterupdate (S.hPut h) b

{- Shifts every value passed to a MeterUpdate by a fixed offset.
 -
 - Handy when a sequence of actions, such as several meteredWriteFiles,
 - all advance one shared meter. Or when resuming.
 -}
offsetMeterUpdate :: MeterUpdate -> BytesProcessed -> MeterUpdate
offsetMeterUpdate base offset = base . addBytesProcessed offset

{- A variant of L.hGetContents that updates a meter, based on the size of
 - the chunk, each time a chunk has been read.
 -
 - The meter updates are subject to all the usual caveats about
 - unsafeInterleaveIO, so use caution.
 -}
hGetContentsMetered :: Handle -> MeterUpdate -> IO L.ByteString
hGetContentsMetered h meterupdate = hGetMetered h Nothing meterupdate

{- Lazily reads from the Handle, updating the meter after every chunk.
 -
 - Reading stops at EOF, or once the requested number of bytes has been
 - read. The Handle is closed at EOF, and is otherwise left open.
 -
 - The meter update runs inside unsafeInterleaveIO, so it can run at any
 - time. Updates may even run out of order, as different parts of the
 - ByteString are consumed.
 -}
hGetMetered :: Handle -> Maybe Integer -> MeterUpdate -> IO L.ByteString
hGetMetered h wantsize meterupdate = deferred zeroBytesProcessed
  where
    -- unsafeInterleaveIO postpones each read until the corresponding
    -- part of the ByteString is demanded.
    deferred = unsafeInterleaveIO . readChunk

    readChunk sofar = do
        c <- S.hGet h (chunkSizeAfter (fromBytesProcessed sofar))
        if S.null c
            then atEmptyRead
            else gotChunk sofar c

    -- An empty read is treated as EOF, except when zero bytes were
    -- requested; in that case the Handle is left open.
    atEmptyRead = do
        when (wantsize /= Just 0) $
            hClose h
        return L.empty

    gotChunk sofar c = do
        let !sofar' = addBytesProcessed sofar (S.length c)
        meterupdate sofar'
        if wantMore (fromBytesProcessed sofar')
            then do
                rest <- deferred sofar'
                return $ L.append (L.fromChunks [c]) rest
            else return $ L.fromChunks [c]

    wantMore n = maybe True (n <) wantsize

    -- Never ask for more than is still wanted.
    chunkSizeAfter n = case wantsize of
        Nothing -> defaultChunkSize
        Just sz -> fromInteger $ min (sz - n) (toInteger defaultChunkSize)

{- Same default chunk size Lazy ByteStrings use. -}
defaultChunkSize :: Int
defaultChunkSize = 32 * 1024 - 2 * wordsize
  where
    wordsize = sizeOf (1 :: Int) -- GHC specific

{- Runs an action while a background thread polls the size of a file,
 - updating the meter as the file grows.
 -
 - The file may exist beforehand, and the action may discard it and start
 - over. Reporting the old size followed by a smaller one is avoided by
 - not updating the meter until the file is first seen to grow.
 -}
watchFileSize :: (MonadIO m, MonadMask m) => FilePath -> MeterUpdate -> m a -> m a
watchFileSize f p a = bracket
    (liftIO $ forkIO $ getsz >>= watcher)
    (liftIO . void . tryIO . killThread)
    (\_ -> a)
  where
    rawpath = toRawFilePath f

    -- A size that cannot be determined is treated as zero.
    getsz = catchDefaultIO zeroBytesProcessed $
        toBytesProcessed <$> getFileSize rawpath

    watcher oldsz = do
        threadDelay 500000 -- 0.5 seconds
        sz <- getsz
        when (sz > oldsz) $
            p sz
        watcher sz

data OutputHandler = OutputHandler
    { quietMode :: Bool
    , stderrHandler :: String -> IO ()
    }

{- Scans a String for a command's progress output. The result is Maybe
 - the number of bytes done so far, optionally a total size, and any
 - remainder of the string that could be an incomplete progress output.
 - The caller should prepend that remainder to later output and feed it
 - back in. This way, the command's output can be read in chunks of any
 - size, even one character at a time.
 -}
type ProgressParser = String -> (Maybe BytesProcessed, Maybe TotalSize, String)

newtype TotalSize = TotalSize Integer
    deriving (Show, Eq)

{- Runs a command, feeding its output through a ProgressParser in order
 - to update a meter. The result is True only when the command exited
 - successfully.
 -
 - When a Meter is provided, a total size reported by the ProgressParser
 - is set on it, so the Meter can be created before the size is known.
 -}
commandMeter :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO Bool
commandMeter progressparser oh meter meterupdate cmd params =
    commandMeter' progressparser oh meter meterupdate cmd params id

{- Like commandMeter, with a function that adjusts the CreateProcess. -}
commandMeter' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO Bool
commandMeter' progressparser oh meter meterupdate cmd params mkprocess =
    (== Just ExitSuccess)
        <$> commandMeterExitCode' progressparser oh meter meterupdate cmd params mkprocess

{- Like commandMeter, but returns the ExitCode rather than a Bool. -}
commandMeterExitCode :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO (Maybe ExitCode)
commandMeterExitCode progressparser oh meter meterupdate cmd params =
    commandMeterExitCode' progressparser oh meter meterupdate cmd params id

{- Like commandMeterExitCode, with a function that adjusts the
 - CreateProcess. -}
commandMeterExitCode' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO (Maybe ExitCode)
commandMeterExitCode' progressparser oh mmeter meterupdate cmd params mkprocess =
    outputFilter cmd params mkprocess Nothing stdoutreader stderrreader
  where
    stdoutreader _ = feedprogress mmeter zeroBytesProcessed []

    -- Reads stdout in small pieces, echoing each piece unless in quiet
    -- mode. The piece is appended to the leftover from the previous
    -- parse before being parsed. An empty read ends the loop.
    feedprogress sendtotalsize prev buf h = do
        b <- S.hGetSome h 80
        unless (S.null b) $ do
            unless (quietMode oh) $ do
                S.hPut stdout b
                hFlush stdout
            let (mbytes, mtotalsize, buf') = progressparser (buf ++ decodeBS b)
            -- Only the first total size that gets reported is set
            -- on the Meter; after that there is no Meter to send to.
            sendtotalsize' <- case (sendtotalsize, mtotalsize) of
                (Just meter, Just t) -> do
                    setMeterTotalSize meter t
                    return Nothing
                _ -> return sendtotalsize
            case mbytes of
                Just bytes -> do
                    when (bytes /= prev) $
                        meterupdate bytes
                    feedprogress sendtotalsize' bytes buf' h
                Nothing -> feedprogress sendtotalsize' prev buf' h

    -- Every line of stderr goes to the OutputHandler.
    stderrreader ph h = do
        ml <- hGetLineUntilExitOrEOF ph h
        case ml of
            Nothing -> return ()
            Just l -> do
                stderrHandler oh l
                stderrreader ph h

{- Runs a command that may display one or more progress meters, on
 - either stdout or stderr, and keeps those meters from being displayed.
 -
 - All other output of the command is handled as the OutputHandler
 - configures.
 -}
demeterCommand :: OutputHandler -> FilePath -> [CommandParam] -> IO Bool
demeterCommand oh cmd params = demeterCommandEnv oh cmd params Nothing

{- Like demeterCommand, but the environment of the command can be
 - specified. -}
demeterCommandEnv :: OutputHandler -> FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO Bool
demeterCommandEnv oh cmd params environ =
    (== Just ExitSuccess) <$> outputFilter cmd params id environ
        (\ph outh -> avoidProgress True ph outh stdouthandler)
        (\ph errh -> avoidProgress True ph errh (stderrHandler oh))
  where
    stdouthandler l
        | quietMode oh = return ()
        | otherwise = putStrLn l

{- Suppresses progress output while still displaying other messages, by
 - dropping lines that contain \r (typically used to reset to the
 - beginning of the line when updating a progress display).
 -}
avoidProgress :: Bool -> ProcessHandle -> Handle -> (String -> IO ()) -> IO ()
avoidProgress doavoid ph h emitter = go
  where
    go = hGetLineUntilExitOrEOF ph h >>= maybe (return ()) handleline

    handleline s = do
        when (not doavoid || '\r' `notElem` s) $
            emitter s
        go

{- Runs a command with stdout and stderr piped, each read by its own
 - filter in a separate thread. The result is the exit code of the
 - command, wrapped by catchMaybeIO. -}
outputFilter
    :: FilePath
    -> [CommandParam]
    -> (CreateProcess -> CreateProcess)
    -> Maybe [(String, String)]
    -> (ProcessHandle -> Handle -> IO ())
    -> (ProcessHandle -> Handle -> IO ())
    -> IO (Maybe ExitCode)
outputFilter cmd params mkprocess environ outfilter errfilter =
    catchMaybeIO $ withCreateProcess p go
  where
    p = mkprocess $ (proc cmd (toCommand params))
        { env = environ
        , std_out = CreatePipe
        , std_err = CreatePipe
        }

    go _ (Just outh) (Just errh) ph = do
        outt <- async $ runfilter outfilter outh
        errt <- async $ runfilter errfilter errh
        ret <- waitForProcess ph
        wait outt
        wait errt
        return ret
      where
        -- The handle is closed even when the filter fails with an
        -- IO exception.
        runfilter f hdl = tryIO (f ph hdl) >> hClose hdl
    go _ _ _ _ = error "internal"

-- | Limit a meter to only update once per unit of time.
--
-- It's nice to display the final update to 100%, even if it comes soon
-- after a previous update. To make that happen, the Meter has to know
-- its total size.
--
-- An update is passed on to the wrapped MeterUpdate when at least delta
-- has elapsed since the last update that was passed on, or when the
-- bytes processed have reached the Meter's total size (if one is set).
rateLimitMeterUpdate :: NominalDiffTime -> Meter -> MeterUpdate -> IO MeterUpdate
rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do
    -- Starting from time 0 lets the very first update through.
    lastupdate <- newMVar (0 :: POSIXTime)
    return $ mu lastupdate
  where
    mu lastupdate n@(BytesProcessed i) = readMVar totalsizev >>= \case
        Just (TotalSize t) | i >= t -> meterupdate n
        _ -> do
            now <- getPOSIXTime
            -- modifyMVar restores the MVar if an exception arrives
            -- in the middle; a bare takeMVar/putMVar pair could
            -- leave it empty and block all later updates.
            due <- modifyMVar lastupdate $ \prev -> return $
                if now - prev >= delta
                    then (now, True)
                    else (prev, False)
            -- The wrapped update runs after the MVar has been
            -- refilled, so it never runs while the MVar is held.
            when due $
                meterupdate n

-- | A progress meter. The fields are, in order: the total size, when
-- known; the state as of the most recent update; the string most
-- recently rendered for display; and the action that displays the meter.
data Meter = Meter (MVar (Maybe TotalSize)) (MVar MeterState) (MVar String) DisplayMeter

-- | The number of bytes processed, and the time when that was recorded.
data MeterState = MeterState
    { meterBytesProcessed :: BytesProcessed
    , meterTimeStamp :: POSIXTime
    } deriving (Show)

-- | Displays a meter. Passed the MVar holding the displayed string, the
-- total size when known, the previous state, and the new state.
type DisplayMeter = MVar String -> Maybe TotalSize -> MeterState -> MeterState -> IO ()

-- | Renders a meter to a String. Passed the total size when known,
-- the previous state, and the new state.
type RenderMeter = Maybe TotalSize -> MeterState -> MeterState -> String

-- | Make a meter. Pass the total size, if it's known.
--
-- The meter starts at zero bytes processed, timestamped with the
-- current time, and with an empty displayed string.
mkMeter :: Maybe TotalSize -> DisplayMeter -> IO Meter
mkMeter totalsize displaymeter = do
    ts <- getPOSIXTime
    Meter
        <$> newMVar totalsize
        <*> newMVar (MeterState zeroBytesProcessed ts)
        <*> newMVar ""
        <*> pure displaymeter

-- | Sets the total size of the meter, replacing any previous value.
setMeterTotalSize :: Meter -> TotalSize -> IO ()
setMeterTotalSize (Meter totalsizev _ _ _) = void . swapMVar totalsizev . Just

-- | Updates the meter, displaying it if necessary.
--
-- The new state is always stored. The meter is only displayed when the
-- number of bytes processed differs from the previous state.
updateMeter :: Meter -> MeterUpdate
updateMeter (Meter totalsizev sv bv displaymeter) new = do
    now <- getPOSIXTime
    let curms = MeterState new now
    oldms <- swapMVar sv curms
    when (meterBytesProcessed oldms /= new) $ do
        totalsize <- readMVar totalsizev
        displaymeter bv totalsize oldms curms

-- | Display meter to a Handle.
--
-- The rendered string is remembered in the MVar. When it is shorter than
-- the previously displayed one, it is padded with spaces so that the
-- leftover characters of the old display are overwritten.
displayMeterHandle :: Handle -> RenderMeter -> DisplayMeter
displayMeterHandle h rendermeter v msize old new = do
    let s = rendermeter msize old new
    olds <- swapMVar v s
    -- Avoid writing when the rendered meter has not changed.
    when (olds /= s) $ do
        -- replicate yields "" when the new string is the longer one.
        let padding = replicate (length olds - length s) ' '
        hPutStr h ('\r':s ++ padding)
        hFlush h

-- | Clear meter displayed by displayMeterHandle. May be called before
-- outputting something else, followed by more calls to displayMeterHandle.
--
-- The displayed string is overwritten with spaces, and the cursor is
-- returned to the start of the line. The remembered string is left as it
-- is.
clearMeterHandle :: Meter -> Handle -> IO ()
clearMeterHandle (Meter _ _ v _) h = do
    olds <- readMVar v
    hPutStr h $ '\r' : replicate (length olds) ' ' ++ "\r"
    hFlush h

-- | Display meter in the form:
--   10% 1.3MiB 300 KiB/s 16m40s
-- or when total size is not known:
--   1.3 MiB 300 KiB/s
--
-- The rate is computed from the bytes processed and the time elapsed
-- between the previous and the new state. The estimated time to
-- completion is only included when the total size is known and the rate
-- is above zero.
bandwidthMeter :: RenderMeter
bandwidthMeter mtotalsize (MeterState (BytesProcessed old) before) (MeterState (BytesProcessed new) now) =
    unwords $ catMaybes
        [ Just percentamount
        -- Pad enough for max width: "100% xxxx.xx KiB xxxx KiB/s"
        , Just $ replicate (29 - length percentamount - length rate) ' '
        , Just rate
        , estimatedcompletion
        ]
  where
    amount = roughSize' memoryUnits True 2 new
    percentamount = case mtotalsize of
        Just (TotalSize totalsize) ->
            -- Clamped, so the percentage is never computed from
            -- more than the total size.
            let p = showPercentage 0 $
                    percentage totalsize (min new totalsize)
            in p ++ replicate (6 - length p) ' ' ++ amount
        Nothing -> amount
    rate = roughSize' memoryUnits True 0 bytespersecond ++ "/s"
    bytespersecond
        -- No time has elapsed; avoid dividing by zero.
        | duration == 0 = fromIntegral transferred
        | otherwise = floor $ fromIntegral transferred / duration
    -- Both are clamped, so a meter or clock that goes backwards does
    -- not yield a negative rate.
    transferred = max 0 (new - old)
    duration = max 0 (now - before)
    estimatedcompletion = case mtotalsize of
        Just (TotalSize totalsize)
            | bytespersecond > 0 ->
                -- The remaining bytes are clamped at zero, matching
                -- the clamped percentage; otherwise processing more
                -- than the total size produced a negative Duration.
                Just $ fromDuration $ Duration $
                    max 0 (totalsize - new) `div` bytespersecond
        _ -> Nothing

-- BytesProcessed is serialized as its decimal byte count.
instance Proto.Serializable BytesProcessed where
    serialize (BytesProcessed n) = show n
    deserialize = BytesProcessed <$$> readish