{- P2P protocol, IO implementation
 -
 - Copyright 2016-2018 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}

{-# LANGUAGE RankNTypes, FlexibleContexts, OverloadedStrings, CPP #-}

module P2P.IO
    ( RunProto
    , RunState(..)
    , mkRunState
    , P2PConnection(..)
    , ConnIdent(..)
    , ClosableConnection(..)
    , stdioP2PConnection
    , connectPeer
    , closeConnection
    , serveUnixSocket
    , setupHandle
    , ProtoFailure(..)
    , describeProtoFailure
    , runNetProto
    , runNet
    ) where

import Common
import P2P.Protocol
import P2P.Address
import Git
import Git.Command
import Utility.AuthToken
import Utility.SimpleProtocol
import Utility.Metered
import Utility.Tor
import Utility.FileMode
import Utility.Debug
import Types.UUID
import Annex.ChangedRefs
import qualified Utility.RawFilePath as R

import Control.Monad.Free
import Control.Monad.IO.Class
import System.IO.Error
import Network.Socket
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import qualified Network.Socket as S

-- An interpreter of the Proto free monad: it runs any Proto action in
-- the monad m, yielding either a failure or the action's result.
type RunProto m = forall a. Proto a -> m (Either ProtoFailure a)

-- The ways that running the protocol can fail.
data ProtoFailure
    = ProtoFailureMessage String
    | ProtoFailureException SomeException
    | ProtoFailureIOError IOError

-- Renders a ProtoFailure as a String. A message is used as-is; an
-- exception or IO error is rendered with show.
describeProtoFailure :: ProtoFailure -> String
describeProtoFailure failure = case failure of
    ProtoFailureMessage s -> s
    ProtoFailureException e -> show e
    ProtoFailureIOError e -> show e

-- State of a protocol run. Either way, it holds a TVar containing the
-- protocol version.
data RunState
    = Serving UUID (Maybe ChangedRefsHandle) (TVar ProtocolVersion)
    | Client (TVar ProtocolVersion)

-- Builds a RunState around a freshly allocated TVar, which starts out
-- holding defaultProtocolVersion.
mkRunState :: (TVar ProtocolVersion -> RunState) -> IO RunState
mkRunState mk = mk <$> newTVarIO defaultProtocolVersion

-- A connection to a peer: the repository it operates on, a check for
-- auth tokens, the handles to read from and write to, and an identifier.
data P2PConnection = P2PConnection
    { connRepo :: Repo
    , connCheckAuth :: (AuthToken -> Bool)
    , connIhdl :: Handle
    , connOhdl :: Handle
    , connIdent :: ConnIdent
    }

-- Identifier for a connection, only used for debugging.
newtype ConnIdent = ConnIdent (Maybe String)

-- A connection that is either still open, or has been closed.
data ClosableConnection conn
    = OpenConnection conn
    | ClosedConnection

-- P2PConnection that reads from stdin and writes to stdout.
-- It accepts no auth tokens, and has no identifier.
stdioP2PConnection :: Git.Repo -> P2PConnection
stdioP2PConnection g = P2PConnection
    { connRepo = g
    , connCheckAuth = const False
    , connIhdl = stdin
    , connOhdl = stdout
    , connIdent = ConnIdent Nothing
    }

-- Opens a connection to a peer. Does not authenticate with it.
--
-- The same Handle is used for both reading and writing.
connectPeer :: Git.Repo -> P2PAddress -> IO P2PConnection
connectPeer g (TorAnnex onionaddress onionport) =
    mkconn <$> (connectHiddenService onionaddress onionport >>= setupHandle)
  where
    mkconn h = P2PConnection
        { connRepo = g
        , connCheckAuth = const False
        , connIhdl = h
        , connOhdl = h
        , connIdent = ConnIdent Nothing
        }

-- Closes the connection's input handle, and then its output handle.
closeConnection :: P2PConnection -> IO ()
closeConnection conn = mapM_ hClose [connIhdl conn, connOhdl conn]

-- Serves the protocol on a unix socket.
--
-- The callback is run to serve a connection, and is responsible for
-- closing the Handle when done.
--
-- Note that while the callback is running, other connections won't be
-- processed, so longterm work should be run in a separate thread by
-- the callback.
--
-- This only returns by throwing an exception (from setting up the socket,
-- accepting a connection, or the callback). When that happens, the
-- listening socket is closed before the exception propagates.
serveUnixSocket :: FilePath -> (Handle -> IO ()) -> IO ()
serveUnixSocket unixsocket serveconn = do
    -- Remove any existing socket file first, so that bind can create it.
    removeWhenExistsWith R.removeLink (toRawFilePath unixsocket)
    bracket opensocket S.close $ \soc -> do
        S.bind soc (S.SockAddrUnix unixsocket)
        -- Allow everyone to read and write to the socket,
        -- so a daemon like tor, that is probably running as a different
        -- user, can access it.
        --
        -- Connections have to authenticate to do anything,
        -- so it's fine that other local users can connect to the
        -- socket.
        modifyFileMode (toRawFilePath unixsocket) $ addModes
            [groupReadMode, groupWriteMode, otherReadMode, otherWriteMode]
        S.listen soc 2
        forever $ do
            (conn, _) <- S.accept soc
            setupHandle conn >>= serveconn
  where
    opensocket = S.socket S.AF_UNIX S.Stream S.defaultProtocol

-- Converts a connected Socket into a read/write Handle that is
-- line buffered and in text (not binary) mode.
setupHandle :: Socket -> IO Handle
setupHandle s = do
    h <- socketToHandle s ReadWriteMode
    hSetBuffering h LineBuffering
    hSetBinaryMode h False
    return h

-- Purposefully incomplete interpreter of Proto.
--
-- This only runs Net actions. No Local actions will be run
-- (those need the Annex monad) -- if the interpreter reaches any,
-- it stops and returns a ProtoFailureMessage.
runNetProto :: RunState -> P2PConnection -> Proto a -> IO (Either ProtoFailure a)
runNetProto runst conn = go
  where
    go :: RunProto IO
    go (Pure v) = return (Right v)
    -- Net actions are handed to runNet, which uses go to run whatever
    -- follows them.
    go (Free (Net n)) = runNet runst conn go n
    go (Free (Local _)) = return $ Left $
        ProtoFailureMessage "unexpected annex operation attempted"

-- Interpreter of the Net part of Proto.
--
-- An interpreter of Proto has to be provided, to handle the rest of Proto
-- actions.
runNet :: (MonadIO m, MonadMask m) => RunState -> P2PConnection -> RunProto m -> NetF (Proto a) -> m (Either ProtoFailure a)
runNet runst conn runner f = case f of
    -- Write the formatted message as a single line to the output handle
    -- and flush it. Any non-async exception becomes a ProtoFailureException.
    SendMessage m next -> do
        v <- liftIO $ tryNonAsync $ do
            let l = unwords (formatMessage m)
            debugMessage conn "P2P >" m
            hPutStrLn (connOhdl conn) l
            hFlush (connOhdl conn)
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right () -> runner next
    -- Read a line from the input handle. Only IOErrors are caught here.
    -- A line that cannot be parsed as a Message is passed on as Nothing,
    -- leaving it to the continuation to decide what to do.
    ReceiveMessage next -> do
        v <- liftIO $ tryIOError $ getProtocolLine (connIhdl conn)
        case v of
            Left e -> return $ Left $ ProtoFailureIOError e
            -- NOTE(review): presumably getProtocolLine yields Nothing
            -- when no valid line could be read -- confirm.
            Right Nothing -> return $ Left $
                ProtoFailureMessage "protocol error"
            Right (Just l) -> case parseMessage l of
                Just m -> do
                    liftIO $ debugMessage conn "P2P <" m
                    runner (next (Just m))
                Nothing -> runner (next Nothing)
    -- Send the data and flush. sendExactly returns False when the number
    -- of bytes written differs from len, which is reported as a failure.
    SendBytes len b p next -> do
        v <- liftIO $ tryNonAsync $ do
            ok <- sendExactly len b (connOhdl conn) p
            hFlush (connOhdl conn)
            return ok
        case v of
            Right True -> runner next
            Right False -> return $ Left $
                ProtoFailureMessage "short data write"
            Left e -> return $ Left $ ProtoFailureException e
    ReceiveBytes len p next -> do
        v <- liftIO $ tryNonAsync $ receiveExactly len (connIhdl conn) p
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right b -> runner (next b)
    -- Only the token is checked, using the connection's connCheckAuth;
    -- the first field is ignored.
    CheckAuthToken _u t next -> do
        let authed = connCheckAuth conn t
        runner (next authed)
    Relay hin hout next -> do
        v <- liftIO $ runRelay runnerio hin hout
        case v of
            Left e -> return $ Left e
            Right exitcode -> runner (next exitcode)
    RelayService service next -> do
        v <- liftIO $ runRelayService conn runnerio service
        case v of
            Left e -> return $ Left e
            Right () -> runner next
    SetProtocolVersion v next -> do
        liftIO $ atomically $ writeTVar versiontvar v
        runner next
    GetProtocolVersion next ->
        liftIO (readTVarIO versiontvar) >>= runner . next
  where
    -- This is only used for running Net actions when relaying,
    -- so it's ok to use runNetProto, despite it not supporting
    -- all Proto actions.
    runnerio = runNetProto runst conn
    -- Both kinds of RunState carry the TVar holding the protocol version.
    versiontvar = case runst of
        Serving _ _ tv -> tv
        Client tv -> tv

-- Logs a protocol message for debugging, prefixed with the connection's
-- identifier (when it has one) and the id of the current thread.
--
-- The auth token of an AUTH message is replaced with nullAuthToken,
-- so that the real token is not written to the debug log.
debugMessage :: P2PConnection -> String -> Message -> IO ()
debugMessage conn prefix m = do
    tid <- myThreadId
    debug "P2P.IO" $ concat $ catMaybes $
        [ (\ident -> "[" ++ ident ++ "] ") <$> mident
        , Just $ "[" ++ show tid ++ "] "
        , Just $ prefix ++ " " ++ unwords (formatMessage safem)
        ]
  where
    safem = case m of
        AUTH u _ -> AUTH u nullAuthToken
        _ -> m
    ConnIdent mident = connIdent conn

-- Send exactly the specified number of bytes or returns False.
--
-- The ByteString can be larger or smaller than the specified length.
-- For example, it can be lazily streaming from a file that gets
-- appended to, or truncated.
--
-- Must avoid sending too many bytes as it would confuse the other end.
-- This is easily dealt with by truncating it.
--
-- If too few bytes are sent, the only option is to give up on this
-- connection. False is returned to indicate this problem.
sendExactly :: Len -> L.ByteString -> Handle -> MeterUpdate -> IO Bool
sendExactly (Len n) b h p = do
    sent <- meteredWrite' p (B.hPut h) (L.take (fromIntegral n) b)
    return (fromBytesProcessed sent == n)

-- Reads data from the Handle using hGetMetered, passing the length as
-- the limit and updating the meter.
--
-- NOTE(review): presumably this yields fewer bytes than requested if the
-- Handle reaches EOF early -- confirm against hGetMetered.
receiveExactly :: Len -> Handle -> MeterUpdate -> IO L.ByteString
receiveExactly (Len n) h p = hGetMetered h (Just n) p

-- Relays between the peer and a pair of handles, until relayHelper
-- finishes with an exit code or a failure.
--
-- Data received from the peer is written to hin by relayFeeder, and data
-- read from hout is queued by relayReader to be sent to the peer.
-- Both handles are closed, and both threads cancelled, when done.
-- A non-async exception is returned as a ProtoFailureException.
--
-- NOTE(review): runNet passes the Relay constructor's fields, which it
-- names hin and hout, in that order; here the first is named hout and
-- the second hin. Presumably the names reflect different points of view
-- -- confirm against the definition of Relay.
runRelay :: RunProto IO -> RelayHandle -> RelayHandle -> IO (Either ProtoFailure ExitCode)
runRelay runner (RelayHandle hout) (RelayHandle hin) =
    bracket setup cleanup go
        `catchNonAsync` (return . Left . ProtoFailureException)
  where
    setup = do
        v <- newEmptyMVar
        t1 <- async $ relayFeeder runner v hin
        t2 <- async $ relayReader v hout
        return (v, t1, t2)

    cleanup (_, t1, t2) = do
        hClose hin
        hClose hout
        cancel t1
        cancel t2

    go (v, _, _) = relayHelper runner v

-- Starts a git process for the requested service in the connection's
-- repository, with pipes for its stdin and stdout, and relays between it
-- and the peer.
--
-- A non-async exception is returned as a ProtoFailureException.
runRelayService :: P2PConnection -> RunProto IO -> Service -> IO (Either ProtoFailure ())
runRelayService conn runner service =
    withCreateProcess serviceproc' go
        `catchNonAsync` (return . Left . ProtoFailureException)
  where
    cmd = case service of
        UploadPack -> "upload-pack"
        ReceivePack -> "receive-pack"

    serviceproc = gitCreateProcess
        [ Param cmd
        , File (fromRawFilePath (repoPath (connRepo conn)))
        ] (connRepo conn)
    serviceproc' = serviceproc
        { std_out = CreatePipe
        , std_in = CreatePipe
        }

    -- Three helper threads run while relaying: one feeding the process's
    -- stdin from the peer, one reading its stdout, and one that queues
    -- RelayDone once the process has exited. withAsync cancels each of
    -- them when its scope is left.
    go (Just hin) (Just hout) _ pid = do
        v <- newEmptyMVar
        r <- withAsync (relayFeeder runner v hin) $ \_ ->
            withAsync (relayReader v hout) $ \_ ->
                withAsync (waitexit v pid) $ \_ -> do
                    r <- runrelay v
                    hClose hin
                    hClose hout
                    return r
        void $ waitForProcess pid
        return r
    -- Both pipes were requested with CreatePipe above, so this is not
    -- expected to be reached.
    go _ _ _ _ = error "internal"

    -- Once relayHelper finishes successfully, RelayDone with the exit code
    -- is sent to the peer.
    runrelay v = relayHelper runner v >>= \case
        Left e -> return $ Left e
        Right exitcode -> runner $
            net $ relayToPeer (RelayDone exitcode)

    waitexit v pid = putMVar v . RelayDone =<< waitForProcess pid

-- Processes RelayData as it is put into the MVar.
relayHelper :: RunProto IO -> MVar RelayData -> IO (Either ProtoFailure ExitCode)
relayHelper runner v = loop
  where
    loop = takeMVar v >>= dispatch

    -- Data for the peer is sent on to it; a failure to send ends the
    -- relaying.
    dispatch (RelayToPeer b) =
        tellpeer (RelayToPeer b) >>= either (return . Left) (\() -> loop)
    -- The peer is told that relaying is done. Whether or not that
    -- succeeds, the exit code is the result.
    dispatch (RelayDone exitcode) = do
        void $ tellpeer (RelayDone exitcode)
        return (Right exitcode)
    -- not handled here
    dispatch (RelayFromPeer _) = loop

    tellpeer = runner . net . relayToPeer

-- Receives data from the peer, and writes it to the relay process's stdin.
-- Keeps going until the peer says it's done, or receiving from it fails;
-- either way a RelayDone is put into the MVar to finish the relaying.
relayFeeder :: RunProto IO -> MVar RelayData -> Handle -> IO ()
relayFeeder runner v hin = loop
  where
    loop = runner (net relayFromPeer) >>= \case
        Right (RelayFromPeer b) -> do
            L.hPut hin b
            hFlush hin
            loop
        -- not handled here
        Right (RelayToPeer _) -> loop
        Right (RelayDone exitcode) -> finish exitcode
        Left _ -> finish (ExitFailure 1)

    finish = putMVar v . RelayDone

-- Reads whatever is available from the Handle, and puts it into the MVar
-- to be relayed to the peer. Stops at EOF on the Handle.
relayReader :: MVar RelayData -> Handle -> IO ()
relayReader v hout = loop
  where
    loop = firstchunk >>= \case
        [] -> return ()
        chunks -> do
            putMVar v $ RelayToPeer (L.fromChunks chunks)
            loop

    -- Blocks until the first chunk is available. An empty chunk means
    -- EOF, and results in an empty list.
    firstchunk = do
        b <- B.hGetSome hout chunksize
        if B.null b
            then return []
            else morechunks [b]

    -- Without blocking, collects any further chunks, in case a stream of
    -- chunks is being written in close succession. Chunks are accumulated
    -- in reverse order, and put back in order at the end.
    --
    -- On Windows, hGetNonBlocking is broken, so only the first chunk is
    -- used there.
#ifndef mingw32_HOST_OS
    morechunks acc = do
        b <- B.hGetNonBlocking hout chunksize
        if B.null b
            then return (reverse acc)
            else morechunks (b:acc)
#else
    morechunks acc = return acc
#endif

    chunksize = 65536