1{- git-annex concurrent state
2 -
3 - Copyright 2015-2021 Joey Hess <id@joeyh.name>
4 -
5 - Licensed under the GNU AGPL version 3 or higher.
6 -}
7
8module Annex.Concurrent (
9	module Annex.Concurrent,
10	module Annex.Concurrent.Utility
11) where
12
13import Annex
14import Annex.Common
15import Annex.Concurrent.Utility
16import qualified Annex.Queue
17import Annex.Action
18import Types.Concurrency
19import Types.CatFileHandles
20import Annex.CheckAttr
21import Annex.CheckIgnore
22
23import qualified Data.Map as M
24
{- Applies a ConcurrencySetting to the current Annex state.
 -
 - The setting is unwrapped, and setConcurrency' is handed both the
 - Concurrency value and the constructor it was wrapped in, so the
 - same kind of setting (command line or git config) gets stored.
 -}
setConcurrency :: ConcurrencySetting -> Annex ()
setConcurrency setting = case setting of
	ConcurrencyCmdLine c -> setConcurrency' c ConcurrencyCmdLine
	ConcurrencyGitConfig c -> setConcurrency' c ConcurrencyGitConfig
28
{- Worker for setConcurrency. The second parameter wraps a Concurrency
 - value back up into the ConcurrencySetting that gets stored.
 -
 - For NonConcurrent, only the concurrency field of the state is changed.
 -
 - For any other value, the state also gets:
 -  * cat-file handles from catFileHandlesPool, unless the state
 -    already holds a CatFileHandlesPool, which is then kept as-is
 -  * a handle made by mkConcurrentCheckAttrHandle
 -  * a handle made by mkConcurrentCheckIgnoreHandle
 -}
setConcurrency' :: Concurrency -> (Concurrency -> ConcurrencySetting) -> Annex ()
setConcurrency' NonConcurrent mksetting =
	Annex.changeState $ \st -> st { Annex.concurrency = mksetting NonConcurrent }
setConcurrency' conc mksetting = do
	handles <- getState Annex.catfilehandles >>= \existing -> case existing of
		CatFileHandlesNonConcurrent _ -> liftIO catFileHandlesPool
		CatFileHandlesPool _ -> pure existing
	attrhandle <- mkConcurrentCheckAttrHandle conc
	ignorehandle <- mkConcurrentCheckIgnoreHandle conc
	Annex.changeState $ \st -> st
		{ Annex.concurrency = mksetting conc
		, Annex.catfilehandles = handles
		, Annex.checkattrhandle = Just attrhandle
		, Annex.checkignorehandle = Just ignorehandle
		}
47
{- Prepares an Annex action to be run in a separate thread, using a
 - copy of the current AnnexState (obtained with dupState).
 -
 - Running forkState itself does not run the action. It yields an IO
 - action, which can be used to start the thread and which runs the
 - Annex action against the copied state.
 -
 - That IO action in turn yields an Annex action, which must be run
 - back in the original calling context: it merges the forked
 - AnnexState into the current one with mergeState, and then returns
 - the result of the forked action.
 -}
forkState :: Annex a -> Annex (IO (Annex a))
forkState action = do
	annexread <- Annex.getRead id
	forkedstate <- dupState
	return (runforked forkedstate annexread)
  where
	runforked st rd = do
		(result, (finalstate, _)) <- run (st, rd) action
		return (mergeState finalstate >> return result)
65
{- Produces a copy of the current AnnexState that is suitable for
 - handing to a forked off thread.
 -
 - If the current state is NonConcurrent, this first switches it to
 - ConcurrencyCmdLine (Concurrent 1) using setConcurrency, and copies
 - the state as it is after that change.
 -
 - Once an Annex action has been run using the copy, the resulting
 - state should be merged back into the current Annex's state, by
 - calling mergeState.
 -}
dupState :: Annex AnnexState
dupState = do
	current <- Annex.getState id
	ready <- concurrentstate current
	return (forthread ready)
  where
	-- Make sure that concurrency is enabled, if it was not already,
	-- so the concurrency-safe resource pools are set up.
	concurrentstate st = case getConcurrency' (Annex.concurrency st) of
		NonConcurrent ->
			setConcurrency (ConcurrencyCmdLine (Concurrent 1))
				>> Annex.getState id
		_ -> return st

	-- The copy starts with no repoqueue, since each thread has its
	-- own, and with an error count of 0, since the thread has not
	-- had any errors yet.
	forthread st = st
		{ Annex.repoqueue = Nothing
		, Annex.errcounter = 0
		}
89
{- Merges the passed AnnexState into the current Annex state.
 -
 - First, stopNonConcurrentSafeCoProcesses is run in the passed state,
 - which closes various handles in it. From the state that results:
 -  * each of its cleanup actions is added to the current state with
 -    addCleanupAction
 -  * it is passed to Annex.Queue.mergeFrom
 -  * its errcounter is added to the current state's errcounter
 -}
mergeState :: AnnexState -> Annex ()
mergeState forked = do
	annexread <- Annex.getRead id
	stopped <- fst . snd
		<$> liftIO (run (forked, annexread) stopNonConcurrentSafeCoProcesses)
	mapM_ (uncurry addCleanupAction) (M.toList (Annex.cleanupactions stopped))
	Annex.Queue.mergeFrom stopped
	changeState $ \cur -> cur
		{ errcounter = errcounter cur + errcounter stopped
		}
101