1{- A pool of "git-annex transferrer" processes
2 -
3 - Copyright 2013-2021 Joey Hess <id@joeyh.name>
4 -
5 - Licensed under the GNU AGPL version 3 or higher.
6 -}
7
8{-# LANGUAGE OverloadedStrings #-}
9{-# LANGUAGE RankNTypes #-}
10{-# LANGUAGE CPP #-}
11
12module Annex.TransferrerPool where
13
14import Annex.Common
15import qualified Annex
16import Types.TransferrerPool
17import Types.Transferrer
18import Types.Transfer
19import qualified Types.Remote as Remote
20import Types.Messages
21import Types.CleanupActions
22import Messages.Serialized
23import Annex.Path
24import Annex.StallDetection
25import Utility.Batch
26import Utility.Metered
27import qualified Utility.SimpleProtocol as Proto
28
29import Control.Concurrent
30import Control.Concurrent.Async
31import Control.Concurrent.STM hiding (check)
32import Control.Monad.IO.Class (MonadIO)
33import qualified Data.Map as M
34#ifndef mingw32_HOST_OS
35import System.Posix.Signals
36import System.Posix.Process (getProcessGroupIDOf)
37#endif
38
{- Shared, mutable registry of signal actions. Each SignalAction key maps
 - to a function that is passed a signal number as an Int. -}
type SignalActionsVar
	= TVar (M.Map SignalAction (Int -> IO ()))
40
{- What is needed to start a transferrer process: the program to run,
 - its parameters, and a BatchCommandMaker that gets applied to both
 - before the process is created. -}
data RunTransferrer
	= RunTransferrer String [CommandParam] BatchCommandMaker
42
{- Builds a RunTransferrer from the result of programPath, the child
 - process parameters for the "transferrer" command, and the supplied
 - BatchCommandMaker. -}
mkRunTransferrer :: BatchCommandMaker -> Annex RunTransferrer
mkRunTransferrer batchmaker = do
	program <- liftIO programPath
	params <- gitAnnexChildProcessParams "transferrer" []
	return $ RunTransferrer program params batchmaker
48
{- Runs an action with a Transferrer taken from the pool held in the
 - Annex state.
 -
 - Transferrers are started with nonBatchCommandMaker, pool items get a
 - check that always succeeds, and withTransferrer' is called with
 - minimizeprocesses set to False. -}
withTransferrer :: (Transferrer -> Annex a) -> Annex a
withTransferrer a = do
	runner <- mkRunTransferrer nonBatchCommandMaker
	transferrers <- Annex.getRead Annex.transferrerpool
	sigactions <- Annex.getRead Annex.signalactions
	withTransferrer' False sigactions alwaysok runner transferrers a
  where
	alwaysok = pure (pure True)
57
{- Runs an action with a Transferrer popped from the given pool, starting
 - a new Transferrer when the pool did not provide one (or when the
 - pooled one fails its check).
 -
 - Once the action finishes, even by throwing, the Transferrer is either
 - returned to the pool or shut down. A Transferrer whose write handle
 - has been closed (performTransfer closes the handles after killing a
 - stalled transferrer) is never returned to the pool. It is instead shut
 - down in the background, so that the dead process is waited on and its
 - signal propagation action is unregistered rather than being leaked.
 -}
withTransferrer'
	:: (MonadIO m, MonadMask m)
	=> Bool
	-- ^ When minimizeprocesses is True, only one Transferrer is left
	-- running in the pool at a time. So if this needed to start a
	-- new Transferrer, it's stopped when done. Otherwise, idle
	-- processes are left in the pool for use later.
	-> SignalActionsVar
	-> MkCheckTransferrer
	-> RunTransferrer
	-> TransferrerPool
	-> (Transferrer -> m a)
	-> m a
withTransferrer' minimizeprocesses signalactionsvar mkcheck rt pool a = do
	(mi, leftinpool) <- liftIO $ atomically (popTransferrerPool pool)
	(i@(TransferrerPoolItem _ check), t) <- liftIO $ case mi of
		Nothing -> do
			t <- mkTransferrer signalactionsvar rt
			i <- mkTransferrerPoolItem mkcheck t
			return (i, t)
		Just i -> checkTransferrerPoolItem signalactionsvar rt i
	a t `finally` returntopool leftinpool check t i
  where
	returntopool leftinpool check t i
		| not minimizeprocesses || leftinpool == 0 = liftIO $
			-- If the transferrer got killed, the handles will
			-- be closed, so it should not be returned to the
			-- pool. It still needs to be shut down, which
			-- waits on the process and unregisters its
			-- signal propagation.
			ifM (hIsOpen (transferrerWrite t))
				( atomically $ pushTransferrerPool pool i
				, shutdowninbackground t
				)
		| otherwise = liftIO $ do
			shutdowninbackground t
			atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check

	-- Shutting down waits for the process to exit, so it is done
	-- in a separate thread to avoid blocking the caller.
	shutdowninbackground = void . forkIO . transferrerShutdown
91
{- Takes an item from the pool and yields a Transferrer to use, along
 - with the pool item that now goes with it.
 -
 - An item holding a Transferrer has its check run. When the check fails,
 - that Transferrer is shut down and a new one is started in its place.
 - An item holding no Transferrer always gets a new one started. -}
checkTransferrerPoolItem :: SignalActionsVar -> RunTransferrer -> TransferrerPoolItem -> IO (TransferrerPoolItem, Transferrer)
checkTransferrerPoolItem signalactionsvar rt = go
  where
	go item@(TransferrerPoolItem (Just t) check) = do
		ok <- check
		if ok
			then return (item, t)
			else do
				transferrerShutdown t
				replacement check
	go (TransferrerPoolItem Nothing check) = replacement check

	-- Starts a new Transferrer, keeping the same check.
	replacement check = do
		t <- mkTransferrer signalactionsvar rt
		return (TransferrerPoolItem (Just t) check, t)
107
{- Selects which family of requests sendRequest sends to a transferrer:
 - the plain upload/download requests, or the assistant ones. -}
data TransferRequestLevel
	= AnnexLevel
	| AssistantLevel
	deriving (Show)
110
{- Asks a Transferrer to perform a Transfer, and blocks until it is done.
 -
 - While waiting, stall detection runs in a separate thread; when it
 - detects a stall, the Transferrer is killed, and its handles are closed
 - afterwards so it cannot be reused.
 -
 - Returns Right () on success. If the transfer failed or stalled,
 - returns Left with the TransferInfo's bytesComplete updated to the
 - last amount of data the transferrer reported.
 -}
performTransfer
	:: (Monad m, MonadIO m, MonadMask m)
	=> Maybe StallDetection
	-> TransferRequestLevel
	-> (forall a. Annex a -> m a)
	-- ^ Run an annex action in the monad. Will not be used with
	-- actions that block for a long time.
	-> Maybe Remote
	-> Transfer
	-> TransferInfo
	-> Transferrer
	-> m (Either TransferInfo ())
performTransfer stalldetection level runannex r t info transferrer = do
	bpv <- liftIO $ newTVarIO zeroBytesProcessed
	ok <- catchBoolIO $ bracket setup cleanup (go bpv)
	if ok
		then return (Right ())
		else do
			n <- liftIO $ fromBytesProcessed <$> readTVarIO bpv
			return $ Left $ info { bytesComplete = Just n }
  where
	readh = transferrerRead transferrer
	writeh = transferrerWrite transferrer

	-- Sends the request, and starts the thread that watches for
	-- stalls.
	setup = liftIO $ do
		sendRequest level t r (associatedFile info) writeh
		metervar <- newTVarIO Nothing
		stalledvar <- newTVarIO False
		stallwatcher <- async $
			detectStalls stalldetection metervar $ do
				atomically $ writeTVar stalledvar True
				killTransferrer transferrer
		return (metervar, stallwatcher, stalledvar)

	cleanup (_, stallwatcher, stalledvar) = do
		liftIO $ uninterruptibleCancel stallwatcher
		whenM (liftIO $ readTVarIO stalledvar) $ do
			runannex $ showLongNote "Transfer stalled"
			-- The process was killed, so close its handles;
			-- that prevents the transferrer being reused.
			liftIO $ do
				hClose readh
				hClose writeh

	go bpv (metervar, _, _) = relaySerializedOutput
		(liftIO $ readResponse readh)
		(liftIO . sendSerializedOutputResponse writeh)
		(updatemeter bpv metervar)
		runannex

	-- The meter always gets the latest value, but the count of bytes
	-- processed is only changed when there is an actual value.
	updatemeter bpv metervar mn = liftIO $ do
		atomically $ writeTVar metervar mn
		case mn of
			Just n -> atomically $ writeTVar bpv n
			Nothing -> return ()
173
{- Starts a new git-annex transferrer process, setting up handles
 - that will be used to communicate with it.
 -
 - The returned Transferrer's transferrerShutdown closes both handles,
 - waits for the process to exit, and unregisters the signal propagation
 - that was set up for it. Each of those steps is run even when an
 - earlier one throws an exception, so a failure to close a handle
 - does not leave the process unwaited or the signal action registered.
 -}
mkTransferrer :: SignalActionsVar -> RunTransferrer -> IO Transferrer
mkTransferrer signalactonsvar (RunTransferrer program params batchmaker) = do
	{- It runs as a batch job. -}
	let (program', params') = batchmaker (program, params)
	{- It's put into its own group so that the whole group can be
	 - killed to stop a transfer.
	 -
	 - Pipes are requested for both stdin and stdout, which is why
	 - both handles are matched with Just. -}
	(Just writeh, Just readh, _, ph) <- createProcess
		(proc program' $ toCommand params')
		{ create_group = True
		, std_in = CreatePipe
		, std_out = CreatePipe
		}

	{- Set up signal propagation, so eg ctrl-c will also interrupt
	 - the processes in the transferrer's process group.
	 -
	 - There is a race between the process being created and this point.
	 - If a signal is received before this can run, it is not sent to
	 - the transferrer. This leaves the transferrer waiting for the
	 - first message on stdin to tell what to do. If the signal kills
	 - this parent process, the transferrer will then get a sigpipe
	 - and die too. If the signal suspends this parent process,
	 - it's ok to leave the transferrer running, as it's waiting on
	 - the pipe until this process wakes back up.
	 -}
#ifndef mingw32_HOST_OS
	pid <- getPid ph
	unregistersignalprop <- case pid of
		Just p -> getProcessGroupIDOf p >>= \pgrp -> do
			atomically $ modifyTVar' signalactonsvar $
				M.insert (PropagateSignalProcessGroup p) $ \sig ->
					signalProcessGroup (fromIntegral sig) pgrp
			return $ atomically $ modifyTVar' signalactonsvar $
				M.delete (PropagateSignalProcessGroup p)
		Nothing -> return noop
#else
	let unregistersignalprop = noop
#endif

	let closehandles = hClose readh `finally` hClose writeh
	let reap = void $ waitForProcess ph

	return $ Transferrer
		{ transferrerRead = readh
		, transferrerWrite = writeh
		, transferrerHandle = ph
		-- Chained with finally, so each later step still runs
		-- when an earlier one throws; the exception is rethrown
		-- after the remaining steps have run.
		, transferrerShutdown = closehandles
			`finally` reap
			`finally` unregistersignalprop
		}
225
-- | Send a request to perform a transfer.
--
-- The remote is identified by name when a Remote is provided, and
-- otherwise by the UUID of the Transfer. The kind of request depends on
-- the TransferRequestLevel and the direction of the Transfer. The
-- request is written to the handle as a single line, and flushed.
sendRequest :: TransferRequestLevel -> Transfer -> Maybe Remote -> AssociatedFile -> Handle -> IO ()
sendRequest level t mremote afile h = do
	debug "Annex.TransferrerPool" ("> " ++ line)
	hPutStrLn h line
	hFlush h
  where
	line = unwords $ Proto.formatMessage request
	request = mkrequest remote (transferKey t) (TransferAssociatedFile afile)
	remote = case mremote of
		Just rmt -> TransferRemoteName (Remote.name rmt)
		Nothing -> TransferRemoteUUID (transferUUID t)
	mkrequest = case (level, transferDirection t) of
		(AnnexLevel, Upload) -> UploadRequest
		(AnnexLevel, Download) -> DownloadRequest
		(AssistantLevel, Upload) -> AssistantUploadRequest
		(AssistantLevel, Download) -> AssistantDownloadRequest
243
-- | Send a SerializedOutputResponse, wrapped in a
-- TransferSerializedOutputResponse message, as a single line written
-- to the handle, which is then flushed.
sendSerializedOutputResponse :: Handle -> SerializedOutputResponse -> IO ()
sendSerializedOutputResponse h sor = do
	debug "Annex.TransferrerPool" ("> " ++ show line)
	hPutStrLn h line
	hFlush h
  where
	line = unwords (Proto.formatMessage (TransferSerializedOutputResponse sor))
251
-- | Read one line of response to a transfer request.
--
-- Returns Left with SerializedOutput to display while the transfer is
-- still being performed, and Right with the result once the final
-- response arrives. A line that cannot be parsed is a protocol error.
readResponse :: Handle -> IO (Either SerializedOutput Bool)
readResponse h = do
	l <- hGetLine h
	debug "Annex.TransferrerPool" ("< " ++ l)
	maybe (transferrerProtocolError l) (return . interpret) (Proto.parseMessage l)
  where
	interpret (TransferOutput so) = Left so
	interpret (TransferResult r) = Right r
264
{- Gives up with a message that shows the line that was received from
 - the transferrer. -}
transferrerProtocolError :: String -> a
transferrerProtocolError l = giveup msg
  where
	msg = "transferrer protocol error: " ++ show l
267
{- Kill the transferrer, and all its child processes.
 -
 - The transferrer's process group is interrupted first, and after a
 - short grace period the process itself is terminated. -}
killTransferrer :: Transferrer -> IO ()
killTransferrer t = do
	interruptProcessGroupOf ph
	threadDelay graceperiod
	terminateProcess ph
  where
	ph = transferrerHandle t
	-- 0.05 second, in microseconds
	graceperiod = 50000
274
{- Empties the pool, and shuts down every Transferrer that was in it.
 - Pool items that hold no Transferrer are simply discarded. -}
emptyTransferrerPool :: Annex ()
emptyTransferrerPool = do
	poolvar <- Annex.getRead Annex.transferrerpool
	liftIO $ do
		items <- atomically $ swapTVar poolvar []
		mapM_ shutdownitem items
  where
	shutdownitem (TransferrerPoolItem mt _) = maybe noop transferrerShutdown mt
283