{- git-annex concurrent state
 -
 - Copyright 2015-2021 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}

module Annex.Concurrent (
    module Annex.Concurrent,
    module Annex.Concurrent.Utility
) where

import Annex
import Annex.Common
import Annex.Concurrent.Utility
import qualified Annex.Queue
import Annex.Action
import Types.Concurrency
import Types.CatFileHandles
import Annex.CheckAttr
import Annex.CheckIgnore

import qualified Data.Map as M

{- Applies a concurrency setting to the Annex state.
 -
 - The constructor the setting arrived in is handed on to setConcurrency',
 - so the stored setting is wrapped in that same constructor.
 -}
setConcurrency :: ConcurrencySetting -> Annex ()
setConcurrency setting = case setting of
    ConcurrencyCmdLine level -> setConcurrency' level ConcurrencyCmdLine
    ConcurrencyGitConfig level -> setConcurrency' level ConcurrencyGitConfig

{- Stores the concurrency level, wrapped by the given constructor.
 -
 - For NonConcurrent only the setting itself is written. For any other
 - level the cat-file handles are replaced by a pool (unless they already
 - are one), and new check-attr and check-ignore handles made for that
 - level are stored too.
 -}
setConcurrency' :: Concurrency -> (Concurrency -> ConcurrencySetting) -> Annex ()
setConcurrency' NonConcurrent wrap =
    Annex.changeState $ \st -> st { Annex.concurrency = wrap NonConcurrent }
setConcurrency' level wrap = do
    handles <- pooled =<< getState Annex.catfilehandles
    attrHandle <- mkConcurrentCheckAttrHandle level
    ignoreHandle <- mkConcurrentCheckIgnoreHandle level
    Annex.changeState $ \st -> st
        { Annex.concurrency = wrap level
        , Annex.catfilehandles = handles
        , Annex.checkattrhandle = Just attrHandle
        , Annex.checkignorehandle = Just ignoreHandle
        }
  where
    -- An existing pool is kept as it is; non-concurrent handles are
    -- swapped for a newly created pool.
    pooled (CatFileHandlesNonConcurrent _) = liftIO catFileHandlesPool
    pooled existing@(CatFileHandlesPool _) = pure existing

{- Prepares an Annex action to be run in a forked thread, using a
 - copy of the current AnnexState.
 -
 - The result is an IO action; running it is what actually runs the
 - Annex action against the copied state. That IO action in turn yields
 - an Annex action, which has to be run back in the original calling
 - context: it merges the forked AnnexState into the current AnnexState
 - and returns the forked action's result.
 -}
forkState :: Annex a -> Annex (IO (Annex a))
forkState action = do
    rd <- Annex.getRead id
    forked <- dupState
    return $ do
        (result, (forked', _)) <- run (forked, rd) action
        return $ mergeState forked' >> return result

{- Makes a copy of the current AnnexState that is suitable for use
 - in a forked thread.
 -
 - Once an Annex action has been run with the copy, the resulting
 - state should be passed to mergeState to fold it back into the
 - current Annex's state.
 -}
dupState :: Annex AnnexState
dupState = forThread <$> (withPools =<< Annex.getState id)
  where
    -- Concurrency gets enabled here if it was not already, so that the
    -- concurrency-safe resource pools are set up; the state is then
    -- read again to pick up that change.
    withPools st = case getConcurrency' (Annex.concurrency st) of
        NonConcurrent -> do
            setConcurrency (ConcurrencyCmdLine (Concurrent 1))
            Annex.getState id
        _ -> return st
    forThread st = st
        -- the repoqueue is per-thread, so the copy starts without one
        { Annex.repoqueue = Nothing
        -- the new thread has not had any errors yet
        , Annex.errcounter = 0
        }

{- Folds the passed AnnexState into the current Annex state.
 -
 - stopNonConcurrentSafeCoProcesses is first run against the passed
 - state. Then its cleanup actions are re-registered in the current
 - state, its queue is merged in, and its error count is added to the
 - current one.
 -}
mergeState :: AnnexState -> Annex ()
mergeState forked = do
    rd <- Annex.getRead id
    outcome <- liftIO $ run (forked, rd) stopNonConcurrentSafeCoProcesses
    -- run yields (result, (state, read data)); only the state is wanted
    let stopped = fst (snd outcome)
    mapM_ (uncurry addCleanupAction) (M.toList (Annex.cleanupactions stopped))
    Annex.Queue.mergeFrom stopped
    changeState $ \cur -> cur
        { errcounter = errcounter cur + errcounter stopped }