{- External remote protocol async extension.
 -
 - Copyright 2020 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RankNTypes #-}

module Remote.External.AsyncExtension (runRelayToExternalAsync) where

import Common
import Annex
import Messages
import Remote.External.Types
import qualified Utility.SimpleProtocol as Proto

import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMChan
import qualified Data.Map.Strict as M

-- | Starts a thread that will handle all communication with the external
-- process. The input ExternalState communicates directly with the external
-- process.
--
-- Two threads are started: one running 'sendloop', which drains the shared
-- send queue into the input ExternalState, and one running 'receiveloop',
-- which reads from the input ExternalState and dispatches each message to
-- the receive queue of the job it belongs to.
--
-- Each time the action inside the returned ExternalAsyncRelay is run, it
-- allocates a fresh JobId with its own receive queue, and yields an
-- ExternalState whose sends and receives go via the relay.
runRelayToExternalAsync :: External -> ExternalState -> (Annex () -> IO ()) -> IO ExternalAsyncRelay
runRelayToExternalAsync external st annexrunner = do
    jidmap <- newTVarIO M.empty
    sendq <- newSendQueue
    nextjid <- newTVarIO (JobId 1)
    sender <- async $ sendloop st sendq
    receiver <- async $ receiveloop external st jidmap sendq sender annexrunner
    return $ ExternalAsyncRelay $ do
        receiveq <- newReceiveQueue
        -- Allocating the job id and registering its receive queue
        -- happen in a single transaction, so the receive loop never
        -- sees an allocated id without a queue.
        jid <- atomically $ do
            jid@(JobId n) <- readTVar nextjid
            let !jid' = JobId (succ n)
            writeTVar nextjid jid'
            modifyTVar' jidmap $ M.insert jid receiveq
            return jid
        return $ ExternalState
            { externalSend = \msg ->
                atomically $ writeTBMChan sendq
                    (toAsyncWrapped msg, jid)
            , externalReceive = atomically (readTBMChan receiveq)
            -- This shuts down the whole relay.
            , externalShutdown = shutdown external st sendq sender receiver
            -- These three TMVars are shared among all
            -- ExternalStates that use this relay; they're
            -- common state about the external process.
            , externalPrepared = externalPrepared st
            , externalConfig = externalConfig st
            , externalConfigChanges = externalConfigChanges st
            }

-- | Per-job queue of protocol lines received from the external process.
type ReceiveQueue = TBMChan String

-- | Queue of messages waiting to be sent, each tagged with the job that
-- is sending it.
type SendQueue = TBMChan (AsyncWrapped, JobId)

-- | Maps each allocated job to the queue its received messages go to.
type JidMap = TVar (M.Map JobId ReceiveQueue)

-- | Makes a receive queue, bounded at 10 entries.
newReceiveQueue :: IO ReceiveQueue
newReceiveQueue = newTBMChanIO 10

-- | Makes a send queue, bounded at 10 entries.
newSendQueue :: IO SendQueue
newSendQueue = newTBMChanIO 10

-- | Reads lines from the external process (via the input ExternalState)
-- and dispatches them.
--
-- A line that parses as an AsyncMessage is delivered to the receive queue
-- registered for its job. A line that parses as an ExceptionalMessage is
-- delivered, unparsed, to every registered receive queue. Anything else,
-- including a message for an unregistered job, is a protocol error: a
-- warning is displayed and the relay is shut down. The relay is also shut
-- down when externalReceive yields Nothing.
receiveloop :: External -> ExternalState -> JidMap -> SendQueue -> Async () -> (Annex () -> IO ()) -> IO ()
receiveloop external st jidmap sendq sendthread annexrunner = externalReceive st >>= \case
    Just l -> case parseMessage l :: Maybe AsyncMessage of
        Just (AsyncMessage jid msg) ->
            M.lookup jid <$> readTVarIO jidmap >>= \case
                Just c -> do
                    atomically $ writeTBMChan c msg
                    loop
                Nothing -> protoerr "unknown job number"
        Nothing -> case parseMessage l :: Maybe ExceptionalMessage of
            Just _ -> do
                -- ERROR is relayed to all listeners
                m <- readTVarIO jidmap
                forM_ (M.elems m) $ \c ->
                    atomically $ writeTBMChan c l
                loop
            Nothing -> protoerr "unexpected non-async message"
    Nothing -> closeandshutdown
  where
    loop = receiveloop external st jidmap sendq sendthread annexrunner

    protoerr s = do
        annexrunner $ warning $ "async external special remote protocol error: " ++ s
        closeandshutdown

    closeandshutdown = do
        -- shutdown cancels the receive thread it is given. This code
        -- runs in the receive thread, so an already finished dummy
        -- thread is passed instead, to avoid canceling itself partway
        -- through the cleanup.
        dummy <- async noop
        shutdown external st sendq sendthread dummy True
        -- Closing the receive queues lets everything blocked in
        -- externalReceive see the end of the stream.
        m <- readTVarIO jidmap
        forM_ (M.elems m) (atomically . closeTBMChan)

-- | Takes messages off the send queue and writes them to the external
-- process (via the input ExternalState), until the queue is closed and
-- drained.
--
-- Requests and remote responses are wrapped in an AsyncMessage carrying
-- the sending job's id; exceptional messages and messages that are
-- already AsyncMessages are sent unchanged.
sendloop :: ExternalState -> SendQueue -> IO ()
sendloop st sendq = atomically (readTBMChan sendq) >>= \case
    Just (wrappedmsg, jid) -> do
        case wrappedmsg of
            AsyncWrappedRemoteResponse msg ->
                externalSend st $ wrapjid msg jid
            AsyncWrappedRequest msg ->
                externalSend st $ wrapjid msg jid
            AsyncWrappedExceptionalMessage msg ->
                externalSend st msg
            AsyncWrappedAsyncMessage msg ->
                externalSend st msg
        sendloop st sendq
    Nothing -> return ()
  where
    wrapjid msg jid = AsyncMessage jid $ unwords $ Proto.formatMessage msg

-- | Shuts down the whole relay: stops the receive thread, lets the send
-- thread drain its queue and finish, resets the External's async state to
-- UncheckedExternalAsync, and, if that state was an ExternalAsync, shuts
-- down the input ExternalState, passing the Bool on to its
-- externalShutdown.
shutdown :: External -> ExternalState -> SendQueue -> Async () -> Async () -> Bool -> IO ()
shutdown external st sendq sendthread receivethread b = do
    -- Receive thread is normally blocked reading from a handle.
    -- That can block closing the handle, so it needs to be canceled.
    cancel receivethread
    -- Cleanly shutdown the send thread as well, allowing it to finish
    -- writing anything that was buffered.
    atomically $ closeTBMChan sendq
    -- waitCatch rather than wait: if the send thread died with an
    -- exception (eg, a failed write to the external process), wait
    -- would rethrow it here and skip the rest of the cleanup, leaving
    -- the external process not shut down, and, when called from the
    -- receive loop, the receive queues never closed.
    _ <- waitCatch sendthread
    r <- atomically $ do
        r <- tryTakeTMVar (externalAsync external)
        putTMVar (externalAsync external)
            UncheckedExternalAsync
        return r
    case r of
        Just (ExternalAsync _) -> externalShutdown st b
        _ -> noop