1{- P2P protocol, Annex implementation
2 -
3 - Copyright 2016-2021 Joey Hess <id@joeyh.name>
4 -
5 - Licensed under the GNU AGPL version 3 or higher.
6 -}
7
8{-# LANGUAGE RankNTypes, FlexibleContexts #-}
9
10module P2P.Annex
11	( RunState(..)
12	, mkRunState
13	, P2PConnection(..)
14	, runFullProto
15	) where
16
17import Annex.Common
18import Annex.Content
19import Annex.Transfer
20import Annex.ChangedRefs
21import P2P.Protocol
22import P2P.IO
23import Logs.Location
24import Types.NumCopies
25import Utility.Metered
26import Annex.Verify
27
28import Control.Monad.Free
29import Control.Concurrent.STM
30import qualified Data.ByteString as S
31
32-- Full interpreter for Proto, that can receive and send objects.
33runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either ProtoFailure a)
34runFullProto runst conn = go
35  where
36	go :: RunProto Annex
37	go (Pure v) = return (Right v)
38	go (Free (Net n)) = runNet runst conn go n
39	go (Free (Local l)) = runLocal runst go l
40
41runLocal :: RunState -> RunProto Annex -> LocalF (Proto a) -> Annex (Either ProtoFailure a)
42runLocal runst runner a = case a of
43	TmpContentSize k next -> do
44		tmp <- fromRepo $ gitAnnexTmpObjectLocation k
45		size <- liftIO $ catchDefaultIO 0 $ getFileSize tmp
46		runner (next (Len size))
47	FileSize f next -> do
48		size <- liftIO $ catchDefaultIO 0 $ getFileSize (toRawFilePath f)
49		runner (next (Len size))
50	ContentSize k next -> do
51		let getsize = liftIO . catchMaybeIO . getFileSize
52		size <- inAnnex' isJust Nothing getsize k
53		runner (next (Len <$> size))
54	ReadContent k af o sender next -> do
55		let proceed c = do
56			r <- tryNonAsync c
57			case r of
58				Left e -> return $ Left $ ProtoFailureException e
59				Right (Left e) -> return $ Left e
60				Right (Right ok) -> runner (next ok)
61		-- If the content is not present, or the transfer doesn't
62		-- run for any other reason, the sender action still must
63		-- be run, so is given empty and Invalid data.
64		let fallback = runner (sender mempty (return Invalid))
65		v <- tryNonAsync $ prepSendAnnex k
66		case v of
67			Right (Just (f, checkchanged)) -> proceed $ do
68				-- alwaysUpload to allow multiple uploads of the same key.
69				let runtransfer ti = transfer alwaysUpload k af Nothing $ \p ->
70					sinkfile f o checkchanged sender p ti
71				checktransfer runtransfer fallback
72 			Right Nothing -> proceed fallback
73			Left e -> return $ Left $ ProtoFailureException e
74	StoreContent k af o l getb validitycheck next -> do
75		-- This is the same as the retrievalSecurityPolicy of
76		-- Remote.P2P and Remote.Git.
77		let rsp = RetrievalAllKeysSecure
78		v <- tryNonAsync $ do
79			iv <- startVerifyKeyContentIncrementally DefaultVerify k
80			let runtransfer ti =
81				Right <$> transfer download' k af Nothing (\p ->
82					logStatusAfter k $ getViaTmp rsp DefaultVerify k af $ \tmp ->
83						storefile (fromRawFilePath tmp) o l getb iv validitycheck p ti)
84			let fallback = return $ Left $
85				ProtoFailureMessage "transfer already in progress, or unable to take transfer lock"
86			checktransfer runtransfer fallback
87		case v of
88			Left e -> return $ Left $ ProtoFailureException e
89			Right (Left e) -> return $ Left e
90			Right (Right ok) -> runner (next ok)
91	StoreContentTo dest iv o l getb validitycheck next -> do
92		v <- tryNonAsync $ do
93			let runtransfer ti = Right
94				<$> storefile dest o l getb iv validitycheck nullMeterUpdate ti
95			let fallback = return $ Left $
96				ProtoFailureMessage "Transfer failed"
97			checktransfer runtransfer fallback
98		case v of
99			Left e -> return $ Left $ ProtoFailureException e
100			Right (Left e) -> return $ Left e
101			Right (Right ok) -> runner (next ok)
102	SetPresent k u next -> do
103		v <- tryNonAsync $ logChange k u InfoPresent
104		case v of
105			Left e -> return $ Left $ ProtoFailureException e
106			Right () -> runner next
107	CheckContentPresent k next -> do
108		v <- tryNonAsync $ inAnnex k
109		case v of
110			Left e -> return $ Left $ ProtoFailureException e
111			Right result -> runner (next result)
112	RemoveContent k next -> do
113		let cleanup = do
114			logStatus k InfoMissing
115			return True
116		v <- tryNonAsync $
117			ifM (Annex.Content.inAnnex k)
118				( lockContentForRemoval k cleanup $ \contentlock -> do
119					removeAnnex contentlock
120					cleanup
121				, return True
122				)
123		case v of
124			Left e -> return $ Left $ ProtoFailureException e
125			Right result -> runner (next result)
126	TryLockContent k protoaction next -> do
127		v <- tryNonAsync $ lockContentShared k $ \verifiedcopy ->
128			case verifiedcopy of
129				LockedCopy _ -> runner (protoaction True)
130				_ -> runner (protoaction False)
131		-- If locking fails, lockContentShared throws an exception.
132		-- Let the peer know it failed.
133		case v of
134			Left _ -> runner $ do
135				protoaction False
136				next
137			Right _ -> runner next
138	WaitRefChange next -> case runst of
139		Serving _ (Just h) _ -> do
140			v <- tryNonAsync $ liftIO $ waitChangedRefs h
141			case v of
142				Left e -> return $ Left $ ProtoFailureException e
143				Right changedrefs -> runner (next changedrefs)
144		_ -> return $ Left $
145			ProtoFailureMessage "change notification not available"
146	UpdateMeterTotalSize m sz next -> do
147		liftIO $ setMeterTotalSize m sz
148		runner next
149	RunValidityCheck checkaction next -> runner . next =<< checkaction
150  where
151	transfer mk k af sd ta = case runst of
152		-- Update transfer logs when serving.
153		-- Using noRetry because we're the sender.
154		Serving theiruuid _ _ ->
155			mk theiruuid k af sd noRetry ta noNotification
156		-- Transfer logs are updated higher in the stack when
157		-- a client.
158		Client _ -> ta nullMeterUpdate
159
160	resumefromoffset o incrementalverifier p h
161		| o /= 0 = do
162			p' <- case incrementalverifier of
163				Just iv -> do
164					go iv o
165					return p
166				_ -> return $ offsetMeterUpdate p (toBytesProcessed o)
167			-- Make sure the handle is seeked to the offset.
168			-- (Reading the file probably left it there
169			-- when that was done, but let's be sure.)
170			hSeek h AbsoluteSeek o
171			return p'
172		| otherwise = return p
173	  where
174		go iv n
175			| n == 0 = return ()
176			| otherwise = do
177				let c = if n > fromIntegral defaultChunkSize
178					then defaultChunkSize
179					else fromIntegral n
180				b <- S.hGet h c
181				updateIncremental iv b
182				unless (b == S.empty) $
183					go iv (n - fromIntegral (S.length b))
184
185	storefile dest (Offset o) (Len l) getb incrementalverifier validitycheck p ti = do
186		v <- runner getb
187		case v of
188			Right b -> do
189				liftIO $ withBinaryFile dest ReadWriteMode $ \h -> do
190					p' <- resumefromoffset o incrementalverifier p h
191					let writechunk = case incrementalverifier of
192						Nothing -> \c -> S.hPut h c
193						Just iv -> \c -> do
194							S.hPut h c
195							updateIncremental iv c
196					meteredWrite p' writechunk b
197				indicatetransferred ti
198
199				rightsize <- do
200					sz <- liftIO $ getFileSize (toRawFilePath dest)
201					return (toInteger sz == l + o)
202
203				runner validitycheck >>= \case
204					Right (Just Valid) -> case incrementalverifier of
205						Just iv
206							| rightsize -> liftIO (finalizeIncremental iv) >>= \case
207								Nothing -> return (True, UnVerified)
208								Just True -> return (True, Verified)
209								Just False -> return (False, UnVerified)
210							| otherwise -> return (False, UnVerified)
211						Nothing -> return (rightsize, UnVerified)
212					Right (Just Invalid) | l == 0 ->
213						-- Special case, for when
214						-- content was not
215						-- available to send,
216						-- which is indicated by
217						-- sending 0 bytes and
218						-- Invalid.
219						return (False, UnVerified)
220					_ -> do
221						-- Invalid, or old protocol
222						-- version. Validity is not
223						-- known. Force content
224						-- verification.
225						return (rightsize, MustVerify)
226			Left e -> error $ describeProtoFailure e
227
228	sinkfile f (Offset o) checkchanged sender p ti = bracket setup cleanup go
229	  where
230		setup = liftIO $ openBinaryFile f ReadMode
231		cleanup = liftIO . hClose
232		go h = do
233			let p' = offsetMeterUpdate p (toBytesProcessed o)
234			when (o /= 0) $
235				liftIO $ hSeek h AbsoluteSeek o
236			b <- liftIO $ hGetContentsMetered h p'
237
238			let validitycheck = local $ runValidityCheck $
239				checkchanged >>= return . \case
240					False -> Invalid
241					True -> Valid
242			r <- runner (sender b validitycheck)
243			indicatetransferred ti
244			return r
245
246	-- This allows using actions like download and viaTmp
247	-- that may abort a transfer, and clean up the protocol after them.
248	--
249	-- Runs an action that may make a transfer, passing a transfer
250	-- indicator. The action should call indicatetransferred on it,
251	-- only after it's actually sent/received the all data.
252	--
253	-- If the action ends without having called indicatetransferred,
254	-- runs the fallback action, which can close the protoocol
255	-- connection or otherwise clean up after the transfer not having
256	-- occurred.
257	--
258	-- If the action throws an exception, the fallback is not run.
259	checktransfer ta fallback = do
260		ti <- liftIO $ newTVarIO False
261		r <- ta ti
262		ifM (liftIO $ atomically $ readTVar ti)
263			( return r
264			, fallback
265			)
266
267	indicatetransferred ti = liftIO $ atomically $ writeTVar ti True
268