1# -*- test-case-name: twisted.web.test.test_http2 -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6HTTP2 Implementation
7
8This is the basic server-side protocol implementation used by the Twisted
9Web server for HTTP2.  This functionality is intended to be combined with the
10HTTP/1.1 and HTTP/1.0 functionality in twisted.web.http to provide complete
11protocol support for HTTP-type protocols.
12
13This API is currently considered private because it's in early draft form. When
14it has stabilised, it'll be made public.
15"""
16
17
18import io
19from collections import deque
20from typing import List
21
22from zope.interface import implementer
23
24import h2.config  # type: ignore[import]
25import h2.connection  # type: ignore[import]
26import h2.errors  # type: ignore[import]
27import h2.events  # type: ignore[import]
28import h2.exceptions  # type: ignore[import]
29import priority  # type: ignore[import]
30
31from twisted.internet._producer_helpers import _PullToPush
32from twisted.internet.defer import Deferred
33from twisted.internet.error import ConnectionLost
34from twisted.internet.interfaces import (
35    IConsumer,
36    IProtocol,
37    IPushProducer,
38    ISSLTransport,
39    ITransport,
40)
41from twisted.internet.protocol import Protocol
42from twisted.logger import Logger
43from twisted.protocols.policies import TimeoutMixin
44from twisted.python.failure import Failure
45from twisted.web.error import ExcessiveBufferingError
46
47# This API is currently considered private.
48__all__: List[str] = []
49
50
51_END_STREAM_SENTINEL = object()
52
53
54@implementer(IProtocol, IPushProducer)
55class H2Connection(Protocol, TimeoutMixin):
56    """
57    A class representing a single HTTP/2 connection.
58
59    This implementation of L{IProtocol} works hand in hand with L{H2Stream}.
60    This is because we have the requirement to register multiple producers for
61    a single HTTP/2 connection, one for each stream. The standard Twisted
62    interfaces don't really allow for this, so instead there's a custom
63    interface between the two objects that allows them to work hand-in-hand here.
64
65    @ivar conn: The HTTP/2 connection state machine.
66    @type conn: L{h2.connection.H2Connection}
67
68    @ivar streams: A mapping of stream IDs to L{H2Stream} objects, used to call
69        specific methods on streams when events occur.
70    @type streams: L{dict}, mapping L{int} stream IDs to L{H2Stream} objects.
71
72    @ivar priority: A HTTP/2 priority tree used to ensure that responses are
73        prioritised appropriately.
74    @type priority: L{priority.PriorityTree}
75
    @ivar _consumerBlocked: A L{Deferred} that is present while the
        L{IConsumer} that is consuming this data has asked us to stop
        producing, and that is fired when it asks us to resume. L{None} when
        the consumer is not blocked.
    @type _consumerBlocked: A L{twisted.internet.defer.Deferred}, or L{None}

    @ivar _sendingDeferred: A L{Deferred} used to restart the data-sending loop
        when more response data has been produced. Will not be present if there
        is outstanding data still to send.
    @type _sendingDeferred: A L{twisted.internet.defer.Deferred}, or L{None}
84
85    @ivar _outboundStreamQueues: A map of stream IDs to queues, used to store
86        data blocks that are yet to be sent on the connection. These are used
87        both to handle producers that do not respect L{IConsumer} but also to
88        allow priority to multiplex data appropriately.
89    @type _outboundStreamQueues: A L{dict} mapping L{int} stream IDs to
90        L{collections.deque} queues, which contain either L{bytes} objects or
91        C{_END_STREAM_SENTINEL}.
92
93    @ivar _sender: A handle to the data-sending loop, allowing it to be
94        terminated if needed.
95    @type _sender: L{twisted.internet.task.LoopingCall}
96
97    @ivar abortTimeout: The number of seconds to wait after we attempt to shut
98        the transport down cleanly to give up and forcibly terminate it. This
99        is only used when we time a connection out, to prevent errors causing
100        the FD to get leaked. If this is L{None}, we will wait forever.
101    @type abortTimeout: L{int}
102
103    @ivar _abortingCall: The L{twisted.internet.base.DelayedCall} that will be
104        used to forcibly close the transport if it doesn't close cleanly.
105    @type _abortingCall: L{twisted.internet.base.DelayedCall}
106    """
107
108    factory = None
109    site = None
110    abortTimeout = 15
111
112    _log = Logger()
113    _abortingCall = None
114
115    def __init__(self, reactor=None):
116        config = h2.config.H2Configuration(client_side=False, header_encoding=None)
117        self.conn = h2.connection.H2Connection(config=config)
118        self.streams = {}
119
120        self.priority = priority.PriorityTree()
121        self._consumerBlocked = None
122        self._sendingDeferred = None
123        self._outboundStreamQueues = {}
124        self._streamCleanupCallbacks = {}
125        self._stillProducing = True
126
127        # Limit the number of buffered control frame (e.g. PING and
128        # SETTINGS) bytes.
129        self._maxBufferedControlFrameBytes = 1024 * 17
130        self._bufferedControlFrames = deque()
131        self._bufferedControlFrameBytes = 0
132
133        if reactor is None:
134            from twisted.internet import reactor
135        self._reactor = reactor
136
137        # Start the data sending function.
138        self._reactor.callLater(0, self._sendPrioritisedData)
139
140    # Implementation of IProtocol
141    def connectionMade(self):
142        """
143        Called by the reactor when a connection is received. May also be called
144        by the L{twisted.web.http._GenericHTTPChannelProtocol} during upgrade
145        to HTTP/2.
146        """
147        self.setTimeout(self.timeOut)
148        self.conn.initiate_connection()
149        self.transport.write(self.conn.data_to_send())
150
151    def dataReceived(self, data):
152        """
153        Called whenever a chunk of data is received from the transport.
154
155        @param data: The data received from the transport.
156        @type data: L{bytes}
157        """
158        try:
159            events = self.conn.receive_data(data)
160        except h2.exceptions.ProtocolError:
161            stillActive = self._tryToWriteControlData()
162            if stillActive:
163                self.transport.loseConnection()
164                self.connectionLost(Failure(), _cancelTimeouts=False)
165            return
166
167        # Only reset the timeout if we've received an actual H2
168        # protocol message
169        self.resetTimeout()
170
171        for event in events:
172            if isinstance(event, h2.events.RequestReceived):
173                self._requestReceived(event)
174            elif isinstance(event, h2.events.DataReceived):
175                self._requestDataReceived(event)
176            elif isinstance(event, h2.events.StreamEnded):
177                self._requestEnded(event)
178            elif isinstance(event, h2.events.StreamReset):
179                self._requestAborted(event)
180            elif isinstance(event, h2.events.WindowUpdated):
181                self._handleWindowUpdate(event)
182            elif isinstance(event, h2.events.PriorityUpdated):
183                self._handlePriorityUpdate(event)
184            elif isinstance(event, h2.events.ConnectionTerminated):
185                self.transport.loseConnection()
186                self.connectionLost(
187                    Failure(ConnectionLost("Remote peer sent GOAWAY")),
188                    _cancelTimeouts=False,
189                )
190
191        self._tryToWriteControlData()
192
193    def timeoutConnection(self):
194        """
195        Called when the connection has been inactive for
196        L{self.timeOut<twisted.protocols.policies.TimeoutMixin.timeOut>}
197        seconds. Cleanly tears the connection down, attempting to notify the
198        peer if needed.
199
200        We override this method to add two extra bits of functionality:
201
202         - We want to log the timeout.
203         - We want to send a GOAWAY frame indicating that the connection is
204           being terminated, and whether it was clean or not. We have to do this
205           before the connection is torn down.
206        """
207        self._log.info("Timing out client {client}", client=self.transport.getPeer())
208
209        # Check whether there are open streams. If there are, we're going to
210        # want to use the error code PROTOCOL_ERROR. If there aren't, use
211        # NO_ERROR.
212        if self.conn.open_outbound_streams > 0 or self.conn.open_inbound_streams > 0:
213            error_code = h2.errors.ErrorCodes.PROTOCOL_ERROR
214        else:
215            error_code = h2.errors.ErrorCodes.NO_ERROR
216
217        self.conn.close_connection(error_code=error_code)
218        self.transport.write(self.conn.data_to_send())
219
220        # Don't let the client hold this connection open too long.
221        if self.abortTimeout is not None:
222            # We use self.callLater because that's what TimeoutMixin does, even
223            # though we have a perfectly good reactor sitting around. See
224            # https://twistedmatrix.com/trac/ticket/8488.
225            self._abortingCall = self.callLater(
226                self.abortTimeout, self.forceAbortClient
227            )
228
229        # We're done, throw the connection away.
230        self.transport.loseConnection()
231
232    def forceAbortClient(self):
233        """
234        Called if C{abortTimeout} seconds have passed since the timeout fired,
235        and the connection still hasn't gone away. This can really only happen
236        on extremely bad connections or when clients are maliciously attempting
237        to keep connections open.
238        """
239        self._log.info(
240            "Forcibly timing out client: {client}", client=self.transport.getPeer()
241        )
242        # We want to lose track of the _abortingCall so that no-one tries to
243        # cancel it.
244        self._abortingCall = None
245        self.transport.abortConnection()
246
247    def connectionLost(self, reason, _cancelTimeouts=True):
248        """
249        Called when the transport connection is lost.
250
251        Informs all outstanding response handlers that the connection
252        has been lost, and cleans up all internal state.
253
254        @param reason: See L{IProtocol.connectionLost}
255
256        @param _cancelTimeouts: Propagate the C{reason} to this
257            connection's streams but don't cancel any timers, so that
258            peers who never read the data we've written are eventually
259            timed out.
260        """
261        self._stillProducing = False
262        if _cancelTimeouts:
263            self.setTimeout(None)
264
265        for stream in self.streams.values():
266            stream.connectionLost(reason)
267
268        for streamID in list(self.streams.keys()):
269            self._requestDone(streamID)
270
271        # If we were going to force-close the transport, we don't have to now.
272        if _cancelTimeouts and self._abortingCall is not None:
273            self._abortingCall.cancel()
274            self._abortingCall = None
275
276    # Implementation of IPushProducer
277    #
278    # Here's how we handle IPushProducer. We have multiple outstanding
279    # H2Streams. Each of these exposes an IConsumer interface to the response
280    # handler that allows it to push data into the H2Stream. The H2Stream then
281    # writes the data into the H2Connection object.
282    #
283    # The H2Connection needs to manage these writes to account for:
284    #
285    # - flow control
286    # - priority
287    #
288    # We manage each of these in different ways.
289    #
290    # For flow control, we simply use the equivalent of the IPushProducer
291    # interface. We simply tell the H2Stream: "Hey, you can't send any data
292    # right now, sorry!". When that stream becomes unblocked, we free it up
293    # again. This allows the H2Stream to propagate this backpressure up the
294    # chain.
295    #
296    # For priority, we need to keep a backlog of data frames that we can send,
297    # and interleave them appropriately. This backlog is most sensibly kept in
298    # the H2Connection object itself. We keep one queue per stream, which is
299    # where the writes go, and then we have a loop that manages popping these
300    # streams off in priority order.
301    #
302    # Logically then, we go as follows:
303    #
304    # 1. Stream calls writeDataToStream(). This causes a DataFrame to be placed
305    #    on the queue for that stream. It also informs the priority
306    #    implementation that this stream is unblocked.
307    # 2. The _sendPrioritisedData() function spins in a tight loop. Each
308    #    iteration it asks the priority implementation which stream should send
309    #    next, and pops a data frame off that stream's queue. If, after sending
310    #    that frame, there is no data left on that stream's queue, the function
311    #    informs the priority implementation that the stream is blocked.
312    #
313    # If all streams are blocked, or if there are no outstanding streams, the
314    # _sendPrioritisedData function waits to be awoken when more data is ready
315    # to send.
316    #
317    # Note that all of this only applies to *data*. Headers and other control
318    # frames deliberately skip this processing as they are not subject to flow
319    # control or priority constraints. Instead, they are stored in their own buffer
320    # which is used primarily to detect excessive buffering.
321    def stopProducing(self):
322        """
323        Stop producing data.
324
325        This tells the L{H2Connection} that its consumer has died, so it must
326        stop producing data for good.
327        """
328        self.connectionLost(Failure(ConnectionLost("Producing stopped")))
329
330    def pauseProducing(self):
331        """
332        Pause producing data.
333
334        Tells the L{H2Connection} that it has produced too much data to process
335        for the time being, and to stop until resumeProducing() is called.
336        """
337        self._consumerBlocked = Deferred()
338        # Ensure pending control data (if any) are sent first.
339        self._consumerBlocked.addCallback(self._flushBufferedControlData)
340
341    def resumeProducing(self):
342        """
343        Resume producing data.
344
345        This tells the L{H2Connection} to re-add itself to the main loop and
346        produce more data for the consumer.
347        """
348        if self._consumerBlocked is not None:
349            d = self._consumerBlocked
350            self._consumerBlocked = None
351            d.callback(None)
352
    def _sendPrioritisedData(self, *args):
        """
        The data sending loop. This function repeatedly calls itself, either
        from L{Deferred}s or from
        L{reactor.callLater<twisted.internet.interfaces.IReactorTime.callLater>}

        This function sends data on streams according to the rules of HTTP/2
        priority. It ensures that the data from each stream is interleaved
        according to the priority signalled by the client, making sure that the
        connection is used with maximal efficiency.

        This function will execute if data is available: if all data is
        exhausted, the function will place a deferred onto the L{H2Connection}
        object and wait until it is called to resume executing.

        Each invocation handles at most one entry from one stream's queue and
        then reschedules itself.

        @param args: Ignored. Accepted so that this method can be used directly
            as a L{Deferred} callback, which is passed the Deferred's result.
        """
        # If producing has stopped, we're done. Don't reschedule ourselves
        if not self._stillProducing:
            return

        stream = None

        while stream is None:
            try:
                # Ask the priority tree which stream ID should send next.
                stream = next(self.priority)
            except priority.DeadlockError:
                # All streams are currently blocked or not progressing. Wait
                # until a new one becomes available.
                assert self._sendingDeferred is None
                self._sendingDeferred = Deferred()
                self._sendingDeferred.addCallback(self._sendPrioritisedData)
                return

        # Wait behind the transport.
        if self._consumerBlocked is not None:
            # Run this method again once the consumer-blocked Deferred fires.
            self._consumerBlocked.addCallback(self._sendPrioritisedData)
            return

        self.resetTimeout()

        remainingWindow = self.conn.local_flow_control_window(stream)
        frameData = self._outboundStreamQueues[stream].popleft()
        # A single frame may exceed neither the maximum outbound frame size nor
        # the flow control window that remains for this stream.
        maxFrameSize = min(self.conn.max_outbound_frame_size, remainingWindow)

        if frameData is _END_STREAM_SENTINEL:
            # There's no error handling here even though this can throw
            # ProtocolError because we really shouldn't encounter this problem.
            # If we do, that's a nasty bug.
            self.conn.end_stream(stream)
            self.transport.write(self.conn.data_to_send())

            # Clean up the stream
            self._requestDone(stream)
        else:
            # Respect the max frame size.
            if len(frameData) > maxFrameSize:
                # Put the part that does not fit back at the front of the
                # queue, so that it is the next thing sent on this stream.
                excessData = frameData[maxFrameSize:]
                frameData = frameData[:maxFrameSize]
                self._outboundStreamQueues[stream].appendleft(excessData)

            # There's deliberately no error handling here, because this just
            # absolutely should not happen.
            # If for whatever reason the max frame length is zero and so we
            # have no frame data to send, don't send any.
            if frameData:
                self.conn.send_data(stream, frameData)
                self.transport.write(self.conn.data_to_send())

            # If there's no data left, this stream is now blocked.
            if not self._outboundStreamQueues[stream]:
                self.priority.block(stream)

            # Also, if the stream's flow control window is exhausted, tell it
            # to stop.
            if self.remainingOutboundWindow(stream) <= 0:
                self.streams[stream].flowControlBlocked()

        # Schedule the next pass of the loop.
        self._reactor.callLater(0, self._sendPrioritisedData)
430
431    # Internal functions.
432    def _requestReceived(self, event):
433        """
434        Internal handler for when a request has been received.
435
436        @param event: The Hyper-h2 event that encodes information about the
437            received request.
438        @type event: L{h2.events.RequestReceived}
439        """
440        stream = H2Stream(
441            event.stream_id,
442            self,
443            event.headers,
444            self.requestFactory,
445            self.site,
446            self.factory,
447        )
448        self.streams[event.stream_id] = stream
449        self._streamCleanupCallbacks[event.stream_id] = Deferred()
450        self._outboundStreamQueues[event.stream_id] = deque()
451
452        # Add the stream to the priority tree but immediately block it.
453        try:
454            self.priority.insert_stream(event.stream_id)
455        except priority.DuplicateStreamError:
456            # Stream already in the tree. This can happen if we received a
457            # PRIORITY frame before a HEADERS frame. Just move on: we set the
458            # stream up properly in _handlePriorityUpdate.
459            pass
460        else:
461            self.priority.block(event.stream_id)
462
463    def _requestDataReceived(self, event):
464        """
465        Internal handler for when a chunk of data is received for a given
466        request.
467
468        @param event: The Hyper-h2 event that encodes information about the
469            received data.
470        @type event: L{h2.events.DataReceived}
471        """
472        stream = self.streams[event.stream_id]
473        stream.receiveDataChunk(event.data, event.flow_controlled_length)
474
475    def _requestEnded(self, event):
476        """
477        Internal handler for when a request is complete, and we expect no
478        further data for that request.
479
480        @param event: The Hyper-h2 event that encodes information about the
481            completed stream.
482        @type event: L{h2.events.StreamEnded}
483        """
484        stream = self.streams[event.stream_id]
485        stream.requestComplete()
486
487    def _requestAborted(self, event):
488        """
489        Internal handler for when a request is aborted by a remote peer.
490
491        @param event: The Hyper-h2 event that encodes information about the
492            reset stream.
493        @type event: L{h2.events.StreamReset}
494        """
495        stream = self.streams[event.stream_id]
496        stream.connectionLost(
497            Failure(ConnectionLost("Stream reset with code %s" % event.error_code))
498        )
499        self._requestDone(event.stream_id)
500
501    def _handlePriorityUpdate(self, event):
502        """
503        Internal handler for when a stream priority is updated.
504
505        @param event: The Hyper-h2 event that encodes information about the
506            stream reprioritization.
507        @type event: L{h2.events.PriorityUpdated}
508        """
509        try:
510            self.priority.reprioritize(
511                stream_id=event.stream_id,
512                depends_on=event.depends_on or None,
513                weight=event.weight,
514                exclusive=event.exclusive,
515            )
516        except priority.MissingStreamError:
517            # A PRIORITY frame arrived before the HEADERS frame that would
518            # trigger us to insert the stream into the tree. That's fine: we
519            # can create the stream here and mark it as blocked.
520            self.priority.insert_stream(
521                stream_id=event.stream_id,
522                depends_on=event.depends_on or None,
523                weight=event.weight,
524                exclusive=event.exclusive,
525            )
526            self.priority.block(event.stream_id)
527
528    def writeHeaders(self, version, code, reason, headers, streamID):
529        """
530        Called by L{twisted.web.http.Request} objects to write a complete set
531        of HTTP headers to a stream.
532
533        @param version: The HTTP version in use. Unused in HTTP/2.
534        @type version: L{bytes}
535
536        @param code: The HTTP status code to write.
537        @type code: L{bytes}
538
539        @param reason: The HTTP reason phrase to write. Unused in HTTP/2.
540        @type reason: L{bytes}
541
542        @param headers: The headers to write to the stream.
543        @type headers: L{twisted.web.http_headers.Headers}
544
545        @param streamID: The ID of the stream to write the headers to.
546        @type streamID: L{int}
547        """
548        headers.insert(0, (b":status", code))
549
550        try:
551            self.conn.send_headers(streamID, headers)
552        except h2.exceptions.StreamClosedError:
553            # Stream was closed by the client at some point. We need to not
554            # explode here: just swallow the error. That's what write() does
555            # when a connection is lost, so that's what we do too.
556            return
557        else:
558            self._tryToWriteControlData()
559
560    def writeDataToStream(self, streamID, data):
561        """
562        May be called by L{H2Stream} objects to write response data to a given
563        stream. Writes a single data frame.
564
565        @param streamID: The ID of the stream to write the data to.
566        @type streamID: L{int}
567
568        @param data: The data chunk to write to the stream.
569        @type data: L{bytes}
570        """
571        self._outboundStreamQueues[streamID].append(data)
572
573        # There's obviously no point unblocking this stream and the sending
574        # loop if the data can't actually be sent, so confirm that there's
575        # some room to send data.
576        if self.conn.local_flow_control_window(streamID) > 0:
577            self.priority.unblock(streamID)
578            if self._sendingDeferred is not None:
579                d = self._sendingDeferred
580                self._sendingDeferred = None
581                d.callback(streamID)
582
583        if self.remainingOutboundWindow(streamID) <= 0:
584            self.streams[streamID].flowControlBlocked()
585
586    def endRequest(self, streamID):
587        """
588        Called by L{H2Stream} objects to signal completion of a response.
589
590        @param streamID: The ID of the stream to write the data to.
591        @type streamID: L{int}
592        """
593        self._outboundStreamQueues[streamID].append(_END_STREAM_SENTINEL)
594        self.priority.unblock(streamID)
595        if self._sendingDeferred is not None:
596            d = self._sendingDeferred
597            self._sendingDeferred = None
598            d.callback(streamID)
599
600    def abortRequest(self, streamID):
601        """
602        Called by L{H2Stream} objects to request early termination of a stream.
603        This emits a RstStream frame and then removes all stream state.
604
605        @param streamID: The ID of the stream to write the data to.
606        @type streamID: L{int}
607        """
608        self.conn.reset_stream(streamID)
609        stillActive = self._tryToWriteControlData()
610        if stillActive:
611            self._requestDone(streamID)
612
613    def _requestDone(self, streamID):
614        """
615        Called internally by the data sending loop to clean up state that was
616        being used for the stream. Called when the stream is complete.
617
618        @param streamID: The ID of the stream to clean up state for.
619        @type streamID: L{int}
620        """
621        del self._outboundStreamQueues[streamID]
622        self.priority.remove_stream(streamID)
623        del self.streams[streamID]
624        cleanupCallback = self._streamCleanupCallbacks.pop(streamID)
625        cleanupCallback.callback(streamID)
626
627    def remainingOutboundWindow(self, streamID):
628        """
629        Called to determine how much room is left in the send window for a
630        given stream. Allows us to handle blocking and unblocking producers.
631
632        @param streamID: The ID of the stream whose flow control window we'll
633            check.
634        @type streamID: L{int}
635
636        @return: The amount of room remaining in the send window for the given
637            stream, including the data queued to be sent.
638        @rtype: L{int}
639        """
640        # TODO: This involves a fair bit of looping and computation for
641        # something that is called a lot. Consider caching values somewhere.
642        windowSize = self.conn.local_flow_control_window(streamID)
643        sendQueue = self._outboundStreamQueues[streamID]
644        alreadyConsumed = sum(
645            len(chunk) for chunk in sendQueue if chunk is not _END_STREAM_SENTINEL
646        )
647
648        return windowSize - alreadyConsumed
649
650    def _handleWindowUpdate(self, event):
651        """
652        Manage flow control windows.
653
654        Streams that are blocked on flow control will register themselves with
655        the connection. This will fire deferreds that wake those streams up and
656        allow them to continue processing.
657
658        @param event: The Hyper-h2 event that encodes information about the
659            flow control window change.
660        @type event: L{h2.events.WindowUpdated}
661        """
662        streamID = event.stream_id
663
664        if streamID:
665            if not self._streamIsActive(streamID):
666                # We may have already cleaned up our stream state, making this
667                # a late WINDOW_UPDATE frame. That's fine: the update is
668                # unnecessary but benign. We'll ignore it.
669                return
670
671            # If we haven't got any data to send, don't unblock the stream. If
672            # we do, we'll eventually get an exception inside the
673            # _sendPrioritisedData loop some time later.
674            if self._outboundStreamQueues.get(streamID):
675                self.priority.unblock(streamID)
676            self.streams[streamID].windowUpdated()
677        else:
678            # Update strictly applies to all streams.
679            for stream in self.streams.values():
680                stream.windowUpdated()
681
682                # If we still have data to send for this stream, unblock it.
683                if self._outboundStreamQueues.get(stream.streamID):
684                    self.priority.unblock(stream.streamID)
685
686    def getPeer(self):
687        """
688        Get the remote address of this connection.
689
690        Treat this method with caution.  It is the unfortunate result of the
691        CGI and Jabber standards, but should not be considered reliable for
692        the usual host of reasons; port forwarding, proxying, firewalls, IP
693        masquerading, etc.
694
695        @return: An L{IAddress} provider.
696        """
697        return self.transport.getPeer()
698
699    def getHost(self):
700        """
701        Similar to getPeer, but returns an address describing this side of the
702        connection.
703
704        @return: An L{IAddress} provider.
705        """
706        return self.transport.getHost()
707
708    def openStreamWindow(self, streamID, increment):
709        """
710        Open the stream window by a given increment.
711
712        @param streamID: The ID of the stream whose window needs to be opened.
713        @type streamID: L{int}
714
715        @param increment: The amount by which the stream window must be
716        incremented.
717        @type increment: L{int}
718        """
719        self.conn.acknowledge_received_data(increment, streamID)
720        self._tryToWriteControlData()
721
722    def _isSecure(self):
723        """
724        Returns L{True} if this channel is using a secure transport.
725
726        @returns: L{True} if this channel is secure.
727        @rtype: L{bool}
728        """
729        # A channel is secure if its transport is ISSLTransport.
730        return ISSLTransport(self.transport, None) is not None
731
732    def _send100Continue(self, streamID):
733        """
734        Sends a 100 Continue response, used to signal to clients that further
735        processing will be performed.
736
737        @param streamID: The ID of the stream that needs the 100 Continue
738        response
739        @type streamID: L{int}
740        """
741        headers = [(b":status", b"100")]
742        self.conn.send_headers(headers=headers, stream_id=streamID)
743        self._tryToWriteControlData()
744
745    def _respondToBadRequestAndDisconnect(self, streamID):
746        """
747        This is a quick and dirty way of responding to bad requests.
748
749        As described by HTTP standard we should be patient and accept the
750        whole request from the client before sending a polite bad request
751        response, even in the case when clients send tons of data.
752
753        Unlike in the HTTP/1.1 case, this does not actually disconnect the
754        underlying transport: there's no need. This instead just sends a 400
755        response and terminates the stream.
756
757        @param streamID: The ID of the stream that needs the 100 Continue
758        response
759        @type streamID: L{int}
760        """
761        headers = [(b":status", b"400")]
762        self.conn.send_headers(headers=headers, stream_id=streamID, end_stream=True)
763        stillActive = self._tryToWriteControlData()
764        if stillActive:
765            stream = self.streams[streamID]
766            stream.connectionLost(Failure(ConnectionLost("Invalid request")))
767            self._requestDone(streamID)
768
769    def _streamIsActive(self, streamID):
770        """
771        Checks whether Twisted has still got state for a given stream and so
772        can process events for that stream.
773
774        @param streamID: The ID of the stream that needs processing.
775        @type streamID: L{int}
776
777        @return: Whether the stream still has state allocated.
778        @rtype: L{bool}
779        """
780        return streamID in self.streams
781
782    def _tryToWriteControlData(self):
783        """
784        Checks whether the connection is blocked on flow control and,
785        if it isn't, writes any buffered control data.
786
787        @return: L{True} if the connection is still active and
788            L{False} if it was aborted because too many bytes have
789            been written but not consumed by the other end.
790        """
791        bufferedBytes = self.conn.data_to_send()
792        if not bufferedBytes:
793            return True
794
795        if self._consumerBlocked is None and not self._bufferedControlFrames:
796            # The consumer isn't blocked, and we don't have any buffered frames:
797            # write this directly.
798            self.transport.write(bufferedBytes)
799            return True
800        else:
801            # Either the consumer is blocked or we have buffered frames. If the
802            # consumer is blocked, we'll write this when we unblock. If we have
803            # buffered frames, we have presumably been re-entered from
804            # transport.write, and so to avoid reordering issues we'll buffer anyway.
805            self._bufferedControlFrames.append(bufferedBytes)
806            self._bufferedControlFrameBytes += len(bufferedBytes)
807
808            if self._bufferedControlFrameBytes >= self._maxBufferedControlFrameBytes:
809                maxBuffCtrlFrameBytes = self._maxBufferedControlFrameBytes
810                self._log.error(
811                    "Maximum number of control frame bytes buffered: "
812                    "{bufferedControlFrameBytes} > = "
813                    "{maxBufferedControlFrameBytes}. "
814                    "Aborting connection to client: {client} ",
815                    bufferedControlFrameBytes=self._bufferedControlFrameBytes,
816                    maxBufferedControlFrameBytes=maxBuffCtrlFrameBytes,
817                    client=self.transport.getPeer(),
818                )
819                # We've exceeded a reasonable buffer size for max buffered
820                # control frames. This is a denial of service risk, so we're
821                # going to drop this connection.
822                self.transport.abortConnection()
823                self.connectionLost(Failure(ExcessiveBufferingError()))
824                return False
825            return True
826
827    def _flushBufferedControlData(self, *args):
828        """
829        Called when the connection is marked writable again after being marked unwritable.
830        Attempts to flush buffered control data if there is any.
831        """
832        # To respect backpressure here we send each write in order, paying attention to whether
833        # we got blocked
834        while self._consumerBlocked is None and self._bufferedControlFrames:
835            nextWrite = self._bufferedControlFrames.popleft()
836            self._bufferedControlFrameBytes -= len(nextWrite)
837            self.transport.write(nextWrite)
838
839
@implementer(ITransport, IConsumer, IPushProducer)
class H2Stream:
    """
    A class representing a single HTTP/2 stream.

    This class works hand-in-hand with L{H2Connection}. It acts to provide an
    implementation of L{ITransport}, L{IConsumer}, and L{IPushProducer} that
    work for a single HTTP/2 stream, while tightly cleaving to the interface
    provided by those interfaces. It does this by having a tight coupling to
    L{H2Connection}, which allows associating many of the functions of
    L{ITransport}, L{IConsumer}, and L{IPushProducer} to objects on a
    stream-specific level.

    @ivar streamID: The numerical stream ID that this object corresponds to.
    @type streamID: L{int}

    @ivar producing: Whether this stream is currently allowed to produce data
        to its consumer.
    @type producing: L{bool}

    @ivar command: The HTTP verb used on the request, taken from the
        C{:method} pseudo-header.
    @type command: L{bytes}

    @ivar path: The HTTP path used on the request, taken from the C{:path}
        pseudo-header.
    @type path: L{bytes}

    @ivar producer: The object producing the response, if any.
    @type producer: L{IProducer}

    @ivar site: The L{twisted.web.server.Site} object this stream belongs to,
        if any.
    @type site: L{twisted.web.server.Site}

    @ivar factory: The L{twisted.web.http.HTTPFactory} object that constructed
        this stream's parent connection.
    @type factory: L{twisted.web.http.HTTPFactory}

    @ivar _producerProducing: Whether the producer stored in producer is
        currently producing data.
    @type _producerProducing: L{bool}

    @ivar _hasStreamingProducer: L{True} if the registered producer was
        registered as a streaming (push) producer, L{False} if it was
        registered as a pull producer and wrapped in L{_PullToPush}, and
        L{None} while no producer is registered. Also reachable through the
        C{hasStreamingProducer} alias.
    @type _hasStreamingProducer: L{bool} or L{None}

    @ivar _inboundDataBuffer: Any data that has been received from the network
        but has not yet been received by the consumer.
    @type _inboundDataBuffer: A L{collections.deque} of two-tuples. Each is
        either C{(data, flowControlledLength)}, a L{bytes} chunk and the
        L{int} flow controlled length it occupied, or
        C{(_END_STREAM_SENTINEL, None)}, marking the end of the request body.

    @ivar _conn: A reference to the connection this stream belongs to.
    @type _conn: L{H2Connection}

    @ivar _request: A request object that this stream corresponds to.
    @type _request: L{twisted.web.iweb.IRequest}

    @ivar _buffer: A buffer containing data produced by the producer that could
        not be sent on the network at this time.
    @type _buffer: L{io.BytesIO}
    """

    # We need a transport property for t.w.h.Request, but HTTP/2 doesn't want
    # to expose it. So we just set it to None.
    transport = None

    def __init__(self, streamID, connection, headers, requestFactory, site, factory):
        """
        Initialize this HTTP/2 stream.

        @param streamID: The numerical stream ID that this object corresponds
            to.
        @type streamID: L{int}

        @param connection: The HTTP/2 connection this stream belongs to.
        @type connection: L{H2Connection}

        @param headers: The HTTP/2 request headers.
        @type headers: A L{list} of L{tuple}s of header name and header value,
            both as L{bytes}.

        @param requestFactory: A function that builds appropriate request
            objects. It is called with this stream as its only positional
            argument and C{queued=False}.
        @type requestFactory: A callable that returns a
            L{twisted.web.iweb.IRequest}.

        @param site: The L{twisted.web.server.Site} object this stream belongs
            to, if any.
        @type site: L{twisted.web.server.Site}

        @param factory: The L{twisted.web.http.HTTPFactory} object that
            constructed this stream's parent connection.
        @type factory: L{twisted.web.http.HTTPFactory}
        """

        self.streamID = streamID
        self.site = site
        self.factory = factory
        self.producing = True
        self.command = None
        self.path = None
        self.producer = None
        self._producerProducing = False
        self._hasStreamingProducer = None
        self._inboundDataBuffer = deque()
        self._conn = connection
        self._request = requestFactory(self, queued=False)
        self._buffer = io.BytesIO()

        self._convertHeaders(headers)

    @property
    def hasStreamingProducer(self):
        """
        Backwards-compatible alias for C{_hasStreamingProducer}.

        Earlier versions of this class stored the flag under this public name
        in L{registerProducer} and L{unregisterProducer} while initializing
        only the private name; both names now refer to the same state.

        @rtype: L{bool} or L{None}
        """
        return self._hasStreamingProducer

    @hasStreamingProducer.setter
    def hasStreamingProducer(self, value):
        self._hasStreamingProducer = value

    def _convertHeaders(self, headers):
        """
        This method converts the HTTP/2 header set into something that looks
        like HTTP/1.1. In particular, it strips the 'special' headers and adds
        a Host: header.

        If no C{content-length} header is present, the request is told its
        length is C{0} for C{GET} and C{HEAD} and L{None} for any other
        method. If the client sent C{Expect: 100-continue}, a 100 Continue
        response is sent right away.

        @param headers: The HTTP/2 header set.
        @type headers: A L{list} of L{tuple}s of header name and header value,
            both as L{bytes}.
        """
        gotLength = False

        for header in headers:
            if not header[0].startswith(b":"):
                # Regular header: copy it over, remembering whether we have
                # seen a content-length so far.
                gotLength = _addHeaderToRequest(self._request, header) or gotLength
            elif header[0] == b":method":
                self.command = header[1]
            elif header[0] == b":path":
                self.path = header[1]
            elif header[0] == b":authority":
                # This is essentially the Host: header from HTTP/1.1
                _addHeaderToRequest(self._request, (b"host", header[1]))

        if not gotLength:
            if self.command in (b"GET", b"HEAD"):
                self._request.gotLength(0)
            else:
                self._request.gotLength(None)

        self._request.parseCookies()
        expectContinue = self._request.requestHeaders.getRawHeaders(b"expect")
        if expectContinue and expectContinue[0].lower() == b"100-continue":
            self._send100Continue()

    # Methods called by the H2Connection
    def receiveDataChunk(self, data, flowControlledLength):
        """
        Called when the connection has received a chunk of data from the
        underlying transport. If the stream has been registered with a
        consumer, and is currently able to push data, immediately passes it
        through. Otherwise, buffers the chunk until we can start producing.

        @param data: The chunk of data that was received.
        @type data: L{bytes}

        @param flowControlledLength: The total flow controlled length of this
            chunk, which is used when we want to re-open the window. May be
            different to C{len(data)}.
        @type flowControlledLength: L{int}
        """
        if not self.producing:
            # Buffer data. The flow control window is deliberately left
            # closed until the chunk is actually delivered by
            # resumeProducing.
            self._inboundDataBuffer.append((data, flowControlledLength))
        else:
            self._request.handleContentChunk(data)
            self._conn.openStreamWindow(self.streamID, flowControlledLength)

    def requestComplete(self):
        """
        Called by the L{H2Connection} when all the data for a request has
        been received. Currently, with the legacy L{twisted.web.http.Request}
        object, just calls requestReceived unless the producer wants us to be
        quiet, in which case an end-of-stream marker is queued behind any
        buffered data so that L{resumeProducing} completes the request later.
        """
        if self.producing:
            self._request.requestReceived(self.command, self.path, b"HTTP/2")
        else:
            self._inboundDataBuffer.append((_END_STREAM_SENTINEL, None))

    def connectionLost(self, reason):
        """
        Called by the L{H2Connection} when a connection is lost or a stream is
        reset.

        @param reason: The reason the connection was lost.
        @type reason: L{twisted.python.failure.Failure}
        """
        self._request.connectionLost(reason)

    def windowUpdated(self):
        """
        Called by the L{H2Connection} when this stream's flow control window
        has been opened.
        """
        # If we don't have a producer, we have no-one to tell.
        if not self.producer:
            return

        # If we're not blocked on flow control, we don't care.
        if self._producerProducing:
            return

        # We check whether the stream's flow control window is actually above
        # 0, and then, if a producer is registered and we still have space in
        # the window, we unblock it.
        remainingWindow = self._conn.remainingOutboundWindow(self.streamID)
        if not remainingWindow > 0:
            return

        # We have a producer and space in the window, so that producer can
        # start producing again!
        self._producerProducing = True
        self.producer.resumeProducing()

    def flowControlBlocked(self):
        """
        Called by the L{H2Connection} when this stream's flow control window
        has been exhausted.
        """
        if not self.producer:
            return

        if self._producerProducing:
            self.producer.pauseProducing()
            self._producerProducing = False

    # Methods called by the consumer (usually an IRequest).
    def writeHeaders(self, version, code, reason, headers):
        """
        Called by the consumer to write headers to the stream.

        @param version: The HTTP version.
        @type version: L{bytes}

        @param code: The status code.
        @type code: L{int}

        @param reason: The reason phrase. Ignored in HTTP/2.
        @type reason: L{bytes}

        @param headers: The HTTP response headers.
        @type headers: Any iterable of two-tuples of L{bytes}, representing header
            names and header values.
        """
        self._conn.writeHeaders(version, code, reason, headers, self.streamID)

    def requestDone(self, request):
        """
        Called by a consumer to clean up whatever permanent state is in use.

        @param request: The request calling the method.
        @type request: L{twisted.web.iweb.IRequest}
        """
        self._conn.endRequest(self.streamID)

    def _send100Continue(self):
        """
        Sends a 100 Continue response, used to signal to clients that further
        processing will be performed.
        """
        self._conn._send100Continue(self.streamID)

    def _respondToBadRequestAndDisconnect(self):
        """
        This is a quick and dirty way of responding to bad requests.

        As described by HTTP standard we should be patient and accept the
        whole request from the client before sending a polite bad request
        response, even in the case when clients send tons of data.

        Unlike in the HTTP/1.1 case, this does not actually disconnect the
        underlying transport: there's no need. This instead just sends a 400
        response and terminates the stream.
        """
        self._conn._respondToBadRequestAndDisconnect(self.streamID)

    # Implementation: ITransport
    def write(self, data):
        """
        Write a single chunk of data into a data frame.

        @param data: The data chunk to send.
        @type data: L{bytes}
        """
        self._conn.writeDataToStream(self.streamID, data)

    def writeSequence(self, iovec):
        """
        Write a sequence of chunks of data into data frames.

        @param iovec: A sequence of chunks to send.
        @type iovec: An iterable of L{bytes} chunks.
        """
        for chunk in iovec:
            self.write(chunk)

    def loseConnection(self):
        """
        Close the connection after writing all pending data.
        """
        self._conn.endRequest(self.streamID)

    def abortConnection(self):
        """
        Forcefully abort the connection by sending a RstStream frame.
        """
        self._conn.abortRequest(self.streamID)

    def getPeer(self):
        """
        Get information about the peer.
        """
        return self._conn.getPeer()

    def getHost(self):
        """
        Similar to getPeer, but for this side of the connection.
        """
        return self._conn.getHost()

    def isSecure(self):
        """
        Returns L{True} if this channel is using a secure transport.

        @returns: L{True} if this channel is secure.
        @rtype: L{bool}
        """
        return self._conn._isSecure()

    # Implementation: IConsumer
    def registerProducer(self, producer, streaming):
        """
        Register to receive data from a producer.

        This sets self to be a consumer for a producer.  When this object runs
        out of data (as when a send(2) call on a socket succeeds in moving the
        last data from a userspace buffer into a kernelspace buffer), it will
        ask the producer to resumeProducing().

        For L{IPullProducer} providers, C{resumeProducing} will be called once
        each time data is required.

        For L{IPushProducer} providers, C{pauseProducing} will be called
        whenever the write buffer fills up and C{resumeProducing} will only be
        called when it empties.

        @param producer: The producer to register.
        @type producer: L{IProducer} provider

        @param streaming: L{True} if C{producer} provides L{IPushProducer},
        L{False} if C{producer} provides L{IPullProducer}.
        @type streaming: L{bool}

        @raise ValueError: If a producer is already registered.

        @return: L{None}
        """
        if self.producer:
            raise ValueError(
                "registering producer %s before previous one (%s) was "
                "unregistered" % (producer, self.producer)
            )

        if not streaming:
            # Pull producers are wrapped so that the rest of this class only
            # ever has to deal with the push producer interface.
            self._hasStreamingProducer = False
            producer = _PullToPush(producer, self)
            producer.startStreaming()
        else:
            self._hasStreamingProducer = True

        self.producer = producer
        self._producerProducing = True

    def unregisterProducer(self):
        """
        @see: L{IConsumer.unregisterProducer}
        """
        # When the producer is unregistered, we're done. A non-streaming
        # producer is stored wrapped in _PullToPush (see registerProducer), so
        # that wrapper has to be told to stop streaming.
        if self.producer is not None and not self._hasStreamingProducer:
            self.producer.stopStreaming()

        self._producerProducing = False
        self.producer = None
        self._hasStreamingProducer = None

    # Implementation: IPushProducer
    def stopProducing(self):
        """
        @see: L{IProducer.stopProducing}
        """
        self.producing = False
        self.abortConnection()

    def pauseProducing(self):
        """
        @see: L{IPushProducer.pauseProducing}
        """
        self.producing = False

    def resumeProducing(self):
        """
        Deliver any request body data that was buffered while paused, then
        re-open the stream's flow control window by the amount delivered.

        @see: L{IPushProducer.resumeProducing}
        """
        self.producing = True
        consumedLength = 0

        while self.producing and self._inboundDataBuffer:
            # Allow for pauseProducing to be called in response to a call to
            # resumeProducing.
            chunk, flowControlledLength = self._inboundDataBuffer.popleft()

            if chunk is _END_STREAM_SENTINEL:
                # The request body finished while we were paused; complete
                # the request now that everything before it was delivered.
                self.requestComplete()
            else:
                consumedLength += flowControlledLength
                self._request.handleContentChunk(chunk)

        self._conn.openStreamWindow(self.streamID, consumedLength)
1254
1255
1256def _addHeaderToRequest(request, header):
1257    """
1258    Add a header tuple to a request header object.
1259
1260    @param request: The request to add the header tuple to.
1261    @type request: L{twisted.web.http.Request}
1262
1263    @param header: The header tuple to add to the request.
1264    @type header: A L{tuple} with two elements, the header name and header
1265        value, both as L{bytes}.
1266
1267    @return: If the header being added was the C{Content-Length} header.
1268    @rtype: L{bool}
1269    """
1270    requestHeaders = request.requestHeaders
1271    name, value = header
1272    values = requestHeaders.getRawHeaders(name)
1273
1274    if values is not None:
1275        values.append(value)
1276    else:
1277        requestHeaders.setRawHeaders(name, [value])
1278
1279    if name == b"content-length":
1280        request.gotLength(int(value))
1281        return True
1282
1283    return False
1284