1{- P2P protocol, IO implementation
2 -
3 - Copyright 2016-2018 Joey Hess <id@joeyh.name>
4 -
5 - Licensed under the GNU AGPL version 3 or higher.
6 -}
7
8{-# LANGUAGE RankNTypes, FlexibleContexts, OverloadedStrings, CPP #-}
9
10module P2P.IO
11	( RunProto
12	, RunState(..)
13	, mkRunState
14	, P2PConnection(..)
15	, ConnIdent(..)
16	, ClosableConnection(..)
17	, stdioP2PConnection
18	, connectPeer
19	, closeConnection
20	, serveUnixSocket
21	, setupHandle
22	, ProtoFailure(..)
23	, describeProtoFailure
24	, runNetProto
25	, runNet
26	) where
27
28import Common
29import P2P.Protocol
30import P2P.Address
31import Git
32import Git.Command
33import Utility.AuthToken
34import Utility.SimpleProtocol
35import Utility.Metered
36import Utility.Tor
37import Utility.FileMode
38import Utility.Debug
39import Types.UUID
40import Annex.ChangedRefs
41import qualified Utility.RawFilePath as R
42
43import Control.Monad.Free
44import Control.Monad.IO.Class
45import System.IO.Error
46import Network.Socket
47import Control.Concurrent
48import Control.Concurrent.Async
49import Control.Concurrent.STM
50import qualified Data.ByteString as B
51import qualified Data.ByteString.Lazy as L
52import qualified Network.Socket as S
53
54-- Type of interpreters of the Proto free monad.
55type RunProto m = forall a. Proto a -> m (Either ProtoFailure a)
56
57data ProtoFailure
58	= ProtoFailureMessage String
59	| ProtoFailureException SomeException
60	| ProtoFailureIOError IOError
61
62describeProtoFailure :: ProtoFailure -> String
63describeProtoFailure (ProtoFailureMessage s) = s
64describeProtoFailure (ProtoFailureException e) = show e
65describeProtoFailure (ProtoFailureIOError e) = show e
66
67data RunState
68	= Serving UUID (Maybe ChangedRefsHandle) (TVar ProtocolVersion)
69	| Client (TVar ProtocolVersion)
70
71mkRunState :: (TVar ProtocolVersion -> RunState) -> IO RunState
72mkRunState mk = do
73	tvar <- newTVarIO defaultProtocolVersion
74	return (mk tvar)
75
76data P2PConnection = P2PConnection
77	{ connRepo :: Repo
78	, connCheckAuth :: (AuthToken -> Bool)
79	, connIhdl :: Handle
80	, connOhdl :: Handle
81	, connIdent :: ConnIdent
82	}
83
84-- Identifier for a connection, only used for debugging.
85newtype ConnIdent = ConnIdent (Maybe String)
86
87data ClosableConnection conn
88	= OpenConnection conn
89	| ClosedConnection
90
91-- P2PConnection using stdio.
92stdioP2PConnection :: Git.Repo -> P2PConnection
93stdioP2PConnection g = P2PConnection
94	{ connRepo = g
95	, connCheckAuth = const False
96	, connIhdl = stdin
97	, connOhdl = stdout
98	, connIdent = ConnIdent Nothing
99	}
100
101-- Opens a connection to a peer. Does not authenticate with it.
102connectPeer :: Git.Repo -> P2PAddress -> IO P2PConnection
103connectPeer g (TorAnnex onionaddress onionport) = do
104	h <- setupHandle =<< connectHiddenService onionaddress onionport
105	return $ P2PConnection
106		{ connRepo = g
107		, connCheckAuth = const False
108		, connIhdl = h
109		, connOhdl = h
110		, connIdent = ConnIdent Nothing
111		}
112
113closeConnection :: P2PConnection -> IO ()
114closeConnection conn = do
115	hClose (connIhdl conn)
116	hClose (connOhdl conn)
117
118-- Serves the protocol on a unix socket.
119--
120-- The callback is run to serve a connection, and is responsible for
121-- closing the Handle when done.
122--
123-- Note that while the callback is running, other connections won't be
124-- processed, so longterm work should be run in a separate thread by
125-- the callback.
126serveUnixSocket :: FilePath -> (Handle -> IO ()) -> IO ()
127serveUnixSocket unixsocket serveconn = do
128	removeWhenExistsWith R.removeLink (toRawFilePath unixsocket)
129	soc <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
130	S.bind soc (S.SockAddrUnix unixsocket)
131	-- Allow everyone to read and write to the socket,
132	-- so a daemon like tor, that is probably running as a different
133	-- de sock $ addModes
134	-- user, can access it.
135	--
136        -- Connections have to authenticate to do anything,
137        -- so it's fine that other local users can connect to the
138        -- socket.
139	modifyFileMode (toRawFilePath unixsocket) $ addModes
140		[groupReadMode, groupWriteMode, otherReadMode, otherWriteMode]
141	S.listen soc 2
142	forever $ do
143		(conn, _) <- S.accept soc
144		setupHandle conn >>= serveconn
145
146setupHandle :: Socket -> IO Handle
147setupHandle s = do
148	h <- socketToHandle s ReadWriteMode
149	hSetBuffering h LineBuffering
150	hSetBinaryMode h False
151	return h
152
153-- Purposefully incomplete interpreter of Proto.
154--
155-- This only runs Net actions. No Local actions will be run
156-- (those need the Annex monad) -- if the interpreter reaches any,
157-- it returns Nothing.
158runNetProto :: RunState -> P2PConnection -> Proto a -> IO (Either ProtoFailure a)
159runNetProto runst conn = go
160  where
161	go :: RunProto IO
162	go (Pure v) = return (Right v)
163	go (Free (Net n)) = runNet runst conn go n
164	go (Free (Local _)) = return $ Left $
165		ProtoFailureMessage "unexpected annex operation attempted"
166
167-- Interpreter of the Net part of Proto.
168--
169-- An interpreter of Proto has to be provided, to handle the rest of Proto
170-- actions.
171runNet :: (MonadIO m, MonadMask m) => RunState -> P2PConnection -> RunProto m -> NetF (Proto a) -> m (Either ProtoFailure a)
172runNet runst conn runner f = case f of
173	SendMessage m next -> do
174		v <- liftIO $ tryNonAsync $ do
175			let l = unwords (formatMessage m)
176			debugMessage conn "P2P >" m
177			hPutStrLn (connOhdl conn) l
178			hFlush (connOhdl conn)
179		case v of
180			Left e -> return $ Left $ ProtoFailureException e
181			Right () -> runner next
182	ReceiveMessage next -> do
183		v <- liftIO $ tryIOError $ getProtocolLine (connIhdl conn)
184		case v of
185			Left e -> return $ Left $ ProtoFailureIOError e
186			Right Nothing -> return $ Left $
187				ProtoFailureMessage "protocol error"
188			Right (Just l) -> case parseMessage l of
189				Just m -> do
190					liftIO $ debugMessage conn "P2P <" m
191					runner (next (Just m))
192				Nothing -> runner (next Nothing)
193	SendBytes len b p next -> do
194		v <- liftIO $ tryNonAsync $ do
195			ok <- sendExactly len b (connOhdl conn) p
196			hFlush (connOhdl conn)
197			return ok
198		case v of
199			Right True -> runner next
200			Right False -> return $ Left $
201				ProtoFailureMessage "short data write"
202			Left e -> return $ Left $ ProtoFailureException e
203	ReceiveBytes len p next -> do
204		v <- liftIO $ tryNonAsync $ receiveExactly len (connIhdl conn) p
205		case v of
206			Left e -> return $ Left $ ProtoFailureException e
207			Right b -> runner (next b)
208	CheckAuthToken _u t next -> do
209		let authed = connCheckAuth conn t
210		runner (next authed)
211	Relay hin hout next -> do
212		v <- liftIO $ runRelay runnerio hin hout
213		case v of
214			Left e -> return $ Left e
215			Right exitcode -> runner (next exitcode)
216	RelayService service next -> do
217		v <- liftIO $ runRelayService conn runnerio service
218		case v of
219			Left e -> return $ Left e
220			Right () -> runner next
221	SetProtocolVersion v next -> do
222		liftIO $ atomically $ writeTVar versiontvar v
223		runner next
224	GetProtocolVersion next ->
225		liftIO (readTVarIO versiontvar) >>= runner . next
226  where
227	-- This is only used for running Net actions when relaying,
228	-- so it's ok to use runNetProto, despite it not supporting
229	-- all Proto actions.
230	runnerio = runNetProto runst conn
231	versiontvar = case runst of
232		Serving _ _ tv -> tv
233		Client tv -> tv
234
235debugMessage :: P2PConnection -> String -> Message -> IO ()
236debugMessage conn prefix m = do
237	tid <- myThreadId
238	debug "P2P.IO" $ concat $ catMaybes $
239		[ (\ident -> "[" ++ ident ++ "] ") <$> mident
240		, Just $ "[" ++ show tid ++ "] "
241		, Just $ prefix ++ " " ++ unwords (formatMessage safem)
242		]
243  where
244	safem = case m of
245		AUTH u _ -> AUTH u nullAuthToken
246		_ -> m
247	ConnIdent mident = connIdent conn
248
249-- Send exactly the specified number of bytes or returns False.
250--
251-- The ByteString can be larger or smaller than the specified length.
252-- For example, it can be lazily streaming from a file that gets
253-- appended to, or truncated.
254--
255-- Must avoid sending too many bytes as it would confuse the other end.
256-- This is easily dealt with by truncating it.
257--
258-- If too few bytes are sent, the only option is to give up on this
259-- connection. False is returned to indicate this problem.
260sendExactly :: Len -> L.ByteString -> Handle -> MeterUpdate -> IO Bool
261sendExactly (Len n) b h p = do
262	sent <- meteredWrite' p (B.hPut h) (L.take (fromIntegral n) b)
263	return (fromBytesProcessed sent == n)
264
265receiveExactly :: Len -> Handle -> MeterUpdate -> IO L.ByteString
266receiveExactly (Len n) h p = hGetMetered h (Just n) p
267
268runRelay :: RunProto IO -> RelayHandle -> RelayHandle -> IO (Either ProtoFailure ExitCode)
269runRelay runner (RelayHandle hout) (RelayHandle hin) =
270	bracket setup cleanup go
271		`catchNonAsync` (return . Left . ProtoFailureException)
272  where
273	setup = do
274		v <- newEmptyMVar
275		t1 <- async $ relayFeeder runner v hin
276		t2 <- async $ relayReader v hout
277		return (v, t1, t2)
278
279	cleanup (_, t1, t2) = do
280		hClose hin
281		hClose hout
282		cancel t1
283		cancel t2
284
285	go (v, _, _) = relayHelper runner v
286
287runRelayService :: P2PConnection -> RunProto IO -> Service -> IO (Either ProtoFailure ())
288runRelayService conn runner service =
289	withCreateProcess serviceproc' go
290		`catchNonAsync` (return . Left . ProtoFailureException)
291  where
292	cmd = case service of
293		UploadPack -> "upload-pack"
294		ReceivePack -> "receive-pack"
295
296	serviceproc = gitCreateProcess
297		[ Param cmd
298		, File (fromRawFilePath (repoPath (connRepo conn)))
299		] (connRepo conn)
300	serviceproc' = serviceproc
301		{ std_out = CreatePipe
302		, std_in = CreatePipe
303		}
304
305	go (Just hin) (Just hout) _ pid = do
306		v <- newEmptyMVar
307		r <- withAsync (relayFeeder runner v hin) $ \_ ->
308			withAsync (relayReader v hout) $ \_ ->
309				withAsync (waitexit v pid) $ \_ -> do
310					r <- runrelay v
311					hClose hin
312					hClose hout
313					return r
314		void $ waitForProcess pid
315		return r
316	go _ _ _ _ = error "internal"
317
318	runrelay v = relayHelper runner v >>= \case
319		Left e -> return $ Left e
320		Right exitcode -> runner $
321			net $ relayToPeer (RelayDone exitcode)
322
323	waitexit v pid = putMVar v . RelayDone =<< waitForProcess pid
324
325-- Processes RelayData as it is put into the MVar.
326relayHelper :: RunProto IO -> MVar RelayData -> IO (Either ProtoFailure ExitCode)
327relayHelper runner v = loop
328  where
329	loop = do
330		d <- takeMVar v
331		case d of
332			RelayToPeer b -> do
333				r <- runner $ net $ relayToPeer (RelayToPeer b)
334				case r of
335					Left e -> return (Left e)
336					Right () -> loop
337			RelayDone exitcode -> do
338				_ <- runner $ net $ relayToPeer (RelayDone exitcode)
339				return (Right exitcode)
340			RelayFromPeer _ -> loop -- not handled here
341
342-- Takes input from the peer, and sends it to the relay process's stdin.
343-- Repeats until the peer tells it it's done or hangs up.
344relayFeeder :: RunProto IO -> MVar RelayData -> Handle -> IO ()
345relayFeeder runner v hin = loop
346  where
347	loop = do
348		mrd <- runner $ net relayFromPeer
349		case mrd of
350			Left _e ->
351				putMVar v (RelayDone (ExitFailure 1))
352			Right (RelayDone exitcode) ->
353				putMVar v (RelayDone exitcode)
354			Right (RelayFromPeer b) -> do
355				L.hPut hin b
356				hFlush hin
357				loop
358			Right (RelayToPeer _) -> loop -- not handled here
359
360-- Reads input from the Handle and puts it into the MVar for relaying to
361-- the peer. Continues until EOF on the Handle.
362relayReader :: MVar RelayData -> Handle -> IO ()
363relayReader v hout = loop
364  where
365	loop = do
366		bs <- getsome []
367		case bs of
368			[] -> return ()
369			_ -> do
370				putMVar v $ RelayToPeer (L.fromChunks bs)
371				loop
372
373	-- Wait for the first available chunk. Then, without blocking,
374	-- try to get more chunks, in case a stream of chunks is being
375	-- written in close succession.
376	--
377	-- On Windows, hGetNonBlocking is broken, so avoid using it there.
378	getsome [] = do
379		b <- B.hGetSome hout chunk
380		if B.null b
381			then return []
382#ifndef mingw32_HOST_OS
383			else getsome [b]
384#else
385			else return [b]
386#endif
387	getsome bs = do
388		b <- B.hGetNonBlocking hout chunk
389		if B.null b
390			then return (reverse bs)
391			else getsome (b:bs)
392
393	chunk = 65536
394