{-# LANGUAGE NamedFieldPuns #-}

module Network.HTTP2.Arch.Context where

import Control.Concurrent.STM
import Data.IORef
import Network.HTTP.Types (Method)

import Imports hiding (insert)
import Network.HPACK
import Network.HTTP2.Arch.Cache (Cache, emptyCache)
import qualified Network.HTTP2.Arch.Cache as Cache
import Network.HTTP2.Arch.Rate
import Network.HTTP2.Arch.Stream
import Network.HTTP2.Arch.Types
import Network.HTTP2.Frame

-- | Which side of the connection this endpoint is.
data Role = Client | Server deriving (Eq,Show)

----------------------------------------------------------------

-- | Role-specific state.  A server carries an input queue; a client
--   carries its scheme, its authority and a cache of streams keyed by
--   method and path.
data RoleInfo = ServerInfo {
                    inputQ :: TQueue (Input Stream)
                  }
              | ClientInfo {
                    scheme    :: ByteString
                  , authority :: ByteString
                  , cache     :: IORef (Cache (Method,ByteString) Stream)
                  }

-- | Create server-side role information with a fresh, empty input queue.
newServerInfo :: IO RoleInfo
newServerInfo = ServerInfo <$> newTQueueIO

-- | Create client-side role information from a scheme, an authority and
--   the limit handed to 'emptyCache'.
newClientInfo :: ByteString -> ByteString -> Int -> IO RoleInfo
newClientInfo scm auth lim = ClientInfo scm auth <$> newIORef (emptyCache lim)

-- | Store a stream in the client cache under the given method and path.
--
--   Only meaningful for 'ClientInfo'; any other constructor calls 'error'.
insertCache :: Method -> ByteString -> Stream -> RoleInfo -> IO ()
insertCache m path v (ClientInfo _ _ ref) = atomicModifyIORef' ref $ \c ->
  (Cache.insert (m,path) v c, ())
insertCache _ _ _ _ = error "insertCache"

-- | Look up a stream in the client cache by method and path.
--
--   Only meaningful for 'ClientInfo'; any other constructor calls 'error'.
lookupCache :: Method -> ByteString -> RoleInfo -> IO (Maybe Stream)
lookupCache m path (ClientInfo _ _ ref) = Cache.lookup (m,path) <$> readIORef ref
lookupCache _ _ _ = error "lookupCache"

----------------------------------------------------------------

-- | The context for HTTP/2 connection.
data Context = Context {
    role               :: Role
  , roleInfo           :: RoleInfo
  -- HTTP/2 settings received from a browser
  , http2settings      :: IORef Settings
  , firstSettings      :: IORef Bool
  , streamTable        :: StreamTable
  -- Incremented by 'opened' and decremented by 'closed'.
  , concurrency        :: IORef Int
  -- | RFC 7540 says "Other frames (from any stream) MUST NOT
  --   occur between the HEADERS frame and any CONTINUATION
  --   frames that might follow". This field is used to implement
  --   this requirement.
  , continued          :: IORef (Maybe StreamId)
  -- The identifier that 'getMyNewStreamId' hands out next.
  , myStreamId         :: IORef StreamId
  -- Read and written through 'getPeerStreamID' / 'setPeerStreamID'.
  , peerStreamId       :: IORef StreamId
  , outputQ            :: TQueue (Output Stream)
  , controlQ           :: TQueue Control
  , encodeDynamicTable :: DynamicTable
  , decodeDynamicTable :: DynamicTable
  -- the connection window for data from a server to a browser.
  , connectionWindow   :: TVar WindowSize
  , pingRate           :: Rate
  , settingsRate       :: Rate
  , emptyFrameRate     :: Rate
  }

----------------------------------------------------------------

-- | Build a fresh connection context.
--
--   The role is derived from the constructor of the given 'RoleInfo':
--   'ClientInfo' yields 'Client', anything else yields 'Server'.  The
--   first locally allocated stream identifier is 1 for a client and 2
--   for a server; the peer stream identifier starts at 0.
newContext :: RoleInfo -> IO Context
newContext rinfo =
    Context rl rinfo
               <$> newIORef defaultSettings
               <*> newIORef False
               <*> newStreamTable
               <*> newIORef 0
               <*> newIORef Nothing
               <*> newIORef sid0
               <*> newIORef 0
               <*> newTQueueIO
               <*> newTQueueIO
               <*> newDynamicTableForEncoding defaultDynamicTableSize
               <*> newDynamicTableForDecoding defaultDynamicTableSize 4096
               <*> newTVarIO defaultInitialWindowSize
               <*> newRate
               <*> newRate
               <*> newRate
   where
     rl = case rinfo of
       ClientInfo{} -> Client
       _            -> Server
     sid0 | rl == Client = 1
          | otherwise    = 2

----------------------------------------------------------------

-- | 'True' when the context was created with the 'Client' role.
isClient :: Context -> Bool
isClient ctx = role ctx == Client

-- | 'True' when the context was created with the 'Server' role.
isServer :: Context -> Bool
isServer ctx = role ctx == Server

----------------------------------------------------------------

-- | Atomically return the current local stream identifier and advance
--   the stored one by 2, so the parity chosen in 'newContext' is kept.
getMyNewStreamId :: Context -> IO StreamId
getMyNewStreamId ctx = atomicModifyIORef' (myStreamId ctx) inc2
  where
    inc2 n = let n' = n + 2 in (n', n)

-- | Read the stored peer stream identifier.
getPeerStreamID :: Context -> IO StreamId
getPeerStreamID ctx = readIORef $ peerStreamId ctx

-- | Overwrite the stored peer stream identifier.
setPeerStreamID :: Context -> StreamId -> IO ()
setPeerStreamID ctx sid =  writeIORef (peerStreamId ctx) sid

----------------------------------------------------------------

-- | Overwrite the state of a stream.  The context argument is unused.
{-# INLINE setStreamState #-}
setStreamState :: Context -> Stream -> StreamState -> IO ()
setStreamState _ Stream{streamState} val = writeIORef streamState val

-- | Count the stream towards 'concurrency' and mark it @Open JustOpened@.
opened :: Context -> Stream -> IO ()
opened ctx@Context{concurrency} strm = do
    atomicModifyIORef' concurrency (\x -> (x+1,()))
    setStreamState ctx strm (Open JustOpened)

-- | Record that the remote side has half-closed the stream.
--
--   * An already 'Closed' stream is left untouched.
--   * A stream in 'HalfClosedLocal' becomes 'Closed' with the code that
--     was stored there, and 'closed' is then run with that code.
--   * Any other state becomes 'HalfClosedRemote'.
halfClosedRemote :: Context -> Stream -> IO ()
halfClosedRemote ctx stream@Stream{streamState} = do
    -- The strict variant is used, as everywhere else in this module, so
    -- that no chain of unevaluated transitions is left in the ref.
    closingCode <- atomicModifyIORef' streamState closeHalf
    traverse_ (closed ctx stream) closingCode
  where
    closeHalf :: StreamState -> (StreamState, Maybe ClosedCode)
    closeHalf x@(Closed _)         = (x, Nothing)
    closeHalf (HalfClosedLocal cc) = (Closed cc, Just cc)
    closeHalf _                    = (HalfClosedRemote, Nothing)

-- | Record that the local side has half-closed the stream with the given
--   code.
--
--   * An already 'Closed' stream is left untouched.
--   * A stream in 'HalfClosedRemote' becomes 'Closed' with the given
--     code, and 'closed' is then run with that code.
--   * Any other state becomes 'HalfClosedLocal' carrying the code.
halfClosedLocal :: Context -> Stream -> ClosedCode -> IO ()
halfClosedLocal ctx stream@Stream{streamState} cc = do
    -- Strict variant for the same reason as in 'halfClosedRemote'.
    shouldFinalize <- atomicModifyIORef' streamState closeHalf
    when shouldFinalize $
        closed ctx stream cc
  where
    closeHalf :: StreamState -> (StreamState, Bool)
    closeHalf x@(Closed _)     = (x, False)
    closeHalf HalfClosedRemote = (Closed cc, True)
    closeHalf _                = (HalfClosedLocal cc, False)

-- | Finalise a stream: remove it from the stream table, decrement
--   'concurrency' and set its state to 'Closed' with the given code.
--
--   The decrement is unconditional, so calling this more than once for
--   the same stream lowers the counter more than once (see the TODO).
closed :: Context -> Stream -> ClosedCode -> IO ()
closed ctx@Context{concurrency,streamTable} strm@Stream{streamNumber} cc = do
    remove streamTable streamNumber
    -- TODO: prevent double-counting
    atomicModifyIORef' concurrency (\x -> (x-1,()))
    setStreamState ctx strm (Closed cc) -- anyway

-- | Create a stream for the given identifier and register it in the
--   stream table.
--
--   The stream is created with the initial window size currently held in
--   'http2settings'.  It goes through 'opened' (and is therefore counted
--   in 'concurrency') only when the triggering frame type is
--   'FrameHeaders' or 'FramePushPromise'.
openStream :: Context -> StreamId -> FrameTypeId -> IO Stream
openStream ctx@Context{streamTable, http2settings} sid ftyp = do
    ws <- initialWindowSize <$> readIORef http2settings
    newstrm <- newStream sid $ fromIntegral ws
    when (ftyp == FrameHeaders || ftyp == FramePushPromise) $ opened ctx newstrm
    insert streamTable sid newstrm
    return newstrm