{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Arch.Sender (
    frameSender
  , fillBuilderBodyGetNext
  , fillFileBodyGetNext
  , fillStreamBodyGetNext
  , runTrailersMaker
  ) where

import Control.Concurrent.STM
import qualified Control.Exception as E
import qualified Data.ByteString as BS
import Data.ByteString.Builder (Builder)
import qualified Data.ByteString.Builder.Extra as B
import Foreign.Ptr (plusPtr)
import Network.ByteOrder

import Imports
import Network.HPACK (setLimitForEncoding, toHeaderTable)
import Network.HTTP2.Arch.Config
import Network.HTTP2.Arch.Context
import Network.HTTP2.Arch.EncodeFrame
import Network.HTTP2.Arch.File
import Network.HTTP2.Arch.HPACK
import Network.HTTP2.Arch.Manager hiding (start)
import Network.HTTP2.Arch.Queue
import Network.HTTP2.Arch.Stream
import Network.HTTP2.Arch.Types
import Network.HTTP2.Frame

----------------------------------------------------------------

-- | Builder work that did not fit into the buffer on the previous step.
data Leftover = LZero
              -- ^ Nothing is pending.
              | LOne B.BufferWriter
              -- ^ A writer to resume (built from 'B.More').
              | LTwo ByteString B.BufferWriter
              -- ^ A chunk to copy first, then a writer to resume
              --   (built from 'B.Chunk').

----------------------------------------------------------------

{-# INLINE getStreamWindowSize #-}
-- | Read the current value of the stream's window variable.
getStreamWindowSize :: Stream -> IO WindowSize
getStreamWindowSize Stream{streamWindow} = readTVarIO streamWindow

{-# INLINE waitStreamWindowSize #-}
-- | Block until the stream's window variable is positive.
waitStreamWindowSize :: Stream -> IO ()
waitStreamWindowSize Stream{streamWindow} = atomically $ do
    w <- readTVar streamWindow
    check (w > 0)

{-# INLINE waitStreaming #-}
-- | Block until the queue holds at least one element.
waitStreaming :: TBQueue a -> IO ()
waitStreaming tbq = atomically $ do
    isEmpty <- isEmptyTBQueue tbq
    check (not isEmpty)

-- | What the sender loop should do next.
data Switch = C Control
            -- ^ Handle an item taken from the control queue.
            | O (Output Stream)
            -- ^ Handle an item taken from the output queue.
            | Flush
            -- ^ Both queues are empty but the buffer holds unsent bytes.

-- | The sender loop.  It repeatedly takes an item from the control queue
--   (which has priority) or the output queue, serializes frames into
--   'confWriteBuffer' and sends them with 'confSendAll'.
--
--   The loop threads the number of filled bytes of the write buffer
--   (the \"offset\") through every step.  The loop ends when a control
--   handler returns a negative offset ('CFinish', 'CGoaway') or when any
--   exception escapes, which is deliberately swallowed by 'ignore'.
frameSender :: Context -> Config -> Manager -> IO ()
frameSender ctx@Context{outputQ,controlQ,connectionWindow,encodeDynamicTable}
            Config{..}
            mgr = loop 0 `E.catch` ignore
  where
    -- STM action choosing the next job.  Control items win; output items
    -- are taken only while the connection window is positive.  When
    -- nothing is queued, ask for a flush if the buffer is non-empty,
    -- otherwise block.
    dequeue off = do
        isEmpty <- isEmptyTQueue controlQ
        if isEmpty then do
            w <- readTVar connectionWindow
            check (w > 0)
            emp <- isEmptyTQueue outputQ
            if emp then
                if off /= 0 then return Flush else retry
              else
                O <$> readTQueue outputQ
          else
            C <$> readTQueue controlQ

    -- Once the offset exceeds this, the buffer is flushed before
    -- handling further output.
    hardLimit = confBufferSize - 512

    loop off = do
        x <- atomically $ dequeue off
        case x of
            C ctl -> do
                when (off /= 0) $ flushN off
                -- The buffer is empty here (either it already was, or it
                -- has just been flushed), so the control handler must
                -- start at offset 0.  Passing the stale offset would make
                -- 'CSettings0' append after already-sent bytes and have
                -- them sent a second time by the next flush.
                off' <- control ctl 0
                when (off' >= 0) $ loop off'
            O out -> do
                off' <- outputOrEnqueueAgain out off
                case off' of
                    0                    -> loop 0
                    _ | off' > hardLimit -> flushN off' >> loop 0
                      | otherwise        -> loop off'
            Flush -> flushN off >> loop 0

    -- Handle a control item.  Returns the new buffer offset, or a
    -- negative number to stop the sender loop.
    control CFinish         _ = return (-1)
    control (CGoaway frame) _ = confSendAll frame >> return (-1)
    control (CFrame frame)  _ = confSendAll frame >> return 0
    control (CSettings frame alist) _ = do
        confSendAll frame
        setLimit alist
        return 0
    control (CSettings0 frame1 frame2 alist) off = do -- off == 0, just in case
        -- The two frames are only copied into the buffer; they go out
        -- with the next flush.
        let buf  = confWriteBuffer `plusPtr` off
            off' = off + BS.length frame1 + BS.length frame2
        buf' <- copy buf frame1
        void $ copy buf' frame2
        setLimit alist
        return off'

    {-# INLINE setLimit #-}
    -- Apply a SETTINGS_HEADER_TABLE_SIZE entry, if present, to the
    -- encoding dynamic table.
    setLimit alist = case lookup SettingsHeaderTableSize alist of
        Nothing  -> return ()
        Just siz -> setLimitForEncoding siz encodeDynamicTable

    -- Serialize one output item starting at offset 'off0', producing at
    -- most 'lim' bytes of DATA payload.  Returns the new offset.
    output out@(Output strm OutObj{} (ONext curr tlrmkr) _ sentinel) off0 lim = do
        -- Data frame payload
        let payloadOff = off0 + frameHeaderLength
            datBuf     = confWriteBuffer `plusPtr` payloadOff
            datBufSiz  = confBufferSize - payloadOff
        Next datPayloadLen mnext <- curr datBuf datBufSiz lim -- checkme
        NextTrailersMaker tlrmkr' <- runTrailersMaker tlrmkr datBuf datPayloadLen
        fillDataHeaderEnqueueNext strm off0 datPayloadLen mnext tlrmkr' sentinel out

    output out@(Output strm (OutObj hdr body tlrmkr) OObj mtbq _) off0 lim = do
        -- Header frame and Continuation frame
        let sid = streamNumber strm
            endOfStream = case body of
                OutBodyNone -> True
                _           -> False
        (ths,_) <- toHeaderTable $ fixHeaders hdr
        -- 'headerContinue' returns the offset just past the last frame it
        -- left in the buffer, already accounting for any flush it made.
        off <- headerContinue sid ths endOfStream off0 >>= sendHeadersIfNecessary
        case body of
            OutBodyNone -> do
                -- halfClosedLocal calls closed which removes
                -- the stream from stream table.
                when (isServer ctx) $ halfClosedLocal ctx strm Finished
                return off
            OutBodyFile (FileSpec path fileoff bytecount) -> do
                (pread, sentinel') <- confPositionReadMaker path
                refresh <- case sentinel' of
                    Closer closer       -> timeoutClose mgr closer
                    Refresher refresher -> return refresher
                let next = fillFileBodyGetNext pread fileoff bytecount refresh
                    out' = out { outputType = ONext next tlrmkr }
                output out' off lim
            OutBodyBuilder builder -> do
                let next = fillBuilderBodyGetNext builder
                    out' = out { outputType = ONext next tlrmkr }
                output out' off lim
            OutBodyStreaming _ -> case mtbq of
                Nothing  ->
                    -- Raised inside 'outputOrEnqueueAgain', so it is
                    -- turned into a stream reset by 'resetStream'.
                    error "frameSender.output: streaming body without a queue"
                Just tbq -> do
                    let takeQ = atomically $ tryReadTBQueue tbq
                        next = fillStreamBodyGetNext takeQ
                        out' = out { outputType = ONext next tlrmkr }
                    output out' off lim

    output out@(Output strm _ (OPush ths pid) _ _) off0 lim = do
        -- Creating a push promise header
        -- Frame id should be associated stream id from the client.
        let sid = streamNumber strm
        len <- pushPromise pid sid ths off0
        off <- sendHeadersIfNecessary $ off0 + frameHeaderLength + len
        output out{outputType=OObj} off lim

    -- 'OWait' is handled by 'outputOrEnqueueAgain' before 'output' is called.
    output _ _ _ = error "frameSender.output: unexpected output type (never reach)"

    -- Either serialize the output item now, or hand it to a helper that
    -- re-enqueues it once it becomes ready.  Any exception resets the
    -- stream and leaves the offset unchanged.
    outputOrEnqueueAgain :: Output Stream -> Int -> IO Int
    outputOrEnqueueAgain out@(Output strm _ otyp _ _) off = E.handle resetStream $ do
        state <- readStreamState strm
        if isHalfClosedLocal state then
            return off
          else case otyp of
                 OWait wait -> do
                     -- Checking if all push are done.
                     forkAndEnqueueWhenReady wait outputQ out{outputType=OObj} mgr
                     return off
                 _ -> case mtbq of
                        Just tbq -> checkStreaming tbq
                        _        -> checkStreamWindowSize
      where
        mtbq = outputStrmQ out
        -- A streaming body with an empty queue has nothing to send yet.
        checkStreaming tbq = do
            isEmpty <- atomically $ isEmptyTBQueue tbq
            if isEmpty then do
                forkAndEnqueueWhenReady (waitStreaming tbq) outputQ out mgr
                return off
              else
                checkStreamWindowSize
        -- With a zero stream window, wait for it to open; otherwise
        -- send, limited by the smaller of the two windows.
        checkStreamWindowSize = do
            sws <- getStreamWindowSize strm
            if sws == 0 then do
                forkAndEnqueueWhenReady (waitStreamWindowSize strm) outputQ out mgr
                return off
              else do
                cws <- readTVarIO connectionWindow -- not 0
                let lim = min cws sws
                output out off lim
        resetStream e = do
            closed ctx strm (ResetByMe e)
            let rst = resetFrame InternalError $ streamNumber strm
            enqueueControl controlQ $ CFrame rst
            return off

    {-# INLINE flushN #-}
    -- Flush the connection buffer to the socket, where the first 'n' bytes of
    -- the buffer are filled.
    flushN :: Int -> IO ()
    flushN n = bufferIO confWriteBuffer n confSendAll

    -- Encode a header list as a HEADERS frame at offset 'off', followed
    -- by as many CONTINUATION frames as needed.  Returns the offset just
    -- past the last frame left in the buffer.  When CONTINUATION frames
    -- are needed the buffer is flushed in between, so the result is then
    -- relative to a fresh buffer and not to 'off'.
    headerContinue sid ths endOfStream off = do
        let offkv = off + frameHeaderLength
            bufkv = confWriteBuffer `plusPtr` offkv
            limkv = confBufferSize - offkv
        (hs,kvlen) <- hpackEncodeHeader ctx bufkv limkv ths
        let flag0 = case hs of
                [] -> setEndHeader defaultFlags
                _  -> defaultFlags
            flag = if endOfStream then setEndStream flag0 else flag0
            buf = confWriteBuffer `plusPtr` off
        fillFrameHeader FrameHeaders kvlen sid flag buf
        continue sid (offkv + kvlen) hs

    bufHeaderPayload = confWriteBuffer `plusPtr` frameHeaderLength
    headerPayloadLim = confBufferSize - frameHeaderLength

    -- 'off' is the number of filled bytes in the buffer; the list holds
    -- the headers which have not been encoded yet.
    continue _   off []  = return off
    continue sid off ths = do
        -- Send everything filled so far, including frames which were
        -- already in the buffer before the HEADERS frame.
        flushN off
        -- Now off is 0
        (ths', kvlen') <- hpackEncodeHeaderLoop ctx bufHeaderPayload headerPayloadLim ths
        -- No progress means that a single header cannot fit in the buffer.
        when (ths == ths') $ E.throwIO $ ConnectionError CompressionError "cannot compress the header"
        let flag = case ths' of
                [] -> setEndHeader defaultFlags
                _  -> defaultFlags
        fillFrameHeader FrameContinuation kvlen' sid flag confWriteBuffer
        continue sid (frameHeaderLength + kvlen') ths'

    {-# INLINE sendHeadersIfNecessary #-}
    -- Send headers if there is not room for a 1-byte data frame, and return
    -- the offset of the next frame's first header byte.
    sendHeadersIfNecessary off
      -- True if the connection buffer has room for a 1-byte data frame.
      | off + frameHeaderLength < confBufferSize = return off
      | otherwise = do
          flushN off
          return 0

    -- Write the DATA frame header for a payload which is already in the
    -- buffer, then either finish the stream (no continuation) or
    -- re-enqueue the output with its continuation.  Both windows are
    -- decreased by the payload length.  Returns the new offset.
    fillDataHeaderEnqueueNext strm@Stream{streamWindow,streamNumber}
                   off datPayloadLen Nothing tlrmkr tell _ = do
        let buf  = confWriteBuffer `plusPtr` off
            off' = off + frameHeaderLength + datPayloadLen
        -- END_STREAM goes on the DATA frame only when there are no
        -- trailers; otherwise it goes on the trailers' HEADERS frame.
        (mtrailers, flag) <- do
            Trailers trailers <- tlrmkr Nothing
            if null trailers then
                return (Nothing, setEndStream defaultFlags)
              else
                return (Just trailers, defaultFlags)
        fillFrameHeader FrameData datPayloadLen streamNumber flag buf
        off'' <- handleTrailers mtrailers off'
        void tell
        when (isServer ctx) $ halfClosedLocal ctx strm Finished
        atomically $ modifyTVar' connectionWindow (subtract datPayloadLen)
        atomically $ modifyTVar' streamWindow (subtract datPayloadLen)
        return off''
      where
        handleTrailers Nothing         off0 = return off0
        handleTrailers (Just trailers) off0 = do
            (ths,_) <- toHeaderTable trailers
            -- NOTE(review): off0 can be close to the end of the buffer
            -- here; confirm that hpackEncodeHeader copes with a very
            -- small limit.
            headerContinue streamNumber ths True off0 >>= sendHeadersIfNecessary

    fillDataHeaderEnqueueNext Stream{streamWindow,streamNumber}
                   off datPayloadLen (Just next) tlrmkr _ out = do
        let buf  = confWriteBuffer `plusPtr` off
            off' = off + frameHeaderLength + datPayloadLen
            flag = defaultFlags
        fillFrameHeader FrameData datPayloadLen streamNumber flag buf
        atomically $ modifyTVar' connectionWindow (subtract datPayloadLen)
        atomically $ modifyTVar' streamWindow (subtract datPayloadLen)
        let out' = out { outputType = ONext next tlrmkr }
        enqueueOutput outputQ out'
        return off'

    -- Write a PUSH_PROMISE frame at offset 'off': the frame header
    -- (carrying 'pid'), then 'sid' as a 32-bit value, then the encoded
    -- headers.  Returns the payload length.  The headers that
    -- hpackEncodeHeader could not encode are discarded.
    pushPromise pid sid ths off = do
        let offsid = off + frameHeaderLength -- checkme
            bufsid = confWriteBuffer `plusPtr` offsid
        poke32 (fromIntegral sid) bufsid 0
        let offkv = offsid + 4
            bufkv = confWriteBuffer `plusPtr` offkv
            limkv = confBufferSize - offkv
        (_,kvlen) <- hpackEncodeHeader ctx bufkv limkv ths
        let flag = setEndHeader defaultFlags -- No EndStream flag
            buf = confWriteBuffer `plusPtr` off
            len = kvlen + 4
        fillFrameHeader FramePushPromise len pid flag buf
        return len

    {-# INLINE fillFrameHeader #-}
    -- Encode a frame header into 'buf'.
    fillFrameHeader ftyp len sid flag buf = encodeFrameHeaderBuf ftyp hinfo buf
      where
        hinfo = FrameHeader len flag sid

    {-# INLINE ignore #-}
    -- Deliberately best-effort: any exception just ends the sender.
    ignore :: E.SomeException -> IO ()
    ignore _ = return ()

-- | Running trailers-maker.
--
-- > bufferIO buf siz $ \bs -> tlrmkr (Just bs)
runTrailersMaker :: TrailersMaker -> Buffer -> Int -> IO NextTrailersMaker
runTrailersMaker tlrmkr buf siz = bufferIO buf siz $ \bs -> tlrmkr (Just bs)

----------------------------------------------------------------

-- | First step for a builder body: run the builder against the buffer
--   with @min siz lim@ bytes of room.
fillBuilderBodyGetNext :: Builder -> DynaNext
fillBuilderBodyGetNext bb buf siz lim = do
    let room = min siz lim
    (len, signal) <- B.runBuilder bb buf room
    return $ nextForBuilder len signal

-- | First step for a file body: read at most @min siz lim@ bytes (and
--   no more than @bytecount@) from offset @start@ into the buffer.
--   Unlike 'fillBufFile', this first step does not run @refresh@.
fillFileBodyGetNext :: PositionRead -> FileOffset -> ByteCount -> IO () -> DynaNext
fillFileBodyGetNext pread start bytecount refresh buf siz lim = do
    let room = min siz lim
    len <- pread start (mini room bytecount) buf
    let len' = fromIntegral len
    return $ nextForFile len' pread (start + len) (bytecount - len) refresh

-- | First step for a streaming body: drain chunks obtained from @takeQ@
--   into the buffer with @min siz lim@ bytes of room.
fillStreamBodyGetNext :: IO (Maybe StreamingChunk) -> DynaNext
fillStreamBodyGetNext takeQ buf siz lim = do
    let room = min siz lim
    (leftover, cont, len) <- runStreamBuilder buf room takeQ
    return $ nextForStream takeQ leftover cont len

----------------------------------------------------------------

-- | Resume a builder body from its leftover.  'LZero' is an error
--   because 'nextForBuilder' never produces it.
fillBufBuilder :: Leftover -> DynaNext
fillBufBuilder leftover buf0 siz0 lim = do
    let room = min siz0 lim
    case leftover of
        LZero -> error "fillBufBuilder: LZero"
        LOne writer -> do
            (len, signal) <- writer buf0 room
            getNext len signal
        LTwo bs writer
          -- The whole chunk fits: copy it and resume the writer after it.
          | BS.length bs <= room -> do
              buf1 <- copy buf0 bs
              let len1 = BS.length bs
              (len2, signal) <- writer buf1 (room - len1)
              getNext (len1 + len2) signal
          -- Otherwise copy what fits and keep the rest as a new chunk.
          | otherwise -> do
              let (bs1,bs2) = BS.splitAt room bs
              void $ copy buf0 bs1
              getNext room (B.Chunk bs2 writer)
  where
    getNext l s = return $ nextForBuilder l s

-- | Turn the builder's signal into the next step: finished on 'B.Done',
--   otherwise a continuation carrying the leftover.
nextForBuilder :: BytesFilled -> B.Next -> Next
nextForBuilder len B.Done
    = Next len Nothing
nextForBuilder len (B.More _ writer)
    = Next len $ Just (fillBufBuilder (LOne writer))
nextForBuilder len (B.Chunk bs writer)
    = Next len $ Just (fillBufBuilder (LTwo bs writer))

----------------------------------------------------------------

-- | Take chunks with @takeQ@ and run them into the buffer until the
--   source yields nothing, a builder does not complete, or a flush or
--   finish marker arrives.  Returns the leftover, whether the body
--   continues ('False' only for 'StreamingFinished'), and the total
--   number of bytes filled.
runStreamBuilder :: Buffer -> BufferSize -> IO (Maybe StreamingChunk)
                 -> IO (Leftover, Bool, BytesFilled)
runStreamBuilder buf0 room0 takeQ = loop buf0 room0 0
  where
    loop buf room total = do
        mbuilder <- takeQ
        case mbuilder of
            Nothing      -> return (LZero, True, total)
            Just (StreamingBuilder builder) -> do
                (len, signal) <- B.runBuilder builder buf room
                let total' = total + len
                case signal of
                    B.Done            -> loop (buf `plusPtr` len) (room - len) total'
                    B.More _ writer   -> return (LOne writer, True, total')
                    B.Chunk bs writer -> return (LTwo bs writer, True, total')
            Just StreamingFlush    -> return (LZero, True, total)
            Just StreamingFinished -> return (LZero, False, total)

-- | Resume a streaming body: first deal with the leftover, then keep
--   draining chunks with 'runStreamBuilder' if the leftover completed.
fillBufStream :: Leftover -> IO (Maybe StreamingChunk) -> DynaNext
fillBufStream leftover0 takeQ buf0 siz0 lim0 = do
    let room0 = min siz0 lim0
    case leftover0 of
        LZero -> do
            (leftover, cont, len) <- runStreamBuilder buf0 room0 takeQ
            getNext leftover cont len
        LOne writer -> write writer buf0 room0 0
        LTwo bs writer
          -- The whole chunk fits: copy it and resume the writer after it.
          | BS.length bs <= room0 -> do
              buf1 <- copy buf0 bs
              let len = BS.length bs
              write writer buf1 (room0 - len) len
          -- Otherwise copy what fits and keep the rest for the next step.
          | otherwise -> do
              let (bs1,bs2) = BS.splitAt room0 bs
              void $ copy buf0 bs1
              getNext (LTwo bs2 writer) True room0
  where
    getNext l b r = return $ nextForStream takeQ l b r
    -- 'sofar' counts the bytes already placed in the buffer in this step.
    write writer1 buf room sofar = do
        (len, signal) <- writer1 buf room
        case signal of
            B.Done -> do
                (leftover, cont, extra) <- runStreamBuilder (buf `plusPtr` len) (room - len) takeQ
                let total = sofar + len + extra
                getNext leftover cont total
            B.More  _ writer -> do
                let total = sofar + len
                getNext (LOne writer) True total
            B.Chunk bs writer -> do
                let total = sofar + len
                getNext (LTwo bs writer) True total

-- | Build the next step for a streaming body: no continuation when the
--   flag is 'False', otherwise resume with 'fillBufStream'.
nextForStream :: IO (Maybe StreamingChunk)
              -> Leftover -> Bool -> BytesFilled
              -> Next
nextForStream _ _ False len = Next len Nothing
nextForStream takeQ leftOrZero True len =
    Next len $ Just (fillBufStream leftOrZero takeQ)

----------------------------------------------------------------

-- | Resume a file body: read the next part of the file into the buffer
--   and then run @refresh@.
fillBufFile :: PositionRead -> FileOffset -> ByteCount -> IO () -> DynaNext
fillBufFile pread start bytes refresh buf siz lim = do
    let room = min siz lim
    len <- pread start (mini room bytes) buf
    refresh
    let len' = fromIntegral len
    return $ nextForFile len' pread (start + len) (bytes - len) refresh

-- | Build the next step for a file body: no continuation when nothing
--   was read or no bytes remain, otherwise resume with 'fillBufFile'.
nextForFile :: BytesFilled -> PositionRead -> FileOffset -> ByteCount -> IO () -> Next
nextForFile 0   _  _     _     _       = Next 0 Nothing
nextForFile len _  _     0     _       = Next len Nothing
nextForFile len pread start bytes refresh =
    Next len $ Just (fillBufFile pread start bytes refresh)

{-# INLINE mini #-}
-- | The smaller of the two arguments, as an 'Int64'.
mini :: Int -> Int64 -> Int64
mini i n
  | fromIntegral i < n = fromIntegral i
  | otherwise          = n