1{-# LANGUAGE NamedFieldPuns #-}
2{-# LANGUAGE OverloadedStrings #-}
3{-# LANGUAGE RecordWildCards #-}
4
5module Network.HTTP2.Arch.Sender (
6    frameSender
7  , fillBuilderBodyGetNext
8  , fillFileBodyGetNext
9  , fillStreamBodyGetNext
10  , runTrailersMaker
11  ) where
12
13import Control.Concurrent.STM
14import qualified Control.Exception as E
15import qualified Data.ByteString as BS
16import Data.ByteString.Builder (Builder)
17import qualified Data.ByteString.Builder.Extra as B
18import Foreign.Ptr (plusPtr)
19import Network.ByteOrder
20
21import Imports
22import Network.HPACK (setLimitForEncoding, toHeaderTable)
23import Network.HTTP2.Arch.Config
24import Network.HTTP2.Arch.Context
25import Network.HTTP2.Arch.EncodeFrame
26import Network.HTTP2.Arch.File
27import Network.HTTP2.Arch.HPACK
28import Network.HTTP2.Arch.Manager hiding (start)
29import Network.HTTP2.Arch.Queue
30import Network.HTTP2.Arch.Stream
31import Network.HTTP2.Arch.Types
32import Network.HTTP2.Frame
33
34----------------------------------------------------------------
35
-- | Work left over from a builder run that did not fit into the buffer.
data Leftover = LZero -- ^ Nothing is pending.
              | LOne B.BufferWriter -- ^ A writer that must be resumed.
              | LTwo ByteString B.BufferWriter -- ^ Bytes to copy first, then a writer to resume.
39
40----------------------------------------------------------------
41
{-# INLINE getStreamWindowSize #-}
-- | Current value of the stream's flow-control window, read with
--   'readTVarIO'.
getStreamWindowSize :: Stream -> IO WindowSize
getStreamWindowSize strm = readTVarIO (streamWindow strm)
45
{-# INLINE waitStreamWindowSize #-}
-- | Block until the stream's flow-control window is strictly positive.
waitStreamWindowSize :: Stream -> IO ()
waitStreamWindowSize strm =
    atomically $ readTVar (streamWindow strm) >>= check . (> 0)
51
{-# INLINE waitStreaming #-}
-- | Block until the given bounded queue holds at least one element.
--   Nothing is removed from the queue.
waitStreaming :: TBQueue a -> IO ()
waitStreaming tbq = atomically $ isEmptyTBQueue tbq >>= check . not
57
-- | What the sender loop dequeued and has to act on next.
data Switch = C Control -- ^ An item read from the control queue.
            | O (Output Stream) -- ^ An item read from the output queue.
            | Flush -- ^ Both queues are empty but the buffer holds unsent bytes.
61
-- | Sender loop of a connection. It takes items from @controlQ@ and
--   @outputQ@, builds frames in @confWriteBuffer@ and passes the filled
--   bytes to @confSendAll@. The loop starts with an empty buffer
--   (offset 0). Any exception escaping the loop is discarded by 'ignore',
--   which makes 'frameSender' return.
frameSender :: Context -> Config -> Manager -> IO ()
frameSender ctx@Context{outputQ,controlQ,connectionWindow,encodeDynamicTable}
            Config{..}
            mgr = loop 0 `E.catch` ignore
  where
67    dequeue off = do
68        isEmpty <- isEmptyTQueue controlQ
69        if isEmpty then do
70            w <- readTVar connectionWindow
71            check (w > 0)
72            emp <- isEmptyTQueue outputQ
73            if emp then
74                if off /= 0 then return Flush else retry
75              else
76                O <$> readTQueue outputQ
77          else
78            C <$> readTQueue controlQ
79
    -- Offset threshold used by 'loop': once the write offset exceeds this
    -- value (fewer than 512 bytes remain in the buffer) the buffer is
    -- flushed before handling the next item.
    hardLimit = confBufferSize - 512
81
82    loop off = do
83        x <- atomically $ dequeue off
84        case x of
85            C ctl -> do
86                when (off /= 0) $ flushN off
87                off' <- control ctl off
88                when (off' >= 0) $ loop off'
89            O out -> do
90                off' <- outputOrEnqueueAgain out off
91                case off' of
92                    0                    -> loop 0
93                    _ | off' > hardLimit -> flushN off' >> loop 0
94                      | otherwise        -> loop off'
95            Flush -> flushN off >> loop 0
96
97    control CFinish         _ = return (-1)
98    control (CGoaway frame) _ = confSendAll frame >> return (-1)
99    control (CFrame frame)  _ = confSendAll frame >> return 0
100    control (CSettings frame alist) _ = do
101        confSendAll frame
102        setLimit alist
103        return 0
104    control (CSettings0 frame1 frame2 alist) off = do -- off == 0, just in case
105        let buf = confWriteBuffer `plusPtr` off
106            off' = off + BS.length frame1 + BS.length frame2
107        buf' <- copy buf frame1
108        void $ copy buf' frame2
109        setLimit alist
110        return off'
111
112    {-# INLINE setLimit #-}
113    setLimit alist = case lookup SettingsHeaderTableSize alist of
114        Nothing  -> return ()
115        Just siz -> setLimitForEncoding siz encodeDynamicTable
116
117    output out@(Output strm OutObj{} (ONext curr tlrmkr) _ sentinel) off0 lim = do
118        -- Data frame payload
119        let payloadOff = off0 + frameHeaderLength
120            datBuf     = confWriteBuffer `plusPtr` payloadOff
121            datBufSiz  = confBufferSize - payloadOff
122        Next datPayloadLen mnext <- curr datBuf datBufSiz lim -- checkme
123        NextTrailersMaker tlrmkr' <- runTrailersMaker tlrmkr datBuf datPayloadLen
124        fillDataHeaderEnqueueNext strm off0 datPayloadLen mnext tlrmkr' sentinel out
125
126    output out@(Output strm (OutObj hdr body tlrmkr) OObj mtbq _) off0 lim = do
127        -- Header frame and Continuation frame
128        let sid = streamNumber strm
129            endOfStream = case body of
130                OutBodyNone -> True
131                _           -> False
132        (ths,_) <- toHeaderTable $ fixHeaders hdr
133        kvlen <- headerContinue sid ths endOfStream off0
134        off <- sendHeadersIfNecessary $ off0 + frameHeaderLength + kvlen
135        case body of
136            OutBodyNone -> do
137                -- halfClosedLocal calls closed which removes
138                -- the stream from stream table.
139                when (isServer ctx) $ halfClosedLocal ctx strm Finished
140                return off
141            OutBodyFile (FileSpec path fileoff bytecount) -> do
142                (pread, sentinel') <- confPositionReadMaker path
143                refresh <- case sentinel' of
144                             Closer closer       -> timeoutClose mgr closer
145                             Refresher refresher -> return refresher
146                let next = fillFileBodyGetNext pread fileoff bytecount refresh
147                    out' = out { outputType = ONext next tlrmkr }
148                output out' off lim
149            OutBodyBuilder builder -> do
150                let next = fillBuilderBodyGetNext builder
151                    out' = out { outputType = ONext next tlrmkr }
152                output out' off lim
153            OutBodyStreaming _ -> do
154                let tbq = fromJust mtbq
155                    takeQ = atomically $ tryReadTBQueue tbq
156                    next = fillStreamBodyGetNext takeQ
157                    out' = out { outputType = ONext next tlrmkr }
158                output out' off lim
159
160    output out@(Output strm _ (OPush ths pid) _ _) off0 lim = do
161        -- Creating a push promise header
162        -- Frame id should be associated stream id from the client.
163        let sid = streamNumber strm
164        len <- pushPromise pid sid ths off0
165        off <- sendHeadersIfNecessary $ off0 + frameHeaderLength + len
166        output out{outputType=OObj} off lim
167
168    output _ _ _ = undefined -- never reach
169
170    outputOrEnqueueAgain :: Output Stream -> Int -> IO Int
171    outputOrEnqueueAgain out@(Output strm _ otyp _ _) off = E.handle resetStream $ do
172        state <- readStreamState strm
173        if isHalfClosedLocal state then
174            return off
175          else case otyp of
176                 OWait wait -> do
177                     -- Checking if all push are done.
178                     forkAndEnqueueWhenReady wait outputQ out{outputType=OObj} mgr
179                     return off
180                 _ -> case mtbq of
181                        Just tbq -> checkStreaming tbq
182                        _        -> checkStreamWindowSize
183      where
184        mtbq = outputStrmQ out
185        checkStreaming tbq = do
186            isEmpty <- atomically $ isEmptyTBQueue tbq
187            if isEmpty then do
188                forkAndEnqueueWhenReady (waitStreaming tbq) outputQ out mgr
189                return off
190              else
191                checkStreamWindowSize
192        checkStreamWindowSize = do
193            sws <- getStreamWindowSize strm
194            if sws == 0 then do
195                forkAndEnqueueWhenReady (waitStreamWindowSize strm) outputQ out mgr
196                return off
197              else do
198                cws <- readTVarIO connectionWindow -- not 0
199                let lim = min cws sws
200                output out off lim
201        resetStream e = do
202            closed ctx strm (ResetByMe e)
203            let rst = resetFrame InternalError $ streamNumber strm
204            enqueueControl controlQ $ CFrame rst
205            return off
206
207    {-# INLINE flushN #-}
208    -- Flush the connection buffer to the socket, where the first 'n' bytes of
209    -- the buffer are filled.
210    flushN :: Int -> IO ()
211    flushN n = bufferIO confWriteBuffer n confSendAll
212
213    headerContinue sid ths endOfStream off = do
214        let offkv = off + frameHeaderLength
215        let bufkv = confWriteBuffer `plusPtr` offkv
216            limkv = confBufferSize - offkv
217        (hs,kvlen) <- hpackEncodeHeader ctx bufkv limkv ths
218        let flag0 = case hs of
219                [] -> setEndHeader defaultFlags
220                _  -> defaultFlags
221            flag = if endOfStream then setEndStream flag0 else flag0
222        let buf = confWriteBuffer `plusPtr` off
223        fillFrameHeader FrameHeaders kvlen sid flag buf
224        continue sid kvlen hs
225
    -- Payload position and payload capacity of a frame that starts at the
    -- very beginning of the connection buffer; used by 'continue'.
    bufHeaderPayload = confWriteBuffer `plusPtr` frameHeaderLength
    headerPayloadLim = confBufferSize - frameHeaderLength
228
229    continue _   kvlen [] = return kvlen
230    continue sid kvlen ths = do
231        flushN $ kvlen + frameHeaderLength
232        -- Now off is 0
233        (ths', kvlen') <- hpackEncodeHeaderLoop ctx bufHeaderPayload headerPayloadLim ths
234        when (ths == ths') $ E.throwIO $ ConnectionError CompressionError "cannot compress the header"
235        let flag = case ths' of
236                [] -> setEndHeader defaultFlags
237                _  -> defaultFlags
238        fillFrameHeader FrameContinuation kvlen' sid flag confWriteBuffer
239        continue sid kvlen' ths'
240
241    {-# INLINE sendHeadersIfNecessary #-}
242    -- Send headers if there is not room for a 1-byte data frame, and return
243    -- the offset of the next frame's first header byte.
244    sendHeadersIfNecessary off
245      -- True if the connection buffer has room for a 1-byte data frame.
246      | off + frameHeaderLength < confBufferSize = return off
247      | otherwise = do
248          flushN off
249          return 0
250
    -- Complete a DATA frame whose payload ('datPayloadLen' bytes) has
    -- already been written after offset 'off', and return the next write
    -- offset.

    -- No continuation: this is the last DATA frame of the body.
    fillDataHeaderEnqueueNext strm@Stream{streamWindow,streamNumber}
                   off datPayloadLen Nothing tlrmkr tell _ = do
        let buf  = confWriteBuffer `plusPtr` off
            off' = off + frameHeaderLength + datPayloadLen
        -- Ask the trailers maker for the final trailers. Without trailers
        -- the DATA frame itself carries END_STREAM.
        (mtrailers, flag) <- do
              Trailers trailers <- tlrmkr Nothing
              if null trailers then
                  return (Nothing, setEndStream defaultFlags)
                else
                  return (Just trailers, defaultFlags)
        fillFrameHeader FrameData datPayloadLen streamNumber flag buf
        off'' <- handleTrailers mtrailers off'
        -- Run the output's sentinel action; its result is not used.
        void tell
        when (isServer ctx) $ halfClosedLocal ctx strm Finished
        -- Charge the payload against both windows.
        atomically $ modifyTVar' connectionWindow (subtract datPayloadLen)
        atomically $ modifyTVar' streamWindow (subtract datPayloadLen)
        return off''
      where
        handleTrailers Nothing off0 = return off0
        -- Trailers go out as a header block with END_STREAM set.
        handleTrailers (Just trailers) off0 = do
            (ths,_) <- toHeaderTable trailers
            kvlen <- headerContinue streamNumber ths True off0
            sendHeadersIfNecessary $ off0 + frameHeaderLength + kvlen

    -- More body to come: finish this DATA frame without END_STREAM and put
    -- the output back on 'outputQ' with the continuation 'next'.
    fillDataHeaderEnqueueNext Stream{streamWindow,streamNumber}
                   off datPayloadLen (Just next) tlrmkr _ out = do
        let buf  = confWriteBuffer `plusPtr` off
            off' = off + frameHeaderLength + datPayloadLen
            flag  = defaultFlags
        fillFrameHeader FrameData datPayloadLen streamNumber flag buf
        -- Charge the payload against both windows.
        atomically $ modifyTVar' connectionWindow (subtract datPayloadLen)
        atomically $ modifyTVar' streamWindow (subtract datPayloadLen)
        let out' = out { outputType = ONext next tlrmkr }
        enqueueOutput outputQ out'
        return off'
286
287    pushPromise pid sid ths off = do
288        let offsid = off + frameHeaderLength -- checkme
289            bufsid = confWriteBuffer `plusPtr` offsid
290        poke32 (fromIntegral sid) bufsid 0
291        let offkv  = offsid + 4
292            bufkv  = confWriteBuffer `plusPtr` offkv
293            limkv  = confBufferSize - offkv
294        (_,kvlen) <- hpackEncodeHeader ctx bufkv limkv ths
295        let flag = setEndHeader defaultFlags -- No EndStream flag
296            buf = confWriteBuffer `plusPtr` off
297            len = kvlen + 4
298        fillFrameHeader FramePushPromise len pid flag buf
299        return len
300
301    {-# INLINE fillFrameHeader #-}
302    fillFrameHeader ftyp len sid flag buf = encodeFrameHeaderBuf ftyp hinfo buf
303      where
304        hinfo = FrameHeader len flag sid
305
306    {-# INLINE ignore #-}
307    ignore :: E.SomeException -> IO ()
308    ignore _ = return ()
309
-- | Feed the first @siz@ bytes at @buf@ to a trailers maker, as a
--   'ByteString' wrapped in 'Just', and return what the maker produces.
--
-- > bufferIO buf siz $ \bs -> tlrmkr (Just bs)
runTrailersMaker :: TrailersMaker -> Buffer -> Int -> IO NextTrailersMaker
runTrailersMaker tlrmkr buf siz = bufferIO buf siz (tlrmkr . Just)
315
316----------------------------------------------------------------
317
-- | First fill step for a builder body: run the builder into the buffer,
--   using at most @min siz lim@ bytes, and turn the outcome into a 'Next'.
fillBuilderBodyGetNext :: Builder -> DynaNext
fillBuilderBodyGetNext bb buf siz lim =
    uncurry nextForBuilder <$> B.runBuilder bb buf (min siz lim)
323
-- | First fill step for a file body: read up to @min siz lim@ bytes (and
--   no more than @bytecount@) from offset @start@ into the buffer. Unlike
--   'fillBufFile', this step does not run @refresh@ itself; it only hands
--   it on to the continuation.
fillFileBodyGetNext :: PositionRead -> FileOffset -> ByteCount -> IO () -> DynaNext
fillFileBodyGetNext pread start bytecount refresh buf siz lim = do
    len <- pread start (mini (min siz lim) bytecount) buf
    return $ nextForFile (fromIntegral len) pread (start + len) (bytecount - len) refresh
330
-- | First fill step for a streaming body: drain chunks obtained from
--   @takeQ@ into the buffer, using at most @min siz lim@ bytes.
fillStreamBodyGetNext :: IO (Maybe StreamingChunk) -> DynaNext
fillStreamBodyGetNext takeQ buf siz lim = do
    (leftover, cont, len) <- runStreamBuilder buf (min siz lim) takeQ
    return (nextForStream takeQ leftover cont len)
336
337----------------------------------------------------------------
338
-- | Resume a builder body from its 'Leftover'. At most @min siz0 lim@
--   bytes are written. 'LZero' is not a valid input and raises an error.
fillBufBuilder :: Leftover -> DynaNext
fillBufBuilder leftover buf0 siz0 lim = case leftover of
    LZero -> error "fillBufBuilder: LZero"
    LOne writer -> finish 0 =<< writer buf0 room
    LTwo bs writer
      -- The pending bytes fit: copy them, then resume the writer behind.
      | let bsLen = BS.length bs, bsLen <= room -> do
          buf1 <- copy buf0 bs
          finish bsLen =<< writer buf1 (room - bsLen)
      -- They do not fit: copy what fits and keep the rest for next time.
      | otherwise -> do
          let (now, later) = BS.splitAt room bs
          void $ copy buf0 now
          return $ nextForBuilder room (B.Chunk later writer)
  where
    room = min siz0 lim
    -- 'pre' bytes were already written before the writer ran.
    finish pre (len, signal) = return $ nextForBuilder (pre + len) signal
359
-- | Turn the signal of a builder run into a 'Next': no continuation when
--   the builder is done, otherwise a 'fillBufBuilder' continuation that
--   carries the leftover.
nextForBuilder :: BytesFilled -> B.Next -> Next
nextForBuilder len signal = case signal of
    B.Done            -> Next len Nothing
    B.More _ writer   -> resume (LOne writer)
    B.Chunk bs writer -> resume (LTwo bs writer)
  where
    resume leftover = Next len $ Just (fillBufBuilder leftover)
367
368----------------------------------------------------------------
369
-- | Run builders obtained from @takeQ@ into the buffer until the source
--   yields nothing, a flush or finish marker arrives, or a builder does
--   not complete. Returns the leftover, whether the stream continues
--   ('False' only for 'StreamingFinished'), and the bytes written.
runStreamBuilder :: Buffer -> BufferSize -> IO (Maybe StreamingChunk)
                 -> IO (Leftover, Bool, BytesFilled)
runStreamBuilder buf0 room0 takeQ = go buf0 room0 0
  where
    stop leftover cont filled = return (leftover, cont, filled)
    go buf room filled = takeQ >>= \mchunk -> case mchunk of
        Nothing                -> stop LZero True filled
        Just StreamingFlush    -> stop LZero True filled
        Just StreamingFinished -> stop LZero False filled
        Just (StreamingBuilder builder) -> do
            (len, signal) <- B.runBuilder builder buf room
            let filled' = filled + len
            case signal of
                -- This builder is complete; try the next chunk.
                B.Done            -> go (buf `plusPtr` len) (room - len) filled'
                B.More  _ writer  -> stop (LOne writer) True filled'
                B.Chunk bs writer -> stop (LTwo bs writer) True filled'
387
-- | Resume a streaming body: first finish the 'Leftover' of the previous
--   step, then keep draining chunks from @takeQ@. At most @min siz0 lim0@
--   bytes are written.
fillBufStream :: Leftover -> IO (Maybe StreamingChunk) -> DynaNext
fillBufStream leftover0 takeQ buf0 siz0 lim0 = case leftover0 of
    LZero -> do
        (leftover, cont, len) <- runStreamBuilder buf0 room0 takeQ
        next leftover cont len
    LOne writer -> resume writer buf0 room0 0
    LTwo bs writer
      -- The pending bytes fit: copy them, then resume the writer behind.
      | let bsLen = BS.length bs, bsLen <= room0 -> do
          buf1 <- copy buf0 bs
          resume writer buf1 (room0 - bsLen) bsLen
      -- They do not fit: copy what fits and keep the rest for next time.
      | otherwise -> do
          let (now, later) = BS.splitAt room0 bs
          void $ copy buf0 now
          next (LTwo later writer) True room0
  where
    room0 = min siz0 lim0
    next leftover cont filled = return $ nextForStream takeQ leftover cont filled
    -- Run a pending writer; 'sofar' bytes precede it in this frame.
    resume writer buf room sofar = do
        (len, signal) <- writer buf room
        let filled = sofar + len
        case signal of
            B.Done -> do
                (leftover, cont, extra) <- runStreamBuilder (buf `plusPtr` len) (room - len) takeQ
                next leftover cont (filled + extra)
            B.More  _ writer' -> next (LOne writer') True filled
            B.Chunk bs writer' -> next (LTwo bs writer') True filled
420
-- | Build the 'Next' of a streaming step: a 'fillBufStream' continuation
--   while the stream continues, none once it has finished.
nextForStream :: IO (Maybe StreamingChunk)
              -> Leftover -> Bool -> BytesFilled
              -> Next
nextForStream takeQ leftOrZero cont len
  | cont      = Next len $ Just (fillBufStream leftOrZero takeQ)
  | otherwise = Next len Nothing
427
428----------------------------------------------------------------
429
-- | Continue a file body: read up to @min siz lim@ bytes (and no more
--   than @bytes@) from offset @start@ into the buffer, then run @refresh@.
fillBufFile :: PositionRead -> FileOffset -> ByteCount -> IO () -> DynaNext
fillBufFile pread start bytes refresh buf siz lim = do
    len <- pread start (mini (min siz lim) bytes) buf
    refresh
    return $ nextForFile (fromIntegral len) pread (start + len) (bytes - len) refresh
437
-- | Build the 'Next' of a file step. There is no continuation when the
--   read produced nothing or no bytes remain; otherwise 'fillBufFile'
--   continues from the new offset.
nextForFile :: BytesFilled -> PositionRead -> FileOffset -> ByteCount -> IO () -> Next
nextForFile len pread start bytes refresh
  | len == 0 || bytes == 0 = Next len Nothing
  | otherwise = Next len $ Just (fillBufFile pread start bytes refresh)
443
{-# INLINE mini #-}
-- | Minimum of an 'Int' and an 'Int64', returned as 'Int64'.
mini :: Int -> Int64 -> Int64
mini i n = min (fromIntegral i) n
449