1{- External remote protocol async extension.
2 -
3 - Copyright 2020 Joey Hess <id@joeyh.name>
4 -
5 - Licensed under the GNU AGPL version 3 or higher.
6 -}
7
8{-# LANGUAGE RankNTypes #-}
9{-# LANGUAGE BangPatterns #-}
10
11module Remote.External.AsyncExtension (runRelayToExternalAsync) where
12
13import Common
14import Annex
15import Messages
16import Remote.External.Types
17import qualified Utility.SimpleProtocol as Proto
18
19import Control.Concurrent.Async
20import Control.Concurrent.STM
21import Control.Concurrent.STM.TBMChan
22import qualified Data.Map.Strict as M
23
-- | Sets up a relay in front of the external process. Two threads are
-- started, one running sendloop and one running receiveloop; both use the
-- input ExternalState, which communicates directly with the external
-- process.
--
-- Each time the action inside the returned ExternalAsyncRelay is run, it
-- allocates a fresh JobId, registers a new receive queue for that job,
-- and yields an ExternalState whose sends and receives go via the relay.
runRelayToExternalAsync :: External -> ExternalState -> (Annex () -> IO ()) -> IO ExternalAsyncRelay
runRelayToExternalAsync external st annexrunner = do
	nextjid <- newTVarIO (JobId 1)
	jidmap <- newTVarIO M.empty
	sendq <- newSendQueue
	sender <- async (sendloop st sendq)
	receiver <- async (receiveloop external st jidmap sendq sender annexrunner)
	return $ ExternalAsyncRelay $ do
		receiveq <- newReceiveQueue
		jid <- atomically (registerjob nextjid jidmap receiveq)
		return $ ExternalState
			-- Outgoing messages are queued, tagged with this job's id.
			{ externalSend = \msg ->
				atomically (writeTBMChan sendq (toAsyncWrapped msg, jid))
			-- Incoming messages are read from this job's own queue.
			, externalReceive = atomically (readTBMChan receiveq)
			-- This shuts down the whole relay.
			, externalShutdown = shutdown external st sendq sender receiver
			-- These three TMVars are shared among all
			-- ExternalStates that use this relay; they're
			-- common state about the external process.
			, externalPrepared = externalPrepared st
			, externalConfig = externalConfig st
			, externalConfigChanges = externalConfigChanges st
			}
  where
	-- In a single transaction: take the next job id, advance the
	-- counter (forcing the new value), and record the job's queue.
	registerjob nextjid jidmap receiveq = do
		jid@(JobId n) <- readTVar nextjid
		writeTVar nextjid $! JobId (succ n)
		modifyTVar' jidmap (M.insert jid receiveq)
		return jid
56
-- | Queue of lines from the external process that have been routed to one
-- job; written by receiveloop and read via that job's externalReceive.
type ReceiveQueue = TBMChan String

-- | Queue of outgoing messages, each paired with the JobId of the job that
-- sent it; read by sendloop.
type SendQueue = TBMChan (AsyncWrapped, JobId)

-- | Maps each allocated JobId to the ReceiveQueue of that job.
type JidMap = TVar (M.Map JobId ReceiveQueue)
62
-- | Makes an empty ReceiveQueue, created with a bound of 10.
newReceiveQueue :: IO ReceiveQueue
newReceiveQueue = atomically (newTBMChan bound)
  where
	bound = 10
65
-- | Makes an empty SendQueue, created with a bound of 10.
newSendQueue :: IO SendQueue
newSendQueue = atomically (newTBMChan bound)
  where
	bound = 10
68
-- | Reads lines from the external process (via the ExternalState) and
-- routes each one:
--
-- * A line that parses as an AsyncMessage is delivered to the receive
--   queue registered for its job id.
-- * A line that parses as an ExceptionalMessage is delivered, unparsed,
--   to every registered receive queue.
-- * Anything else, or an unknown job id, is a protocol error: a warning
--   is displayed using the annexrunner and the relay is shut down.
--
-- When externalReceive yields Nothing, the relay is shut down as well.
receiveloop :: External -> ExternalState -> JidMap -> SendQueue -> Async () -> (Annex () -> IO ()) -> IO ()
receiveloop external st jidmap sendq sendthread annexrunner = go
  where
	go = externalReceive st >>= maybe closeandshutdown route

	route l = case parseMessage l :: Maybe AsyncMessage of
		Just (AsyncMessage jid msg) -> do
			queues <- readTVarIO jidmap
			case M.lookup jid queues of
				Just q -> do
					atomically (writeTBMChan q msg)
					go
				Nothing -> protoerr "unknown job number"
		Nothing -> case parseMessage l :: Maybe ExceptionalMessage of
			Just _ -> do
				-- ERROR is relayed to all listeners
				queues <- readTVarIO jidmap
				mapM_ (\q -> atomically (writeTBMChan q l)) (M.elems queues)
				go
			Nothing -> protoerr "unexpected non-async message"

	protoerr s = do
		annexrunner $ warning $ "async external special remote protocol error: " ++ s
		closeandshutdown

	closeandshutdown = do
		-- shutdown cancels whatever receive thread it is given. A
		-- thread that does nothing is passed instead, presumably
		-- so the thread running this loop is not the one canceled.
		dummy <- async noop
		shutdown external st sendq sendthread dummy True
		-- Close every job's receive queue afterwards.
		queues <- readTVarIO jidmap
		mapM_ (atomically . closeTBMChan) (M.elems queues)
97
-- | Takes messages off the SendQueue one at a time and sends each to the
-- external process using the ExternalState. The loop ends when reading
-- the queue yields Nothing.
--
-- Remote responses and requests are wrapped in an AsyncMessage carrying
-- the job id they were queued with; exceptional messages and messages
-- that are already AsyncMessages are sent as they are.
sendloop :: ExternalState -> SendQueue -> IO ()
sendloop st sendq = go
  where
	go = atomically (readTBMChan sendq) >>= maybe (return ()) sendnext

	sendnext (wrappedmsg, jid) = do
		relay wrappedmsg jid
		go

	relay (AsyncWrappedRemoteResponse msg) jid = externalSend st (wrapjid msg jid)
	relay (AsyncWrappedRequest msg) jid = externalSend st (wrapjid msg jid)
	relay (AsyncWrappedExceptionalMessage msg) _ = externalSend st msg
	relay (AsyncWrappedAsyncMessage msg) _ = externalSend st msg

	-- Formats the message and tags it with the job id.
	wrapjid msg jid = AsyncMessage jid (unwords (Proto.formatMessage msg))
114
-- | Shuts down the whole relay: cancels the receive thread, closes the
-- send queue and waits for the send thread to finish, resets the
-- External's externalAsync state to UncheckedExternalAsync, and, if it
-- had been ExternalAsync, runs the externalShutdown of the ExternalState
-- that communicates directly with the external process.
--
-- The Bool is passed through unchanged to that externalShutdown.
--
-- If the send thread died with an exception, the cleanup above is still
-- carried out, and the exception is rethrown afterwards.
shutdown :: External -> ExternalState -> SendQueue -> Async () -> Async () -> Bool -> IO ()
shutdown external st sendq sendthread receivethread b = do
	-- Receive thread is normally blocked reading from a handle.
	-- That can block closing the handle, so it needs to be canceled.
	cancel receivethread
	-- Cleanly shutdown the send thread as well, allowing it to finish
	-- writing anything that was buffered.
	atomically $ closeTBMChan sendq
	-- The send thread may have ended with an exception. Using wait
	-- here would rethrow it right away and skip the rest of the
	-- shutdown, leaving externalAsync pointing at a dead relay and the
	-- underlying ExternalState not shut down. So only wait for the
	-- thread to end here, and rethrow at the end.
	_ <- waitCatch sendthread
	r <- atomically $ do
		r <- tryTakeTMVar (externalAsync external)
		putTMVar (externalAsync external)
			UncheckedExternalAsync
		return r
	case r of
		Just (ExternalAsync _) -> externalShutdown st b
		_ -> noop
	-- The send thread has already ended, so this does not block; it
	-- rethrows the exception the send thread ended with, if any, so
	-- the caller still gets to see it.
	wait sendthread
132