{- A pool of "git-annex transferrer" processes
 -
 - Copyright 2013-2021 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE CPP #-}

module Annex.TransferrerPool where

import Annex.Common
import qualified Annex
import Types.TransferrerPool
import Types.Transferrer
import Types.Transfer
import qualified Types.Remote as Remote
import Types.Messages
import Types.CleanupActions
import Messages.Serialized
import Annex.Path
import Annex.StallDetection
import Utility.Batch
import Utility.Metered
import qualified Utility.SimpleProtocol as Proto

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM hiding (check)
import Control.Monad.IO.Class (MonadIO)
import qualified Data.Map as M
#ifndef mingw32_HOST_OS
import System.Posix.Signals
import System.Posix.Process (getProcessGroupIDOf)
#endif

{- Shared map of actions to run when a signal is received, keyed by
 - the SignalAction that registered them. Each action is passed the
 - signal number. -}
type SignalActionsVar = TVar (M.Map SignalAction (Int -> IO ()))

{- What is needed to start a transferrer process: the program to run,
 - its parameters, and the BatchCommandMaker applied to both before
 - the process is created. -}
data RunTransferrer = RunTransferrer String [CommandParam] BatchCommandMaker

{- Builds a RunTransferrer that runs this program's own "transferrer"
 - child command, using the provided BatchCommandMaker. -}
mkRunTransferrer :: BatchCommandMaker -> Annex RunTransferrer
mkRunTransferrer batchmaker = RunTransferrer
  <$> liftIO programPath
  <*> gitAnnexChildProcessParams "transferrer" []
  <*> pure batchmaker

{- Runs an action with a Transferrer from the pool. -}
withTransferrer :: (Transferrer -> Annex a) -> Annex a
withTransferrer a = do
  rt <- mkRunTransferrer nonBatchCommandMaker
  pool <- Annex.getRead Annex.transferrerpool
  -- A check that always considers a pooled Transferrer usable.
  let nocheck = pure (pure True)
  signalactonsvar <- Annex.getRead Annex.signalactions
  withTransferrer' False signalactonsvar nocheck rt pool a

{- Takes a Transferrer from the pool (starting a new one when the pool
 - has none that is usable), runs the action with it, and afterwards
 - either returns it to the pool or shuts it down. The cleanup runs
 - even when the action throws. -}
withTransferrer'
  :: (MonadIO m, MonadMask m)
  => Bool
  -- ^ When minimizeprocesses is True, only one Transferrer is left
  -- running in the pool at a time. So if this needed to start a
  -- new Transferrer, it's stopped when done. Otherwise, idle
  -- processes are left in the pool for use later.
  -> SignalActionsVar
  -> MkCheckTransferrer
  -> RunTransferrer
  -> TransferrerPool
  -> (Transferrer -> m a)
  -> m a
withTransferrer' minimizeprocesses signalactonsvar mkcheck rt pool a = do
  (mi, leftinpool) <- liftIO $ atomically (popTransferrerPool pool)
  (i@(TransferrerPoolItem _ check), t) <- liftIO $ case mi of
    Nothing -> do
      t <- mkTransferrer signalactonsvar rt
      i <- mkTransferrerPoolItem mkcheck t
      return (i, t)
    Just i -> checkTransferrerPoolItem signalactonsvar rt i
  a t `finally` returntopool leftinpool check t i
  where
    returntopool leftinpool check t i
      | not minimizeprocesses || leftinpool == 0 =
          -- If the transferrer got killed, the handles will
          -- be closed, so it should not be returned to the
          -- pool.
          liftIO $ whenM (hIsOpen (transferrerWrite t)) $
            atomically $ pushTransferrerPool pool i
      | otherwise = liftIO $ do
          -- Shut the process down in the background, and leave
          -- a placeholder item holding only the check in the pool.
          void $ forkIO $ transferrerShutdown t
          atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check

{- Check if a Transferrer from the pool is still ok to be used.
 - If not, stop it and start a new one. -}
checkTransferrerPoolItem :: SignalActionsVar -> RunTransferrer -> TransferrerPoolItem -> IO (TransferrerPoolItem, Transferrer)
checkTransferrerPoolItem signalactonsvar rt i = case i of
  TransferrerPoolItem (Just t) check -> ifM check
    ( return (i, t)
    , do
        transferrerShutdown t
        new check
    )
  -- A placeholder item has no process, so one is always started.
  TransferrerPoolItem Nothing check -> new check
  where
    new check = do
      t <- mkTransferrer signalactonsvar rt
      return (TransferrerPoolItem (Just t) check, t)

{- Selects which family of requests sendRequest sends to the
 - transferrer. -}
data TransferRequestLevel = AnnexLevel | AssistantLevel
  deriving (Show)

{- Requests that a Transferrer perform a Transfer, and waits for it to
 - finish.
 -
 - When a stall is detected, kills the Transferrer.
 -
 - If the transfer failed or stalled, returns TransferInfo with an
 - updated bytesComplete reflecting how much data has been transferred.
 -}
performTransfer
  :: (Monad m, MonadIO m, MonadMask m)
  => Maybe StallDetection
  -> TransferRequestLevel
  -> (forall a. Annex a -> m a)
  -- ^ Run an annex action in the monad. Will not be used with
  -- actions that block for a long time.
  -> Maybe Remote
  -> Transfer
  -> TransferInfo
  -> Transferrer
  -> m (Either TransferInfo ())
performTransfer stalldetection level runannex r t info transferrer = do
  bpv <- liftIO $ newTVarIO zeroBytesProcessed
  ifM (catchBoolIO $ bracket setup cleanup (go bpv))
    ( return (Right ())
    , do
        n <- liftIO $ atomically $
          fromBytesProcessed <$> readTVar bpv
        return $ Left $ info { bytesComplete = Just n }
    )
  where
    -- Sends the request, then starts a thread that watches the
    -- meter and kills the transferrer when a stall is detected.
    setup = do
      liftIO $ sendRequest level t r
        (associatedFile info)
        (transferrerWrite transferrer)
      metervar <- liftIO $ newTVarIO Nothing
      stalledvar <- liftIO $ newTVarIO False
      tid <- liftIO $ async $
        detectStalls stalldetection metervar $ do
          atomically $ writeTVar stalledvar True
          killTransferrer transferrer
      return (metervar, tid, stalledvar)

    cleanup (_, tid, stalledvar) = do
      liftIO $ uninterruptibleCancel tid
      whenM (liftIO $ atomically $ readTVar stalledvar) $ do
        runannex $ showLongNote "Transfer stalled"
        -- Close handles, to prevent the transferrer being
        -- reused since the process was killed.
        liftIO $ hClose $ transferrerRead transferrer
        liftIO $ hClose $ transferrerWrite transferrer

    -- Relays the transferrer's output until it sends its result.
    go bpv (metervar, _, _) = relaySerializedOutput
      (liftIO $ readResponse (transferrerRead transferrer))
      (liftIO . sendSerializedOutputResponse (transferrerWrite transferrer))
      (updatemeter bpv metervar)
      runannex

    -- Both variables are written in a single transaction, so they
    -- are never observed out of step with each other.
    updatemeter bpv metervar (Just n) = liftIO $ atomically $ do
      writeTVar metervar (Just n)
      writeTVar bpv n
    updatemeter _bpv metervar Nothing = liftIO $
      atomically $ writeTVar metervar Nothing

{- Starts a new git-annex transfer process, setting up handles
 - that will be used to communicate with it. -}
mkTransferrer :: SignalActionsVar -> RunTransferrer -> IO Transferrer
mkTransferrer signalactonsvar (RunTransferrer program params batchmaker) = do
  {- It runs as a batch job. -}
  let (program', params') = batchmaker (program, params)
  {- It's put into its own group so that the whole group can be
   - killed to stop a transfer. -}
  (Just writeh, Just readh, _, ph) <- createProcess
    (proc program' $ toCommand params')
      { create_group = True
      , std_in = CreatePipe
      , std_out = CreatePipe
      }

  {- Set up signal propagation, so eg ctrl-c will also interrupt
   - the processes in the transferrer's process group.
   -
   - There is a race between the process being created and this point.
   - If a signal is received before this can run, it is not sent to
   - the transferrer. This leaves the transferrer waiting for the
   - first message on stdin to tell what to do. If the signal kills
   - this parent process, the transferrer will then get a sigpipe
   - and die too. If the signal suspends this parent process,
   - it's ok to leave the transferrer running, as it's waiting on
   - the pipe until this process wakes back up.
   -}
#ifndef mingw32_HOST_OS
  pid <- getPid ph
  unregistersignalprop <- case pid of
    Just p -> getProcessGroupIDOf p >>= \pgrp -> do
      atomically $ modifyTVar' signalactonsvar $
        M.insert (PropagateSignalProcessGroup p) $ \sig ->
          signalProcessGroup (fromIntegral sig) pgrp
      return $ atomically $ modifyTVar' signalactonsvar $
        M.delete (PropagateSignalProcessGroup p)
    Nothing -> return noop
#else
  let unregistersignalprop = noop
#endif

  let closeandwait = do
        hClose readh
        hClose writeh
        void $ waitForProcess ph

  return $ Transferrer
    { transferrerRead = readh
    , transferrerWrite = writeh
    , transferrerHandle = ph
    -- The signal propagation is unregistered even when closing
    -- the handles or waiting on the process throws, so no stale
    -- entry is left in the signal actions map.
    , transferrerShutdown = closeandwait `finally` unregistersignalprop
    }

-- | Send a request to perform a transfer.
--
-- The remote is identified by name when one is provided, and otherwise
-- by the UUID of the transfer.
sendRequest :: TransferRequestLevel -> Transfer -> Maybe Remote -> AssociatedFile -> Handle -> IO ()
sendRequest level t mremote afile h = do
  let tr = maybe
        (TransferRemoteUUID (transferUUID t))
        (TransferRemoteName . Remote.name)
        mremote
  let f = case (level, transferDirection t) of
        (AnnexLevel, Upload) -> UploadRequest
        (AnnexLevel, Download) -> DownloadRequest
        (AssistantLevel, Upload) -> AssistantUploadRequest
        (AssistantLevel, Download) -> AssistantDownloadRequest
  let r = f tr (transferKey t) (TransferAssociatedFile afile)
  let l = unwords $ Proto.formatMessage r
  debug "Annex.TransferrerPool" ("> " ++ l)
  hPutStrLn h l
  hFlush h

-- | Send a response to serialized output that was received from the
-- transferrer.
sendSerializedOutputResponse :: Handle -> SerializedOutputResponse -> IO ()
sendSerializedOutputResponse h sor = do
  let l = unwords $ Proto.formatMessage $
        TransferSerializedOutputResponse sor
  debug "Annex.TransferrerPool" ("> " ++ show l)
  hPutStrLn h l
  hFlush h

-- | Read a response to a transfer request.
--
-- Before the final response, this will return whatever SerializedOutput
-- should be displayed as the transfer is performed.
readResponse :: Handle -> IO (Either SerializedOutput Bool)
readResponse h = do
  l <- hGetLine h
  debug "Annex.TransferrerPool" ("< " ++ l)
  case Proto.parseMessage l of
    Just (TransferOutput so) -> return (Left so)
    Just (TransferResult r) -> return (Right r)
    Nothing -> transferrerProtocolError l

{- Gives up with a message that includes the line that could not be
 - parsed. -}
transferrerProtocolError :: String -> a
transferrerProtocolError l = giveup $ "transferrer protocol error: " ++ show l

{- Kill the transferrer, and all its child processes. -}
killTransferrer :: Transferrer -> IO ()
killTransferrer t = do
  interruptProcessGroupOf $ transferrerHandle t
  threadDelay 50000 -- 0.05 second grace period
  terminateProcess $ transferrerHandle t

{- Stop all transferrers in the pool. -}
emptyTransferrerPool :: Annex ()
emptyTransferrerPool = do
  poolvar <- Annex.getRead Annex.transferrerpool
  -- Empty the pool first, then shut down whatever it contained.
  pool <- liftIO $ atomically $ swapTVar poolvar []
  liftIO $ forM_ pool $ \case
    TransferrerPoolItem (Just t) _ -> transferrerShutdown t
    TransferrerPoolItem Nothing _ -> noop