1# Copyright (c) Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Test HTTP/2 support.
6"""
7
8
9import itertools
10
11from zope.interface import directlyProvides, providedBy
12
13from twisted.internet import defer, error, reactor, task
14from twisted.internet.address import IPv4Address
15from twisted.internet.testing import MemoryReactorClock, StringTransport
16from twisted.python import failure
17from twisted.python.compat import iterbytes
18from twisted.test.test_internet import DummyProducer
19from twisted.trial import unittest
20from twisted.web import http
21from twisted.web.test.test_http import (
22    DelayedHTTPHandler,
23    DelayedHTTPHandlerProxy,
24    DummyHTTPHandler,
25    DummyHTTPHandlerProxy,
26    DummyPullProducerHandlerProxy,
27    _IDeprecatedHTTPChannelToRequestInterfaceProxy,
28    _makeRequestProxyFactory,
29)
30
31skipH2 = None
32
33try:
34    # These third-party imports are guaranteed to be present if HTTP/2 support
35    # is compiled in. We do not use them in the main code: only in the tests.
36    import h2  # type: ignore[import]
37    import h2.errors  # type: ignore[import]
38    import h2.exceptions  # type: ignore[import]
39    import hyperframe  # type: ignore[import]
40    import priority  # type: ignore[import]
41    from hpack.hpack import Decoder, Encoder  # type: ignore[import]
42
43    from twisted.web._http2 import H2Connection
44except ImportError:
45    skipH2 = "HTTP/2 support not enabled"
46
47
48# Define some helpers for the rest of these tests.
class FrameFactory:
    """
    A class containing lots of helper methods and state to build frames. This
    allows test cases to easily build correct HTTP/2 frames to feed to
    hyper-h2.

    @ivar encoder: The L{Encoder} used to encode the header blocks of the
        Headers and Push Promise frames built by this factory.
    """

    def __init__(self):
        self.encoder = Encoder()

    def refreshEncoder(self):
        """
        Replace C{self.encoder} with a newly constructed L{Encoder}.
        """
        self.encoder = Encoder()

    def clientConnectionPreface(self):
        """
        Return the fixed byte string a client sends to open an HTTP/2
        connection.

        @rtype: L{bytes}
        """
        return b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

    def buildHeadersFrame(self, headers, flags=None, streamID=1, **priorityKwargs):
        """
        Builds a single valid headers frame out of the contained headers.

        @param headers: The headers to encode into the frame.

        @param flags: Extra flag names to set on the frame in addition to
            C{"END_HEADERS"}, which is always set. L{None} means no extra
            flags.

        @param streamID: The ID of the stream the frame belongs to.

        @param priorityKwargs: Additional attributes to set directly on the
            frame object, keyed by attribute name.

        @return: The constructed headers frame.
        """
        f = hyperframe.frame.HeadersFrame(streamID)
        f.data = self.encoder.encode(headers)
        f.flags.add("END_HEADERS")
        # None (rather than a shared mutable list) is the "no extra flags"
        # default.
        if flags is not None:
            for flag in flags:
                f.flags.add(flag)

        for k, v in priorityKwargs.items():
            setattr(f, k, v)

        return f

    def buildDataFrame(self, data, flags=None, streamID=1):
        """
        Builds a single data frame out of a chunk of data.

        @param data: The payload of the frame.

        @param flags: The flag names to set on the frame, or L{None} for no
            flags.

        @param streamID: The ID of the stream the frame belongs to.

        @return: The constructed data frame.
        """
        flags = set(flags) if flags is not None else set()
        f = hyperframe.frame.DataFrame(streamID)
        f.data = data
        f.flags = flags
        return f

    def buildSettingsFrame(self, settings, ack=False):
        """
        Builds a single settings frame.

        @param settings: The value assigned to the frame's C{settings}
            attribute.

        @param ack: Whether to set the C{"ACK"} flag on the frame.

        @return: The constructed settings frame, on stream 0.
        """
        f = hyperframe.frame.SettingsFrame(0)
        if ack:
            f.flags.add("ACK")

        f.settings = settings
        return f

    def buildWindowUpdateFrame(self, streamID, increment):
        """
        Builds a single WindowUpdate frame.

        @param streamID: The ID of the stream the frame belongs to.

        @param increment: The value assigned to the frame's
            C{window_increment} attribute.

        @return: The constructed window update frame.
        """
        f = hyperframe.frame.WindowUpdateFrame(streamID)
        f.window_increment = increment
        return f

    def buildGoAwayFrame(self, lastStreamID, errorCode=0, additionalData=b""):
        """
        Builds a single GOAWAY frame.

        @param lastStreamID: The value assigned to the frame's
            C{last_stream_id} attribute.

        @param errorCode: The value assigned to the frame's C{error_code}
            attribute.

        @param additionalData: The value assigned to the frame's
            C{additional_data} attribute.

        @return: The constructed GOAWAY frame, on stream 0.
        """
        f = hyperframe.frame.GoAwayFrame(0)
        f.error_code = errorCode
        f.last_stream_id = lastStreamID
        f.additional_data = additionalData
        return f

    def buildRstStreamFrame(self, streamID, errorCode=0):
        """
        Builds a single RST_STREAM frame.

        @param streamID: The ID of the stream the frame belongs to.

        @param errorCode: The value assigned to the frame's C{error_code}
            attribute.

        @return: The constructed RST_STREAM frame.
        """
        f = hyperframe.frame.RstStreamFrame(streamID)
        f.error_code = errorCode
        return f

    def buildPriorityFrame(self, streamID, weight, dependsOn=0, exclusive=False):
        """
        Builds a single priority frame.

        @param streamID: The ID of the stream the frame belongs to.

        @param weight: The value assigned to the frame's C{stream_weight}
            attribute.

        @param dependsOn: The value assigned to the frame's C{depends_on}
            attribute.

        @param exclusive: The value assigned to the frame's C{exclusive}
            attribute.

        @return: The constructed priority frame.
        """
        f = hyperframe.frame.PriorityFrame(streamID)
        f.depends_on = dependsOn
        f.stream_weight = weight
        f.exclusive = exclusive
        return f

    def buildPushPromiseFrame(self, streamID, promisedStreamID, headers, flags=None):
        """
        Builds a single Push Promise frame.

        @param streamID: The ID of the stream the frame belongs to.

        @param promisedStreamID: The value assigned to the frame's
            C{promised_stream_id} attribute.

        @param headers: The headers to encode into the frame.

        @param flags: Flag names to set on the frame in addition to
            C{"END_HEADERS"}, which is always set. L{None} means no extra
            flags.

        @return: The constructed push promise frame.
        """
        f = hyperframe.frame.PushPromiseFrame(streamID)
        f.promised_stream_id = promisedStreamID
        f.data = self.encoder.encode(headers)
        # None (rather than a shared mutable list) is the "no extra flags"
        # default.
        f.flags = set(flags) if flags is not None else set()
        f.flags.add("END_HEADERS")
        return f
147
148
class FrameBuffer:
    """
    An iterator that accumulates the bytes emitted by Twisted's HTTP/2 stack
    and hands them back one hyperframe frame object at a time.

    Working with frame objects, instead of serializing the expected frames
    and comparing bytes, keeps test assertions and their failure output
    readable.

    The header block of every Headers frame is decoded with C{self.decoder}
    before the frame is returned.
    """

    def __init__(self):
        self._data = b""
        self.decoder = Decoder()

    def receiveData(self, data):
        """
        Append C{data} to the bytes that have not been parsed yet.
        """
        self._data = self._data + data

    def __iter__(self):
        return self

    def next(self):
        """
        Parse and return the next complete frame from the buffered bytes.

        @raise StopIteration: If the buffer does not hold a complete frame
            header (9 bytes) followed by the complete frame body.
        """
        headerSize = 9
        pending = self._data
        if len(pending) < headerSize:
            raise StopIteration()

        frame, bodyLength = hyperframe.frame.Frame.parse_frame_header(
            pending[:headerSize]
        )
        frameEnd = headerSize + bodyLength
        if len(pending) < frameEnd:
            raise StopIteration()

        frame.parse_body(memoryview(pending[headerSize:frameEnd]))
        # Only drop the bytes once the whole frame has been parsed.
        self._data = pending[frameEnd:]

        if isinstance(frame, hyperframe.frame.HeadersFrame):
            frame.data = self.decoder.decode(frame.data, raw=True)

        return frame

    __next__ = next
189
190
def buildRequestFrames(headers, data, frameFactory=None, streamID=1):
    """
    Build the list of HTTP/2 frames that encode a single HTTP request: one
    Headers frame followed by one Data frame per body chunk. The last frame
    in the list is flagged C{"END_STREAM"}.

    Use this when the caller needs to serialize the frames itself, e.g. to
    interleave them with other frames. Otherwise prefer
    L{buildRequestBytes}.

    @param headers: The HTTP/2 headers to send.
    @type headers: L{list} of L{tuple} of L{bytes}

    @param data: The HTTP data to send. Each list entry will be sent in its own
    frame.
    @type data: L{list} of L{bytes}

    @param frameFactory: The L{FrameFactory} that will be used to construct the
    frames. A new one is created when this is L{None}.
    @type frameFactory: L{FrameFactory}

    @param streamID: The ID of the stream on which to send the request.
    @type streamID: L{int}

    @return: The frames, in the order they should be sent.
    @rtype: L{list}
    """
    factory = FrameFactory() if frameFactory is None else frameFactory

    requestFrames = [factory.buildHeadersFrame(headers=headers, streamID=streamID)]
    for chunk in data:
        requestFrames.append(factory.buildDataFrame(chunk, streamID=streamID))

    # Whichever frame ends up last (the Headers frame when there is no body)
    # closes the stream.
    requestFrames[-1].flags.add("END_STREAM")
    return requestFrames
222
223
def buildRequestBytes(headers, data, frameFactory=None, streamID=1):
    """
    Serialize a single HTTP request into the bytes of its HTTP/2 frames.

    The frames are built by L{buildRequestFrames} and their serialized forms
    are concatenated in order.

    @param headers: The HTTP/2 headers to send.
    @type headers: L{list} of L{tuple} of L{bytes}

    @param data: The HTTP data to send. Each list entry will be sent in its own
    frame.
    @type data: L{list} of L{bytes}

    @param frameFactory: The L{FrameFactory} that will be used to construct the
    frames.
    @type frameFactory: L{FrameFactory}

    @param streamID: The ID of the stream on which to send the request.
    @type streamID: L{int}

    @return: The concatenated serialized frames.
    @rtype: L{bytes}
    """
    serialized = [
        frame.serialize()
        for frame in buildRequestFrames(headers, data, frameFactory, streamID)
    ]
    return b"".join(serialized)
245
246
def framesFromBytes(data):
    """
    Decode a byte string into the frames it contains.

    Each call uses a brand new L{FrameBuffer}, so no decoding state is
    carried over from one call to the next. Because decoding HTTP/2 frames is
    extremely stateful, this should almost always be called only once, just
    before making assertions.

    @param data: The serialized HTTP/2 frames.
    @type data: L{bytes}

    @returns: A list of HTTP/2 frames.
    @rtype: L{list} of L{hyperframe.frame.Frame} subclasses.
    """
    frameBuffer = FrameBuffer()
    frameBuffer.receiveData(data)
    return [frame for frame in frameBuffer]
265
266
class ChunkedHTTPHandler(http.Request):
    """
    A HTTP request object whose response body is C{chunkData} written a
    number of times taken from the URL.

    Must be called with a path /chunked/<num_chunks>
    """

    chunkData = b"hello world!"

    def process(self):
        """
        Respond with status 200, writing C{chunkData} once per requested
        chunk and then finishing the response.
        """
        # The chunk count is the final "/"-separated component of the URI.
        chunkCount = int(self.uri.rsplit(b"/", 1)[-1])
        self.setResponseCode(200)

        remaining = chunkCount
        while remaining > 0:
            self.write(self.chunkData)
            remaining -= 1

        self.finish()


ChunkedHTTPHandlerProxy = _makeRequestProxyFactory(ChunkedHTTPHandler)
288
289
class ConsumerDummyHandler(http.Request):
    """
    A HTTP request handler that drives the C{IPushProducer} implementation
    in the L{H2Stream} object. No current IRequest object does that, but in
    principle future implementations could, so that codepath should be
    tested.

    @ivar _requestReceived: C{True} once C{requestReceived} has been called.

    @ivar _data: The request body read by C{process}, or L{None} if
        C{process} has not run yet.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Production starts paused.
        self.channel.pauseProducing()
        self._data = None
        self._requestReceived = False

    def acceptData(self):
        """
        Start the data pipe by resuming production on the channel.
        """
        self.channel.resumeProducing()

    def requestReceived(self, *args, **kwargs):
        """
        Record that the request was received, then defer to
        L{http.Request.requestReceived}.
        """
        self._requestReceived = True
        return super().requestReceived(*args, **kwargs)

    def process(self):
        """
        Store the request body on C{self._data} and send a fixed response
        with status 200.
        """
        self.setResponseCode(200)
        self._data = self.content.read()
        self.write(b"this is a response from a consumer dummy handler")
        self.finish()


ConsumerDummyHandlerProxy = _makeRequestProxyFactory(ConsumerDummyHandler)
325
326
class AbortingConsumerDummyHandler(ConsumerDummyHandler):
    """
    This is a HTTP request handler that works with the C{IPushProducer}
    implementation in the L{H2Stream} object. The difference between this and
    the ConsumerDummyHandler is that after resuming production it immediately
    aborts it again.

    Only C{acceptData} is overridden; everything else is inherited from
    L{ConsumerDummyHandler}.
    """

    def acceptData(self):
        """
        Start and then immediately stop the data pipe.

        C{self.channel} is looked up again for the second call, so the order
        of the two statements matters.
        """
        self.channel.resumeProducing()
        self.channel.stopProducing()


# Request factory built from AbortingConsumerDummyHandler by
# _makeRequestProxyFactory; tests assign it to H2Connection.requestFactory.
AbortingConsumerDummyHandlerProxy = _makeRequestProxyFactory(
    AbortingConsumerDummyHandler
)
346
347
class DummyProducerHandler(http.Request):
    """
    An HTTP request handler that answers with status 200 and registers a
    L{DummyProducer} for the body, without writing anything itself.

    C{process} never finishes the response: the owner must call C{finish} to
    complete it.
    """

    def process(self):
        """
        Set the response code and register a new L{DummyProducer}, passing
        C{True} as the second argument to C{registerProducer}.
        """
        self.setResponseCode(200)
        producer = DummyProducer()
        self.registerProducer(producer, True)


DummyProducerHandlerProxy = _makeRequestProxyFactory(DummyProducerHandler)
361
362
class NotifyingRequestFactory:
    """
    A L{http.Request} factory that calls L{http.Request.notifyFinish} on all
    L{http.Request} objects before it returns them, and squirrels the resulting
    L{defer.Deferred} away on this factory instance for later use. This is
    done as early as possible to ensure that we always see the result.

    @ivar results: The L{defer.Deferred}s returned by C{notifyFinish}, in the
        order the requests were created.
    @type results: L{list} of L{defer.Deferred}
    """

    def __init__(self, wrappedFactory):
        """
        @param wrappedFactory: The callable that actually builds the request
            objects. Any interfaces it provides are also declared on this
            factory.
        """
        self.results = []
        self._wrappedFactory = wrappedFactory

        # Add interfaces provided by the factory we are wrapping. We expect
        # this only to be INonQueuedRequestFactory, but we don't want to
        # hard-code that rule. directlyProvides replaces any earlier
        # declaration, so every interface has to be passed in a single call:
        # declaring them one at a time would keep only the last one.
        interfaces = list(providedBy(self._wrappedFactory))
        if interfaces:
            directlyProvides(self, *interfaces)

    def __call__(self, *args, **kwargs):
        """
        Build a request with the wrapped factory, record its
        C{notifyFinish} L{defer.Deferred} in C{self.results}, and return the
        request wrapped in
        L{_IDeprecatedHTTPChannelToRequestInterfaceProxy}.
        """
        req = self._wrappedFactory(*args, **kwargs)
        self.results.append(req.notifyFinish())
        return _IDeprecatedHTTPChannelToRequestInterfaceProxy(req)


NotifyingRequestFactoryProxy = _makeRequestProxyFactory(NotifyingRequestFactory)
388
389
class HTTP2TestHelpers:
    """
    A mixin with helper assertions for HTTP/2 tests. It defines no tests of
    its own.

    When C{skipH2} is set, it is copied to the C{skip} attribute of this
    class.
    """

    if skipH2:
        skip = skipH2

    def assertAllStreamsBlocked(self, connection):
        """
        Assert that asking the priority tree of C{connection} for its next
        stream raises L{priority.DeadlockError}: that is, the tree believes
        that none of the streams have data ready to send.

        @param connection: The object whose C{priority} attribute is checked.
        """
        priorityTree = connection.priority
        with self.assertRaises(priority.DeadlockError):
            next(priorityTree)
405
406
407class HTTP2ServerTests(unittest.TestCase, HTTP2TestHelpers):
    # NOTE: the lists below are class attributes, so a single object is shared
    # by every test in this class. Tests that need a variation should work on
    # a copy rather than modify them in place.

    # Request headers for a GET of "/". "custom-header" appears twice, with
    # two different values.
    getRequestHeaders = [
        (b":method", b"GET"),
        (b":authority", b"localhost"),
        (b":path", b"/"),
        (b":scheme", b"https"),
        (b"user-agent", b"twisted-test-code"),
        (b"custom-header", b"1"),
        (b"custom-header", b"2"),
    ]

    # Request headers for a POST to "/post_endpoint". The content-length of 25
    # is the combined length of the chunks in postRequestData.
    postRequestHeaders = [
        (b":method", b"POST"),
        (b":authority", b"localhost"),
        (b":path", b"/post_endpoint"),
        (b":scheme", b"https"),
        (b"user-agent", b"twisted-test-code"),
        (b"content-length", b"25"),
    ]

    # The POST body, split into the chunks that are each sent in their own
    # Data frame.
    postRequestData = [b"hello ", b"world, ", b"it's ", b"http/2!"]

    # Response headers the tests expect back for the GET request. The
    # content-length of 13 is the length of getResponseData.
    getResponseHeaders = [
        (b":status", b"200"),
        (b"request", b"/"),
        (b"command", b"GET"),
        (b"version", b"HTTP/2"),
        (b"content-length", b"13"),
    ]

    # Response body the tests expect back for the GET request.
    getResponseData = b"'''\nNone\n'''\n"

    # Response headers the tests expect back for the POST request. The
    # content-length of 36 is the length of postResponseData.
    postResponseHeaders = [
        (b":status", b"200"),
        (b"request", b"/post_endpoint"),
        (b"command", b"POST"),
        (b"version", b"HTTP/2"),
        (b"content-length", b"36"),
    ]

    # Response body the tests expect back for the POST request: it contains
    # the request's content-length (25) and the joined request body.
    postResponseData = b"'''\n25\nhello world, it's http/2!'''\n"
448
449    def connectAndReceive(self, connection, headers, body):
450        """
451        Takes a single L{H2Connection} object and connects it to a
452        L{StringTransport} using a brand new L{FrameFactory}.
453
454        @param connection: The L{H2Connection} object to connect.
455        @type connection: L{H2Connection}
456
457        @param headers: The headers to send on the first request.
458        @type headers: L{Iterable} of L{tuple} of C{(bytes, bytes)}
459
460        @param body: Chunks of body to send, if any.
461        @type body: L{Iterable} of L{bytes}
462
463        @return: A tuple of L{FrameFactory}, L{StringTransport}
464        """
465        frameFactory = FrameFactory()
466        transport = StringTransport()
467
468        requestBytes = frameFactory.clientConnectionPreface()
469        requestBytes += buildRequestBytes(headers, body, frameFactory)
470
471        connection.makeConnection(transport)
472        # One byte at a time, to stress the implementation.
473        for byte in iterbytes(requestBytes):
474            connection.dataReceived(byte)
475
476        return frameFactory, transport
477
478    def test_basicRequest(self):
479        """
480        Send request over a TCP connection and confirm that we get back the
481        expected data in the order and style we expect.
482        """
483        # This test is complex because it validates the data very closely: it
484        # specifically checks frame ordering and type.
485        connection = H2Connection()
486        connection.requestFactory = DummyHTTPHandlerProxy
487        _, transport = self.connectAndReceive(connection, self.getRequestHeaders, [])
488
489        def validate(streamID):
490            frames = framesFromBytes(transport.value())
491
492            self.assertEqual(len(frames), 4)
493            self.assertTrue(all(f.stream_id == 1 for f in frames[1:]))
494
495            self.assertTrue(isinstance(frames[1], hyperframe.frame.HeadersFrame))
496            self.assertTrue(isinstance(frames[2], hyperframe.frame.DataFrame))
497            self.assertTrue(isinstance(frames[3], hyperframe.frame.DataFrame))
498
499            self.assertEqual(dict(frames[1].data), dict(self.getResponseHeaders))
500            self.assertEqual(frames[2].data, self.getResponseData)
501            self.assertEqual(frames[3].data, b"")
502            self.assertTrue("END_STREAM" in frames[3].flags)
503
504        return connection._streamCleanupCallbacks[1].addCallback(validate)
505
506    def test_postRequest(self):
507        """
508        Send a POST request and confirm that the data is safely transferred.
509        """
510        connection = H2Connection()
511        connection.requestFactory = DummyHTTPHandlerProxy
512        _, transport = self.connectAndReceive(
513            connection, self.postRequestHeaders, self.postRequestData
514        )
515
516        def validate(streamID):
517            frames = framesFromBytes(transport.value())
518
519            # One Settings frame, one Headers frame and two Data frames.
520            self.assertEqual(len(frames), 4)
521            self.assertTrue(all(f.stream_id == 1 for f in frames[-3:]))
522
523            self.assertTrue(isinstance(frames[-3], hyperframe.frame.HeadersFrame))
524            self.assertTrue(isinstance(frames[-2], hyperframe.frame.DataFrame))
525            self.assertTrue(isinstance(frames[-1], hyperframe.frame.DataFrame))
526
527            self.assertEqual(dict(frames[-3].data), dict(self.postResponseHeaders))
528            self.assertEqual(frames[-2].data, self.postResponseData)
529            self.assertEqual(frames[-1].data, b"")
530            self.assertTrue("END_STREAM" in frames[-1].flags)
531
532        return connection._streamCleanupCallbacks[1].addCallback(validate)
533
534    def test_postRequestNoLength(self):
535        """
536        Send a POST request without length and confirm that the data is safely
537        transferred.
538        """
539        postResponseHeaders = [
540            (b":status", b"200"),
541            (b"request", b"/post_endpoint"),
542            (b"command", b"POST"),
543            (b"version", b"HTTP/2"),
544            (b"content-length", b"38"),
545        ]
546        postResponseData = b"'''\nNone\nhello world, it's http/2!'''\n"
547
548        # Strip the content-length header.
549        postRequestHeaders = [
550            (x, y) for x, y in self.postRequestHeaders if x != b"content-length"
551        ]
552
553        connection = H2Connection()
554        connection.requestFactory = DummyHTTPHandlerProxy
555        _, transport = self.connectAndReceive(
556            connection, postRequestHeaders, self.postRequestData
557        )
558
559        def validate(streamID):
560            frames = framesFromBytes(transport.value())
561
562            # One Settings frame, one Headers frame, and two Data frames
563            self.assertEqual(len(frames), 4)
564            self.assertTrue(all(f.stream_id == 1 for f in frames[-3:]))
565
566            self.assertTrue(isinstance(frames[-3], hyperframe.frame.HeadersFrame))
567            self.assertTrue(isinstance(frames[-2], hyperframe.frame.DataFrame))
568            self.assertTrue(isinstance(frames[-1], hyperframe.frame.DataFrame))
569
570            self.assertEqual(dict(frames[-3].data), dict(postResponseHeaders))
571            self.assertEqual(frames[-2].data, postResponseData)
572            self.assertEqual(frames[-1].data, b"")
573            self.assertTrue("END_STREAM" in frames[-1].flags)
574
575        return connection._streamCleanupCallbacks[1].addCallback(validate)
576
577    def test_interleavedRequests(self):
578        """
579        Many interleaved POST requests all get received and responded to
580        appropriately.
581        """
582        # Unfortunately this test is pretty complex.
583        REQUEST_COUNT = 40
584
585        f = FrameFactory()
586        b = StringTransport()
587        a = H2Connection()
588        a.requestFactory = DummyHTTPHandlerProxy
589
590        # Stream IDs are always odd numbers.
591        streamIDs = list(range(1, REQUEST_COUNT * 2, 2))
592        frames = [
593            buildRequestFrames(
594                self.postRequestHeaders, self.postRequestData, f, streamID
595            )
596            for streamID in streamIDs
597        ]
598
599        requestBytes = f.clientConnectionPreface()
600
601        # Interleave the frames. That is, send one frame from each stream at a
602        # time. This wacky line lets us do that.
603        frames = itertools.chain.from_iterable(zip(*frames))
604        requestBytes += b"".join(frame.serialize() for frame in frames)
605        a.makeConnection(b)
606        # one byte at a time, to stress the implementation.
607        for byte in iterbytes(requestBytes):
608            a.dataReceived(byte)
609
610        def validate(results):
611            frames = framesFromBytes(b.value())
612
613            # We expect 1 Settings frame for the connection, and then 3 frames
614            # *per stream* (1 Headers frame, 2 Data frames). This doesn't send
615            # enough data to trigger a window update.
616            self.assertEqual(len(frames), 1 + (3 * 40))
617
618            # Let's check the data is ok. We need the non-WindowUpdate frames
619            # for each stream.
620            for streamID in streamIDs:
621                streamFrames = [
622                    f
623                    for f in frames
624                    if f.stream_id == streamID
625                    and not isinstance(f, hyperframe.frame.WindowUpdateFrame)
626                ]
627
628                self.assertEqual(len(streamFrames), 3)
629
630                self.assertEqual(
631                    dict(streamFrames[0].data), dict(self.postResponseHeaders)
632                )
633                self.assertEqual(streamFrames[1].data, self.postResponseData)
634                self.assertEqual(streamFrames[2].data, b"")
635                self.assertTrue("END_STREAM" in streamFrames[2].flags)
636
637        return defer.DeferredList(list(a._streamCleanupCallbacks.values())).addCallback(
638            validate
639        )
640
641    def test_sendAccordingToPriority(self):
642        """
643        Data in responses is interleaved according to HTTP/2 priorities.
644        """
645        # We want to start three parallel GET requests that will each return
646        # four chunks of data. These chunks will be interleaved according to
647        # HTTP/2 priorities. Stream 1 will be set to weight 64, Stream 3 to
648        # weight 32, and Stream 5 to weight 16 but dependent on Stream 1.
649        # That will cause data frames for these streams to be emitted in this
650        # order: 1, 3, 1, 1, 3, 1, 1, 3, 5, 3, 5, 3, 5, 5, 5.
651        #
652        # The reason there are so many frames is because the implementation
653        # interleaves stream completion according to priority order as well,
654        # because it is sent on a Data frame.
655        #
656        # This doesn't fully test priority, but tests *almost* enough of it to
657        # be worthwhile.
658        f = FrameFactory()
659        b = StringTransport()
660        a = H2Connection()
661        a.requestFactory = ChunkedHTTPHandlerProxy
662        getRequestHeaders = self.getRequestHeaders
663        getRequestHeaders[2] = (":path", "/chunked/4")
664
665        frames = [
666            buildRequestFrames(getRequestHeaders, [], f, streamID)
667            for streamID in [1, 3, 5]
668        ]
669
670        # Set the priorities. The first two will use their HEADERS frame, the
671        # third will have a PRIORITY frame sent before the headers.
672        frames[0][0].flags.add("PRIORITY")
673        frames[0][0].stream_weight = 64
674
675        frames[1][0].flags.add("PRIORITY")
676        frames[1][0].stream_weight = 32
677
678        priorityFrame = f.buildPriorityFrame(
679            streamID=5,
680            weight=16,
681            dependsOn=1,
682            exclusive=True,
683        )
684        frames[2].insert(0, priorityFrame)
685
686        frames = itertools.chain.from_iterable(frames)
687        requestBytes = f.clientConnectionPreface()
688        requestBytes += b"".join(frame.serialize() for frame in frames)
689
690        a.makeConnection(b)
691        # one byte at a time, to stress the implementation.
692        for byte in iterbytes(requestBytes):
693            a.dataReceived(byte)
694
695        def validate(results):
696            frames = framesFromBytes(b.value())
697
698            # We expect 1 Settings frame for the connection, and then 6 frames
699            # per stream (1 Headers frame, 5 data frames), for a total of 19.
700            self.assertEqual(len(frames), 19)
701
702            streamIDs = [
703                f.stream_id for f in frames if isinstance(f, hyperframe.frame.DataFrame)
704            ]
705            expectedOrder = [1, 3, 1, 1, 3, 1, 1, 3, 5, 3, 5, 3, 5, 5, 5]
706            self.assertEqual(streamIDs, expectedOrder)
707
708        return defer.DeferredList(list(a._streamCleanupCallbacks.values())).addCallback(
709            validate
710        )
711
712    def test_protocolErrorTerminatesConnection(self):
713        """
714        A protocol error from the remote peer terminates the connection.
715        """
716        f = FrameFactory()
717        b = StringTransport()
718        a = H2Connection()
719        a.requestFactory = DummyHTTPHandlerProxy
720
721        # We're going to open a stream and then send a PUSH_PROMISE frame,
722        # which is forbidden.
723        requestBytes = f.clientConnectionPreface()
724        requestBytes += buildRequestBytes(self.getRequestHeaders, [], f)
725        requestBytes += f.buildPushPromiseFrame(
726            streamID=1,
727            promisedStreamID=2,
728            headers=self.getRequestHeaders,
729            flags=["END_HEADERS"],
730        ).serialize()
731
732        a.makeConnection(b)
733        # one byte at a time, to stress the implementation.
734        for byte in iterbytes(requestBytes):
735            a.dataReceived(byte)
736
737            # Check whether the transport got shut down: if it did, stop
738            # sending more data.
739            if b.disconnecting:
740                break
741
742        frames = framesFromBytes(b.value())
743
744        # The send loop never gets to terminate the stream, but *some* data
745        # does get sent. We get a Settings frame, a Headers frame, and then the
746        # GoAway frame.
747        self.assertEqual(len(frames), 3)
748        self.assertTrue(isinstance(frames[-1], hyperframe.frame.GoAwayFrame))
749        self.assertTrue(b.disconnecting)
750
751    def test_streamProducingData(self):
752        """
753        The H2Stream data implements IPushProducer, and can have its data
754        production controlled by the Request if the Request chooses to.
755        """
756        connection = H2Connection()
757        connection.requestFactory = ConsumerDummyHandlerProxy
758        _, transport = self.connectAndReceive(
759            connection, self.postRequestHeaders, self.postRequestData
760        )
761
762        # At this point no data should have been received by the request *or*
763        # the response. We need to dig the request out of the tree of objects.
764        request = connection.streams[1]._request.original
765        self.assertFalse(request._requestReceived)
766
767        # We should have only received the Settings frame. It's important that
768        # the WindowUpdate frames don't land before data is delivered to the
769        # Request.
770        frames = framesFromBytes(transport.value())
771        self.assertEqual(len(frames), 1)
772
773        # At this point, we can kick off the producing. This will force the
774        # H2Stream object to deliver the request data all at once, so check
775        # that it was delivered correctly.
776        request.acceptData()
777        self.assertTrue(request._requestReceived)
778        self.assertTrue(request._data, b"hello world, it's http/2!")
779
780        # *That* will have also caused the H2Connection object to emit almost
781        # all the data it needs. That'll be a Headers frame, as well as the
782        # original SETTINGS frame.
783        frames = framesFromBytes(transport.value())
784        self.assertEqual(len(frames), 2)
785
786        def validate(streamID):
787            # Confirm that the response is ok.
788            frames = framesFromBytes(transport.value())
789
790            # The only new frames here are the two Data frames.
791            self.assertEqual(len(frames), 4)
792            self.assertTrue("END_STREAM" in frames[-1].flags)
793
794        return connection._streamCleanupCallbacks[1].addCallback(validate)
795
796    def test_abortStreamProducingData(self):
797        """
798        The H2Stream data implements IPushProducer, and can have its data
799        production controlled by the Request if the Request chooses to.
800        When the production is stopped, that causes the stream connection to
801        be lost.
802        """
803        f = FrameFactory()
804        b = StringTransport()
805        a = H2Connection()
806        a.requestFactory = AbortingConsumerDummyHandlerProxy
807
808        # We're going to send in a POST request.
809        frames = buildRequestFrames(self.postRequestHeaders, self.postRequestData, f)
810        frames[-1].flags = set()  # Remove END_STREAM flag.
811        requestBytes = f.clientConnectionPreface()
812        requestBytes += b"".join(f.serialize() for f in frames)
813        a.makeConnection(b)
814        # one byte at a time, to stress the implementation.
815        for byte in iterbytes(requestBytes):
816            a.dataReceived(byte)
817
818        # At this point no data should have been received by the request *or*
819        # the response. We need to dig the request out of the tree of objects.
820        request = a.streams[1]._request.original
821        self.assertFalse(request._requestReceived)
822
823        # Save off the cleanup deferred now, it'll be removed when the
824        # RstStream frame is sent.
825        cleanupCallback = a._streamCleanupCallbacks[1]
826
827        # At this point, we can kick off the production and immediate abort.
828        request.acceptData()
829
830        # The stream will now have been aborted.
831        def validate(streamID):
832            # Confirm that the response is ok.
833            frames = framesFromBytes(b.value())
834
835            # We expect a Settings frame and a RstStream frame.
836            self.assertEqual(len(frames), 2)
837            self.assertTrue(isinstance(frames[-1], hyperframe.frame.RstStreamFrame))
838            self.assertEqual(frames[-1].stream_id, 1)
839
840        return cleanupCallback.addCallback(validate)
841
842    def test_terminatedRequest(self):
843        """
844        When a RstStream frame is received, the L{H2Connection} and L{H2Stream}
845        objects tear down the L{http.Request} and swallow all outstanding
846        writes.
847        """
848        # Here we want to use the DummyProducerHandler primarily for the side
849        # effect it has of not writing to the connection. That means we can
850        # delay some writes until *after* the RstStream frame is received.
851        connection = H2Connection()
852        connection.requestFactory = DummyProducerHandlerProxy
853        frameFactory, transport = self.connectAndReceive(
854            connection, self.getRequestHeaders, []
855        )
856
857        # Get the request object.
858        request = connection.streams[1]._request.original
859
860        # Send two writes in.
861        request.write(b"first chunk")
862        request.write(b"second chunk")
863
864        # Save off the cleanup deferred now, it'll be removed when the
865        # RstStream frame is received.
866        cleanupCallback = connection._streamCleanupCallbacks[1]
867
868        # Now fire the RstStream frame.
869        connection.dataReceived(
870            frameFactory.buildRstStreamFrame(1, errorCode=1).serialize()
871        )
872
873        # This should have cancelled the request.
874        self.assertTrue(request._disconnected)
875        self.assertTrue(request.channel is None)
876
877        # This should not raise an exception, the function will just return
878        # immediately, and do nothing
879        request.write(b"third chunk")
880
881        # Check that everything is fine.
882        # We expect that only the Settings and Headers frames will have been
883        # emitted. The two writes are lost because the delayed call never had
884        # another chance to execute before the RstStream frame got processed.
885        def validate(streamID):
886            frames = framesFromBytes(transport.value())
887
888            self.assertEqual(len(frames), 2)
889            self.assertEqual(frames[1].stream_id, 1)
890
891            self.assertTrue(isinstance(frames[1], hyperframe.frame.HeadersFrame))
892
893        return cleanupCallback.addCallback(validate)
894
895    def test_terminatedConnection(self):
896        """
897        When a GoAway frame is received, the L{H2Connection} and L{H2Stream}
898        objects tear down all outstanding L{http.Request} objects and stop all
899        writing.
900        """
901        # Here we want to use the DummyProducerHandler primarily for the side
902        # effect it has of not writing to the connection. That means we can
903        # delay some writes until *after* the GoAway frame is received.
904        connection = H2Connection()
905        connection.requestFactory = DummyProducerHandlerProxy
906        frameFactory, transport = self.connectAndReceive(
907            connection, self.getRequestHeaders, []
908        )
909
910        # Get the request object.
911        request = connection.streams[1]._request.original
912
913        # Send two writes in.
914        request.write(b"first chunk")
915        request.write(b"second chunk")
916
917        # Save off the cleanup deferred now, it'll be removed when the
918        # GoAway frame is received.
919        cleanupCallback = connection._streamCleanupCallbacks[1]
920
921        # Now fire the GoAway frame.
922        connection.dataReceived(
923            frameFactory.buildGoAwayFrame(lastStreamID=0).serialize()
924        )
925
926        # This should have cancelled the request.
927        self.assertTrue(request._disconnected)
928        self.assertTrue(request.channel is None)
929
930        # It should also have cancelled the sending loop.
931        self.assertFalse(connection._stillProducing)
932
933        # Check that everything is fine.
934        # We expect that only the Settings and Headers frames will have been
935        # emitted. The writes are lost because the callLater never had
936        # a chance to execute before the GoAway frame got processed.
937        def validate(streamID):
938            frames = framesFromBytes(transport.value())
939
940            self.assertEqual(len(frames), 2)
941            self.assertEqual(frames[1].stream_id, 1)
942
943            self.assertTrue(isinstance(frames[1], hyperframe.frame.HeadersFrame))
944
945        return cleanupCallback.addCallback(validate)
946
947    def test_respondWith100Continue(self):
948        """
949        Requests containing Expect: 100-continue cause provisional 100
950        responses to be emitted.
951        """
952        connection = H2Connection()
953        connection.requestFactory = DummyHTTPHandlerProxy
954
955        # Add Expect: 100-continue for this request.
956        headers = self.getRequestHeaders + [(b"expect", b"100-continue")]
957
958        _, transport = self.connectAndReceive(connection, headers, [])
959
960        # We expect 5 frames now: Settings, two Headers frames, and two Data
961        # frames. We're only really interested in validating the first Headers
962        # frame which contains the 100.
963        def validate(streamID):
964            frames = framesFromBytes(transport.value())
965
966            self.assertEqual(len(frames), 5)
967            self.assertTrue(all(f.stream_id == 1 for f in frames[1:]))
968
969            self.assertTrue(isinstance(frames[1], hyperframe.frame.HeadersFrame))
970            self.assertEqual(frames[1].data, [(b":status", b"100")])
971            self.assertTrue("END_STREAM" in frames[-1].flags)
972
973        return connection._streamCleanupCallbacks[1].addCallback(validate)
974
975    def test_respondWith400(self):
976        """
977        Triggering the call to L{H2Stream._respondToBadRequestAndDisconnect}
978        leads to a 400 error being sent automatically and the stream being torn
979        down.
980        """
981        # The only "natural" way to trigger this in the current codebase is to
982        # send a multipart/form-data request that the cgi module doesn't like.
983        # That's absurdly hard, so instead we'll just call it ourselves. For
984        # this reason we use the DummyProducerHandler, which doesn't write the
985        # headers straight away.
986        connection = H2Connection()
987        connection.requestFactory = DummyProducerHandlerProxy
988        _, transport = self.connectAndReceive(connection, self.getRequestHeaders, [])
989
990        # Grab the request and the completion callback.
991        stream = connection.streams[1]
992        request = stream._request.original
993        cleanupCallback = connection._streamCleanupCallbacks[1]
994
995        # Abort the stream.
996        stream._respondToBadRequestAndDisconnect()
997
998        # This should have cancelled the request.
999        self.assertTrue(request._disconnected)
1000        self.assertTrue(request.channel is None)
1001
1002        # We expect 2 frames Settings and the 400 Headers.
1003        def validate(streamID):
1004            frames = framesFromBytes(transport.value())
1005
1006            self.assertEqual(len(frames), 2)
1007
1008            self.assertTrue(isinstance(frames[1], hyperframe.frame.HeadersFrame))
1009            self.assertEqual(frames[1].data, [(b":status", b"400")])
1010            self.assertTrue("END_STREAM" in frames[-1].flags)
1011
1012        return cleanupCallback.addCallback(validate)
1013
1014    def test_loseH2StreamConnection(self):
1015        """
1016        Calling L{Request.loseConnection} causes all data that has previously
1017        been sent to be flushed, and then the stream cleanly closed.
1018        """
1019        # Here we again want to use the DummyProducerHandler because it doesn't
1020        # close the connection on its own.
1021        connection = H2Connection()
1022        connection.requestFactory = DummyProducerHandlerProxy
1023        _, transport = self.connectAndReceive(connection, self.getRequestHeaders, [])
1024
1025        # Grab the request.
1026        stream = connection.streams[1]
1027        request = stream._request.original
1028
1029        # Send in some writes.
1030        dataChunks = [b"hello", b"world", b"here", b"are", b"some", b"writes"]
1031        for chunk in dataChunks:
1032            request.write(chunk)
1033
1034        # Now lose the connection.
1035        request.loseConnection()
1036
1037        # Check that the data was all written out correctly and that the stream
1038        # state is cleaned up.
1039        def validate(streamID):
1040            frames = framesFromBytes(transport.value())
1041
1042            # Settings, Headers, 7 Data frames.
1043            self.assertEqual(len(frames), 9)
1044            self.assertTrue(all(f.stream_id == 1 for f in frames[1:]))
1045
1046            self.assertTrue(isinstance(frames[1], hyperframe.frame.HeadersFrame))
1047            self.assertTrue("END_STREAM" in frames[-1].flags)
1048
1049            receivedDataChunks = [
1050                f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame)
1051            ]
1052            self.assertEqual(
1053                receivedDataChunks,
1054                dataChunks + [b""],
1055            )
1056
1057        return connection._streamCleanupCallbacks[1].addCallback(validate)
1058
1059    def test_cannotRegisterTwoProducers(self):
1060        """
1061        The L{H2Stream} object forbids registering two producers.
1062        """
1063        connection = H2Connection()
1064        connection.requestFactory = DummyProducerHandlerProxy
1065        self.connectAndReceive(connection, self.getRequestHeaders, [])
1066
1067        # Grab the request.
1068        stream = connection.streams[1]
1069        request = stream._request.original
1070
1071        self.assertRaises(ValueError, stream.registerProducer, request, True)
1072
1073    def test_handlesPullProducer(self):
1074        """
1075        L{Request} objects that have registered pull producers get blocked and
1076        unblocked according to HTTP/2 flow control.
1077        """
1078        connection = H2Connection()
1079        connection.requestFactory = DummyPullProducerHandlerProxy
1080        _, transport = self.connectAndReceive(connection, self.getRequestHeaders, [])
1081
1082        # Get the producer completion deferred and ensure we call
1083        # request.finish.
1084        stream = connection.streams[1]
1085        request = stream._request.original
1086        producerComplete = request._actualProducer.result
1087        producerComplete.addCallback(lambda x: request.finish())
1088
1089        # Check that the sending loop sends all the appropriate data.
1090        def validate(streamID):
1091            frames = framesFromBytes(transport.value())
1092
1093            # Check that the stream is correctly terminated.
1094            self.assertTrue("END_STREAM" in frames[-1].flags)
1095
1096            # Grab the data from the frames.
1097            dataChunks = [
1098                f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame)
1099            ]
1100            self.assertEqual(
1101                dataChunks,
1102                [b"0", b"1", b"2", b"3", b"4", b"5", b"6", b"7", b"8", b"9", b""],
1103            )
1104
1105        return connection._streamCleanupCallbacks[1].addCallback(validate)
1106
1107    def test_isSecureWorksProperly(self):
1108        """
1109        L{Request} objects can correctly ask isSecure on HTTP/2.
1110        """
1111        connection = H2Connection()
1112        connection.requestFactory = DelayedHTTPHandlerProxy
1113        self.connectAndReceive(connection, self.getRequestHeaders, [])
1114
1115        request = connection.streams[1]._request.original
1116        self.assertFalse(request.isSecure())
1117        connection.streams[1].abortConnection()
1118
1119    def test_lateCompletionWorks(self):
1120        """
1121        L{H2Connection} correctly unblocks when a stream is ended.
1122        """
1123        connection = H2Connection()
1124        connection.requestFactory = DelayedHTTPHandlerProxy
1125        _, transport = self.connectAndReceive(connection, self.getRequestHeaders, [])
1126
1127        # Delay a call to end request, forcing the connection to block because
1128        # it has no data to send.
1129        request = connection.streams[1]._request.original
1130        reactor.callLater(0.01, request.finish)
1131
1132        def validateComplete(*args):
1133            frames = framesFromBytes(transport.value())
1134
1135            # Check that the stream is correctly terminated.
1136            self.assertEqual(len(frames), 3)
1137            self.assertTrue("END_STREAM" in frames[-1].flags)
1138
1139        return connection._streamCleanupCallbacks[1].addCallback(validateComplete)
1140
1141    def test_writeSequenceForChannels(self):
1142        """
1143        L{H2Stream} objects can send a series of frames via C{writeSequence}.
1144        """
1145        connection = H2Connection()
1146        connection.requestFactory = DelayedHTTPHandlerProxy
1147        _, transport = self.connectAndReceive(connection, self.getRequestHeaders, [])
1148
1149        stream = connection.streams[1]
1150        request = stream._request.original
1151
1152        request.setResponseCode(200)
1153        stream.writeSequence([b"Hello", b",", b"world!"])
1154        request.finish()
1155
1156        completionDeferred = connection._streamCleanupCallbacks[1]
1157
1158        def validate(streamID):
1159            frames = framesFromBytes(transport.value())
1160
1161            # Check that the stream is correctly terminated.
1162            self.assertTrue("END_STREAM" in frames[-1].flags)
1163
1164            # Grab the data from the frames.
1165            dataChunks = [
1166                f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame)
1167            ]
1168            self.assertEqual(dataChunks, [b"Hello", b",", b"world!", b""])
1169
1170        return completionDeferred.addCallback(validate)
1171
1172    def test_delayWrites(self):
1173        """
1174        Delaying writes from L{Request} causes the L{H2Connection} to block on
1175        sending until data is available. However, data is *not* sent if there's
1176        no room in the flow control window.
1177        """
1178        # Here we again want to use the DummyProducerHandler because it doesn't
1179        # close the connection on its own.
1180        f = FrameFactory()
1181        b = StringTransport()
1182        a = H2Connection()
1183        a.requestFactory = DelayedHTTPHandlerProxy
1184
1185        requestBytes = f.clientConnectionPreface()
1186        requestBytes += f.buildSettingsFrame(
1187            {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5}
1188        ).serialize()
1189        requestBytes += buildRequestBytes(self.getRequestHeaders, [], f)
1190        a.makeConnection(b)
1191        # one byte at a time, to stress the implementation.
1192        for byte in iterbytes(requestBytes):
1193            a.dataReceived(byte)
1194
1195        # Grab the request.
1196        stream = a.streams[1]
1197        request = stream._request.original
1198
1199        # Write the first 5 bytes.
1200        request.write(b"fiver")
1201        dataChunks = [b"here", b"are", b"some", b"writes"]
1202
1203        def write_chunks():
1204            # Send in some writes.
1205            for chunk in dataChunks:
1206                request.write(chunk)
1207            request.finish()
1208
1209        d = task.deferLater(reactor, 0.01, write_chunks)
1210        d.addCallback(
1211            lambda *args: a.dataReceived(
1212                f.buildWindowUpdateFrame(streamID=1, increment=50).serialize()
1213            )
1214        )
1215
1216        # Check that the data was all written out correctly and that the stream
1217        # state is cleaned up.
1218        def validate(streamID):
1219            frames = framesFromBytes(b.value())
1220
1221            # 2 Settings, Headers, 7 Data frames.
1222            self.assertEqual(len(frames), 9)
1223            self.assertTrue(all(f.stream_id == 1 for f in frames[2:]))
1224
1225            self.assertTrue(isinstance(frames[2], hyperframe.frame.HeadersFrame))
1226            self.assertTrue("END_STREAM" in frames[-1].flags)
1227
1228            receivedDataChunks = [
1229                f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame)
1230            ]
1231            self.assertEqual(
1232                receivedDataChunks,
1233                [b"fiver"] + dataChunks + [b""],
1234            )
1235
1236        return a._streamCleanupCallbacks[1].addCallback(validate)
1237
1238    def test_resetAfterBody(self):
1239        """
1240        A client that immediately resets after sending the body causes Twisted
1241        to send no response.
1242        """
1243        frameFactory = FrameFactory()
1244        transport = StringTransport()
1245        a = H2Connection()
1246        a.requestFactory = DummyHTTPHandlerProxy
1247
1248        requestBytes = frameFactory.clientConnectionPreface()
1249        requestBytes += buildRequestBytes(
1250            headers=self.getRequestHeaders, data=[], frameFactory=frameFactory
1251        )
1252        requestBytes += frameFactory.buildRstStreamFrame(streamID=1).serialize()
1253        a.makeConnection(transport)
1254        a.dataReceived(requestBytes)
1255
1256        frames = framesFromBytes(transport.value())
1257
1258        self.assertEqual(len(frames), 1)
1259        self.assertNotIn(1, a._streamCleanupCallbacks)
1260
1261    def test_RequestRequiringFactorySiteInConstructor(self):
1262        """
1263        A custom L{Request} subclass that requires the site and factory in the
1264        constructor is able to get them.
1265        """
1266        d = defer.Deferred()
1267
1268        class SuperRequest(DummyHTTPHandler):
1269            def __init__(self, *args, **kwargs):
1270                DummyHTTPHandler.__init__(self, *args, **kwargs)
1271                d.callback((self.channel.site, self.channel.factory))
1272
1273        connection = H2Connection()
1274        httpFactory = http.HTTPFactory()
1275        connection.requestFactory = _makeRequestProxyFactory(SuperRequest)
1276
1277        # Create some sentinels to look for.
1278        connection.factory = httpFactory
1279        connection.site = object()
1280
1281        self.connectAndReceive(connection, self.getRequestHeaders, [])
1282
1283        def validateFactoryAndSite(args):
1284            site, factory = args
1285            self.assertIs(site, connection.site)
1286            self.assertIs(factory, connection.factory)
1287
1288        d.addCallback(validateFactoryAndSite)
1289
1290        # We need to wait for the stream cleanup callback to drain the
1291        # response.
1292        cleanupCallback = connection._streamCleanupCallbacks[1]
1293        return defer.gatherResults([d, cleanupCallback])
1294
1295    def test_notifyOnCompleteRequest(self):
1296        """
1297        A request sent to a HTTP/2 connection fires the
1298        L{http.Request.notifyFinish} callback with a L{None} value.
1299        """
1300        connection = H2Connection()
1301        connection.requestFactory = NotifyingRequestFactory(DummyHTTPHandler)
1302        _, transport = self.connectAndReceive(connection, self.getRequestHeaders, [])
1303
1304        deferreds = connection.requestFactory.results
1305        self.assertEqual(len(deferreds), 1)
1306
1307        def validate(result):
1308            self.assertIsNone(result)
1309
1310        d = deferreds[0]
1311        d.addCallback(validate)
1312
1313        # We need to wait for the stream cleanup callback to drain the
1314        # response.
1315        cleanupCallback = connection._streamCleanupCallbacks[1]
1316        return defer.gatherResults([d, cleanupCallback])
1317
1318    def test_notifyOnResetStream(self):
1319        """
1320        A HTTP/2 reset stream fires the L{http.Request.notifyFinish} deferred
1321        with L{ConnectionLost}.
1322        """
1323        connection = H2Connection()
1324        connection.requestFactory = NotifyingRequestFactory(DelayedHTTPHandler)
1325        frameFactory, transport = self.connectAndReceive(
1326            connection, self.getRequestHeaders, []
1327        )
1328
1329        deferreds = connection.requestFactory.results
1330        self.assertEqual(len(deferreds), 1)
1331
1332        # We need this to errback with a Failure indicating the RSTSTREAM
1333        # frame.
1334        def callback(ign):
1335            self.fail("Didn't errback, called back instead")
1336
1337        def errback(reason):
1338            self.assertIsInstance(reason, failure.Failure)
1339            self.assertIs(reason.type, error.ConnectionLost)
1340            return None  # Trap the error
1341
1342        d = deferreds[0]
1343        d.addCallbacks(callback, errback)
1344
1345        # Now send the RSTSTREAM frame.
1346        invalidData = frameFactory.buildRstStreamFrame(streamID=1).serialize()
1347        connection.dataReceived(invalidData)
1348
1349        return d
1350
1351    def test_failWithProtocolError(self):
1352        """
1353        A HTTP/2 protocol error triggers the L{http.Request.notifyFinish}
1354        deferred for all outstanding requests with a Failure that contains the
1355        underlying exception.
1356        """
1357        # We need to set up two requests concurrently so that we can validate
1358        # that these all fail. connectAndReceive will set up one: we will need
1359        # to manually send the rest.
1360        connection = H2Connection()
1361        connection.requestFactory = NotifyingRequestFactory(DelayedHTTPHandler)
1362        frameFactory, transport = self.connectAndReceive(
1363            connection, self.getRequestHeaders, []
1364        )
1365
1366        secondRequest = buildRequestBytes(
1367            self.getRequestHeaders, [], frameFactory=frameFactory, streamID=3
1368        )
1369        connection.dataReceived(secondRequest)
1370
1371        # Now we want to grab the deferreds from the notifying factory.
1372        deferreds = connection.requestFactory.results
1373        self.assertEqual(len(deferreds), 2)
1374
1375        # We need these to errback with a Failure representing the
1376        # ProtocolError.
1377        def callback(ign):
1378            self.fail("Didn't errback, called back instead")
1379
1380        def errback(reason):
1381            self.assertIsInstance(reason, failure.Failure)
1382            self.assertIsInstance(reason.value, h2.exceptions.ProtocolError)
1383            return None  # Trap the error
1384
1385        for d in deferreds:
1386            d.addCallbacks(callback, errback)
1387
1388        # Now trigger the protocol error. The easiest protocol error to trigger
1389        # is to send a data frame for a non-existent stream.
1390        invalidData = frameFactory.buildDataFrame(data=b"yo", streamID=0xF0).serialize()
1391        connection.dataReceived(invalidData)
1392
1393        return defer.gatherResults(deferreds)
1394
1395    def test_failOnGoaway(self):
1396        """
1397        A HTTP/2 GoAway triggers the L{http.Request.notifyFinish}
1398        deferred for all outstanding requests with a Failure that contains a
1399        RemoteGoAway error.
1400        """
1401        # We need to set up two requests concurrently so that we can validate
1402        # that these all fail. connectAndReceive will set up one: we will need
1403        # to manually send the rest.
1404        connection = H2Connection()
1405        connection.requestFactory = NotifyingRequestFactory(DelayedHTTPHandler)
1406        frameFactory, transport = self.connectAndReceive(
1407            connection, self.getRequestHeaders, []
1408        )
1409
1410        secondRequest = buildRequestBytes(
1411            self.getRequestHeaders, [], frameFactory=frameFactory, streamID=3
1412        )
1413        connection.dataReceived(secondRequest)
1414
1415        # Now we want to grab the deferreds from the notifying factory.
1416        deferreds = connection.requestFactory.results
1417        self.assertEqual(len(deferreds), 2)
1418
1419        # We need these to errback with a Failure indicating the GOAWAY frame.
1420        def callback(ign):
1421            self.fail("Didn't errback, called back instead")
1422
1423        def errback(reason):
1424            self.assertIsInstance(reason, failure.Failure)
1425            self.assertIs(reason.type, error.ConnectionLost)
1426            return None  # Trap the error
1427
1428        for d in deferreds:
1429            d.addCallbacks(callback, errback)
1430
1431        # Now send the GoAway frame.
1432        invalidData = frameFactory.buildGoAwayFrame(lastStreamID=3).serialize()
1433        connection.dataReceived(invalidData)
1434
1435        return defer.gatherResults(deferreds)
1436
1437    def test_failOnStopProducing(self):
1438        """
1439        The transport telling the HTTP/2 connection to stop producing will
1440        fire all L{http.Request.notifyFinish} errbacks with L{error.}
1441        """
1442        # We need to set up two requests concurrently so that we can validate
1443        # that these all fail. connectAndReceive will set up one: we will need
1444        # to manually send the rest.
1445        connection = H2Connection()
1446        connection.requestFactory = NotifyingRequestFactory(DelayedHTTPHandler)
1447        frameFactory, transport = self.connectAndReceive(
1448            connection, self.getRequestHeaders, []
1449        )
1450
1451        secondRequest = buildRequestBytes(
1452            self.getRequestHeaders, [], frameFactory=frameFactory, streamID=3
1453        )
1454        connection.dataReceived(secondRequest)
1455
1456        # Now we want to grab the deferreds from the notifying factory.
1457        deferreds = connection.requestFactory.results
1458        self.assertEqual(len(deferreds), 2)
1459
1460        # We need these to errback with a Failure indicating the consumer
1461        # aborted our data production.
1462        def callback(ign):
1463            self.fail("Didn't errback, called back instead")
1464
1465        def errback(reason):
1466            self.assertIsInstance(reason, failure.Failure)
1467            self.assertIs(reason.type, error.ConnectionLost)
1468            return None  # Trap the error
1469
1470        for d in deferreds:
1471            d.addCallbacks(callback, errback)
1472
1473        # Now call stopProducing.
1474        connection.stopProducing()
1475
1476        return defer.gatherResults(deferreds)
1477
1478    def test_notifyOnFast400(self):
1479        """
1480        A HTTP/2 stream that has had _respondToBadRequestAndDisconnect called
1481        on it from a request handler calls the L{http.Request.notifyFinish}
1482        errback with L{ConnectionLost}.
1483        """
1484        connection = H2Connection()
1485        connection.requestFactory = NotifyingRequestFactory(DelayedHTTPHandler)
1486        frameFactory, transport = self.connectAndReceive(
1487            connection, self.getRequestHeaders, []
1488        )
1489
1490        deferreds = connection.requestFactory.results
1491        self.assertEqual(len(deferreds), 1)
1492
1493        # We need this to errback with a Failure indicating the loss of the
1494        # connection.
1495        def callback(ign):
1496            self.fail("Didn't errback, called back instead")
1497
1498        def errback(reason):
1499            self.assertIsInstance(reason, failure.Failure)
1500            self.assertIs(reason.type, error.ConnectionLost)
1501            return None  # Trap the error
1502
1503        d = deferreds[0]
1504        d.addCallbacks(callback, errback)
1505
1506        # Abort the stream. The only "natural" way to trigger this in the
1507        # current codebase is to send a multipart/form-data request that the
1508        # cgi module doesn't like.
1509        # That's absurdly hard, so instead we'll just call it ourselves. For
1510        # this reason we use the DummyProducerHandler, which doesn't write the
1511        # headers straight away.
1512        stream = connection.streams[1]
1513        stream._respondToBadRequestAndDisconnect()
1514
1515        return d
1516
1517    def test_fast400WithCircuitBreaker(self):
1518        """
1519        A HTTP/2 stream that has had _respondToBadRequestAndDisconnect
1520        called on it does not write control frame data if its
1521        transport is paused and its control frame limit has been
1522        reached.
1523        """
1524        # Set the connection up.
1525        memoryReactor = MemoryReactorClock()
1526        connection = H2Connection(memoryReactor)
1527        connection.callLater = memoryReactor.callLater
1528        # Use the DelayedHTTPHandler to prevent the connection from
1529        # writing any response bytes after receiving a request that
1530        # establishes the stream.
1531        connection.requestFactory = DelayedHTTPHandler
1532
1533        streamID = 1
1534
1535        frameFactory = FrameFactory()
1536        transport = StringTransport()
1537
1538        # Establish the connection
1539        clientConnectionPreface = frameFactory.clientConnectionPreface()
1540        connection.makeConnection(transport)
1541        connection.dataReceived(clientConnectionPreface)
1542        # Establish the stream.
1543        connection.dataReceived(
1544            buildRequestBytes(
1545                self.getRequestHeaders, [], frameFactory, streamID=streamID
1546            )
1547        )
1548
1549        # Pause the connection and limit the number of outbound bytes
1550        # to 0, so that attempting to send the 400 aborts the
1551        # connection.
1552        connection.pauseProducing()
1553        connection._maxBufferedControlFrameBytes = 0
1554
1555        connection._respondToBadRequestAndDisconnect(streamID)
1556
1557        self.assertTrue(transport.disconnected)
1558
1559    def test_bufferingAutomaticFrameData(self):
1560        """
1561        If a the L{H2Connection} has been paused by the transport, it will
1562        not write automatic frame data triggered by writes.
1563        """
1564        # Set the connection up.
1565        connection = H2Connection()
1566        connection.requestFactory = DummyHTTPHandlerProxy
1567        frameFactory = FrameFactory()
1568        transport = StringTransport()
1569
1570        clientConnectionPreface = frameFactory.clientConnectionPreface()
1571        connection.makeConnection(transport)
1572        connection.dataReceived(clientConnectionPreface)
1573
1574        # Now we're going to pause the producer.
1575        connection.pauseProducing()
1576
1577        # Now we're going to send a bunch of empty SETTINGS frames. This
1578        # should not cause writes.
1579        for _ in range(0, 100):
1580            connection.dataReceived(frameFactory.buildSettingsFrame({}).serialize())
1581
1582        frames = framesFromBytes(transport.value())
1583        self.assertEqual(len(frames), 1)
1584
1585        # Re-enable the transport.
1586        connection.resumeProducing()
1587        frames = framesFromBytes(transport.value())
1588        self.assertEqual(len(frames), 101)
1589
1590    def test_bufferingAutomaticFrameDataWithCircuitBreaker(self):
1591        """
1592        If the L{H2Connection} has been paused by the transport, it will
1593        not write automatic frame data triggered by writes. If this buffer
1594        gets too large, the connection will be dropped.
1595        """
1596        # Set the connection up.
1597        connection = H2Connection()
1598        connection.requestFactory = DummyHTTPHandlerProxy
1599        frameFactory = FrameFactory()
1600        transport = StringTransport()
1601
1602        clientConnectionPreface = frameFactory.clientConnectionPreface()
1603        connection.makeConnection(transport)
1604        connection.dataReceived(clientConnectionPreface)
1605
1606        # Now we're going to pause the producer.
1607        connection.pauseProducing()
1608
1609        # Now we're going to limit the outstanding buffered bytes to
1610        # 100 bytes.
1611        connection._maxBufferedControlFrameBytes = 100
1612
1613        # Now we're going to send 11 empty SETTINGS frames. This
1614        # should not cause writes, or a close.
1615        self.assertFalse(transport.disconnecting)
1616        for _ in range(0, 11):
1617            connection.dataReceived(frameFactory.buildSettingsFrame({}).serialize())
1618        self.assertFalse(transport.disconnecting)
1619
1620        # Send a last settings frame, which will push us over the buffer limit.
1621        connection.dataReceived(frameFactory.buildSettingsFrame({}).serialize())
1622        self.assertTrue(transport.disconnected)
1623
1624    def test_bufferingContinuesIfProducerIsPausedOnWrite(self):
1625        """
1626        If the L{H2Connection} has buffered control frames, is unpaused, and then
1627        paused while unbuffering, it persists the buffer and stops trying to write.
1628        """
1629
1630        class AutoPausingStringTransport(StringTransport):
1631            def write(self, *args, **kwargs):
1632                StringTransport.write(self, *args, **kwargs)
1633                self.producer.pauseProducing()
1634
1635        # Set the connection up.
1636        connection = H2Connection()
1637        connection.requestFactory = DummyHTTPHandlerProxy
1638        frameFactory = FrameFactory()
1639        transport = AutoPausingStringTransport()
1640        transport.registerProducer(connection, True)
1641
1642        clientConnectionPreface = frameFactory.clientConnectionPreface()
1643        connection.makeConnection(transport)
1644        connection.dataReceived(clientConnectionPreface)
1645
1646        # The connection should already be paused.
1647        self.assertIsNotNone(connection._consumerBlocked)
1648        frames = framesFromBytes(transport.value())
1649        self.assertEqual(len(frames), 1)
1650        self.assertEqual(connection._bufferedControlFrameBytes, 0)
1651
1652        # Now we're going to send 11 empty SETTINGS frames. This should produce
1653        # no output, but some buffered settings ACKs.
1654        for _ in range(0, 11):
1655            connection.dataReceived(frameFactory.buildSettingsFrame({}).serialize())
1656
1657        frames = framesFromBytes(transport.value())
1658        self.assertEqual(len(frames), 1)
1659        self.assertEqual(connection._bufferedControlFrameBytes, 9 * 11)
1660
1661        # Ok, now we're going to unpause the producer. This should write only one of the
1662        # SETTINGS ACKs, as the connection gets repaused.
1663        connection.resumeProducing()
1664
1665        frames = framesFromBytes(transport.value())
1666        self.assertEqual(len(frames), 2)
1667        self.assertEqual(connection._bufferedControlFrameBytes, 9 * 10)
1668
1669    def test_circuitBreakerAbortsAfterProtocolError(self):
1670        """
1671        A client that triggers a L{h2.exceptions.ProtocolError} over a
1672        paused connection that's reached its buffered control frame
1673        limit causes that connection to be aborted.
1674        """
1675        memoryReactor = MemoryReactorClock()
1676        connection = H2Connection(memoryReactor)
1677        connection.callLater = memoryReactor.callLater
1678
1679        frameFactory = FrameFactory()
1680        transport = StringTransport()
1681
1682        # Establish the connection.
1683        clientConnectionPreface = frameFactory.clientConnectionPreface()
1684        connection.makeConnection(transport)
1685        connection.dataReceived(clientConnectionPreface)
1686
1687        # Pause it and limit the number of outbound bytes to 0, so
1688        # that a ProtocolError aborts the connection
1689        connection.pauseProducing()
1690        connection._maxBufferedControlFrameBytes = 0
1691
1692        # Trigger a ProtocolError with a data frame that refers to an
1693        # unknown stream.
1694        invalidData = frameFactory.buildDataFrame(data=b"yo", streamID=0xF0).serialize()
1695
1696        # The frame should have aborted the connection.
1697        connection.dataReceived(invalidData)
1698        self.assertTrue(transport.disconnected)
1699
1700
1701class H2FlowControlTests(unittest.TestCase, HTTP2TestHelpers):
1702    """
1703    Tests that ensure that we handle HTTP/2 flow control limits appropriately.
1704    """
1705
1706    getRequestHeaders = [
1707        (b":method", b"GET"),
1708        (b":authority", b"localhost"),
1709        (b":path", b"/"),
1710        (b":scheme", b"https"),
1711        (b"user-agent", b"twisted-test-code"),
1712    ]
1713
1714    getResponseData = b"'''\nNone\n'''\n"
1715
1716    postRequestHeaders = [
1717        (b":method", b"POST"),
1718        (b":authority", b"localhost"),
1719        (b":path", b"/post_endpoint"),
1720        (b":scheme", b"https"),
1721        (b"user-agent", b"twisted-test-code"),
1722        (b"content-length", b"25"),
1723    ]
1724
1725    postRequestData = [b"hello ", b"world, ", b"it's ", b"http/2!"]
1726
1727    postResponseData = b"'''\n25\nhello world, it's http/2!'''\n"
1728
1729    def test_bufferExcessData(self):
1730        """
1731        When a L{Request} object is not using C{IProducer} to generate data and
1732        so is not having backpressure exerted on it, the L{H2Stream} object
1733        will buffer data until the flow control window is opened.
1734        """
1735        f = FrameFactory()
1736        b = StringTransport()
1737        a = H2Connection()
1738        a.requestFactory = DummyHTTPHandlerProxy
1739
1740        # Shrink the window to 5 bytes, then send the request.
1741        requestBytes = f.clientConnectionPreface()
1742        requestBytes += f.buildSettingsFrame(
1743            {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5}
1744        ).serialize()
1745        requestBytes += buildRequestBytes(self.getRequestHeaders, [], f)
1746        a.makeConnection(b)
1747        # one byte at a time, to stress the implementation.
1748        for byte in iterbytes(requestBytes):
1749            a.dataReceived(byte)
1750
1751        # Send in WindowUpdate frames that open the window one byte at a time,
1752        # to repeatedly temporarily unbuffer data. 5 bytes will have already
1753        # been sent.
1754        bonusFrames = len(self.getResponseData) - 5
1755        for _ in range(bonusFrames):
1756            frame = f.buildWindowUpdateFrame(streamID=1, increment=1)
1757            a.dataReceived(frame.serialize())
1758
1759        # Give the sending loop a chance to catch up!
1760        def validate(streamID):
1761            frames = framesFromBytes(b.value())
1762
1763            # Check that the stream is correctly terminated.
1764            self.assertTrue("END_STREAM" in frames[-1].flags)
1765
1766            # Put the Data frames together to confirm we're all good.
1767            actualResponseData = b"".join(
1768                f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame)
1769            )
1770            self.assertEqual(self.getResponseData, actualResponseData)
1771
1772        return a._streamCleanupCallbacks[1].addCallback(validate)
1773
1774    def test_producerBlockingUnblocking(self):
1775        """
1776        L{Request} objects that have registered producers get blocked and
1777        unblocked according to HTTP/2 flow control.
1778        """
1779        f = FrameFactory()
1780        b = StringTransport()
1781        a = H2Connection()
1782        a.requestFactory = DummyProducerHandlerProxy
1783
1784        # Shrink the window to 5 bytes, then send the request.
1785        requestBytes = f.clientConnectionPreface()
1786        requestBytes += f.buildSettingsFrame(
1787            {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5}
1788        ).serialize()
1789        requestBytes += buildRequestBytes(self.getRequestHeaders, [], f)
1790        a.makeConnection(b)
1791        # one byte at a time, to stress the implementation.
1792        for byte in iterbytes(requestBytes):
1793            a.dataReceived(byte)
1794
1795        # Grab the request object.
1796        stream = a.streams[1]
1797        request = stream._request.original
1798
1799        # Confirm that the stream believes the producer is producing.
1800        self.assertTrue(stream._producerProducing)
1801
1802        # Write 10 bytes to the connection.
1803        request.write(b"helloworld")
1804
1805        # The producer should have been paused.
1806        self.assertFalse(stream._producerProducing)
1807        self.assertEqual(request.producer.events, ["pause"])
1808
1809        # Open the flow control window by 5 bytes. This should not unpause the
1810        # producer.
1811        a.dataReceived(f.buildWindowUpdateFrame(streamID=1, increment=5).serialize())
1812        self.assertFalse(stream._producerProducing)
1813        self.assertEqual(request.producer.events, ["pause"])
1814
1815        # Open the connection window by 5 bytes as well. This should also not
1816        # unpause the producer.
1817        a.dataReceived(f.buildWindowUpdateFrame(streamID=0, increment=5).serialize())
1818        self.assertFalse(stream._producerProducing)
1819        self.assertEqual(request.producer.events, ["pause"])
1820
1821        # Open it by five more bytes. This should unpause the producer.
1822        a.dataReceived(f.buildWindowUpdateFrame(streamID=1, increment=5).serialize())
1823        self.assertTrue(stream._producerProducing)
1824        self.assertEqual(request.producer.events, ["pause", "resume"])
1825
1826        # Write another 10 bytes, which should force us to pause again. When
1827        # written this chunk will be sent as one lot, simply because of the
1828        # fact that the sending loop is not currently running.
1829        request.write(b"helloworld")
1830        self.assertFalse(stream._producerProducing)
1831        self.assertEqual(request.producer.events, ["pause", "resume", "pause"])
1832
1833        # Open the window wide and then complete the request.
1834        a.dataReceived(f.buildWindowUpdateFrame(streamID=1, increment=50).serialize())
1835        self.assertTrue(stream._producerProducing)
1836        self.assertEqual(
1837            request.producer.events, ["pause", "resume", "pause", "resume"]
1838        )
1839        request.unregisterProducer()
1840        request.finish()
1841
1842        # Check that the sending loop sends all the appropriate data.
1843        def validate(streamID):
1844            frames = framesFromBytes(b.value())
1845
1846            # Check that the stream is correctly terminated.
1847            self.assertTrue("END_STREAM" in frames[-1].flags)
1848
1849            # Grab the data from the frames.
1850            dataChunks = [
1851                f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame)
1852            ]
1853            self.assertEqual(dataChunks, [b"helloworld", b"helloworld", b""])
1854
1855        return a._streamCleanupCallbacks[1].addCallback(validate)
1856
1857    def test_flowControlExact(self):
1858        """
1859        Exactly filling the flow control window still blocks producers.
1860        """
1861        f = FrameFactory()
1862        b = StringTransport()
1863        a = H2Connection()
1864        a.requestFactory = DummyProducerHandlerProxy
1865
1866        # Shrink the window to 5 bytes, then send the request.
1867        requestBytes = f.clientConnectionPreface()
1868        requestBytes += f.buildSettingsFrame(
1869            {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5}
1870        ).serialize()
1871        requestBytes += buildRequestBytes(self.getRequestHeaders, [], f)
1872        a.makeConnection(b)
1873        # one byte at a time, to stress the implementation.
1874        for byte in iterbytes(requestBytes):
1875            a.dataReceived(byte)
1876
1877        # Grab the request object.
1878        stream = a.streams[1]
1879        request = stream._request.original
1880
1881        # Confirm that the stream believes the producer is producing.
1882        self.assertTrue(stream._producerProducing)
1883
1884        # Write 10 bytes to the connection. This should block the producer
1885        # immediately.
1886        request.write(b"helloworld")
1887        self.assertFalse(stream._producerProducing)
1888        self.assertEqual(request.producer.events, ["pause"])
1889
1890        # Despite the producer being blocked, write one more byte. This should
1891        # not get sent or force any other data to be sent.
1892        request.write(b"h")
1893
1894        # Open the window wide and then complete the request. We do this by
1895        # means of callLater to ensure that the sending loop has time to run.
1896        def window_open():
1897            a.dataReceived(
1898                f.buildWindowUpdateFrame(streamID=1, increment=50).serialize()
1899            )
1900            self.assertTrue(stream._producerProducing)
1901            self.assertEqual(request.producer.events, ["pause", "resume"])
1902            request.unregisterProducer()
1903            request.finish()
1904
1905        windowDefer = task.deferLater(reactor, 0, window_open)
1906
1907        # Check that the sending loop sends all the appropriate data.
1908        def validate(streamID):
1909            frames = framesFromBytes(b.value())
1910
1911            # Check that the stream is correctly terminated.
1912            self.assertTrue("END_STREAM" in frames[-1].flags)
1913
1914            # Grab the data from the frames.
1915            dataChunks = [
1916                f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame)
1917            ]
1918            self.assertEqual(dataChunks, [b"hello", b"world", b"h", b""])
1919
1920        validateDefer = a._streamCleanupCallbacks[1].addCallback(validate)
1921        return defer.DeferredList([windowDefer, validateDefer])
1922
1923    def test_endingBlockedStream(self):
1924        """
1925        L{Request} objects that end a stream that is currently blocked behind
1926        flow control can still end the stream and get cleaned up.
1927        """
1928        f = FrameFactory()
1929        b = StringTransport()
1930        a = H2Connection()
1931        a.requestFactory = DummyProducerHandlerProxy
1932
1933        # Shrink the window to 5 bytes, then send the request.
1934        requestBytes = f.clientConnectionPreface()
1935        requestBytes += f.buildSettingsFrame(
1936            {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5}
1937        ).serialize()
1938        requestBytes += buildRequestBytes(self.getRequestHeaders, [], f)
1939        a.makeConnection(b)
1940        # one byte at a time, to stress the implementation.
1941        for byte in iterbytes(requestBytes):
1942            a.dataReceived(byte)
1943
1944        # Grab the request object.
1945        stream = a.streams[1]
1946        request = stream._request.original
1947
1948        # Confirm that the stream believes the producer is producing.
1949        self.assertTrue(stream._producerProducing)
1950
1951        # Write 10 bytes to the connection, then complete the connection.
1952        request.write(b"helloworld")
1953        request.unregisterProducer()
1954        request.finish()
1955
1956        # This should have completed the request.
1957        self.assertTrue(request.finished)
1958
1959        # Open the window wide and then complete the request.
1960        reactor.callLater(
1961            0,
1962            a.dataReceived,
1963            f.buildWindowUpdateFrame(streamID=1, increment=50).serialize(),
1964        )
1965
1966        # Check that the sending loop sends all the appropriate data.
1967        def validate(streamID):
1968            frames = framesFromBytes(b.value())
1969
1970            # Check that the stream is correctly terminated.
1971            self.assertTrue("END_STREAM" in frames[-1].flags)
1972
1973            # Grab the data from the frames.
1974            dataChunks = [
1975                f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame)
1976            ]
1977            self.assertEqual(dataChunks, [b"hello", b"world", b""])
1978
1979        return a._streamCleanupCallbacks[1].addCallback(validate)
1980
1981    def test_responseWithoutBody(self):
1982        """
1983        We safely handle responses without bodies.
1984        """
1985        f = FrameFactory()
1986        b = StringTransport()
1987        a = H2Connection()
1988
1989        # We use the DummyProducerHandler just because we can guarantee that it
1990        # doesn't end up with a body.
1991        a.requestFactory = DummyProducerHandlerProxy
1992
1993        # Send the request.
1994        requestBytes = f.clientConnectionPreface()
1995        requestBytes += buildRequestBytes(self.getRequestHeaders, [], f)
1996        a.makeConnection(b)
1997        # one byte at a time, to stress the implementation.
1998        for byte in iterbytes(requestBytes):
1999            a.dataReceived(byte)
2000
2001        # Grab the request object and the stream completion callback.
2002        stream = a.streams[1]
2003        request = stream._request.original
2004        cleanupCallback = a._streamCleanupCallbacks[1]
2005
2006        # Complete the connection immediately.
2007        request.unregisterProducer()
2008        request.finish()
2009
2010        # This should have completed the request.
2011        self.assertTrue(request.finished)
2012
2013        # Check that the sending loop sends all the appropriate data.
2014        def validate(streamID):
2015            frames = framesFromBytes(b.value())
2016
2017            self.assertEqual(len(frames), 3)
2018
2019            # Check that the stream is correctly terminated.
2020            self.assertTrue("END_STREAM" in frames[-1].flags)
2021
2022            # Grab the data from the frames.
2023            dataChunks = [
2024                f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame)
2025            ]
2026            self.assertEqual(
2027                dataChunks,
2028                [b""],
2029            )
2030
2031        return cleanupCallback.addCallback(validate)
2032
2033    def test_windowUpdateForCompleteStream(self):
2034        """
2035        WindowUpdate frames received after we've completed the stream are
2036        safely handled.
2037        """
2038        # To test this with the data sending loop working the way it does, we
2039        # need to send *no* body on the response. That's unusual, but fine.
2040        f = FrameFactory()
2041        b = StringTransport()
2042        a = H2Connection()
2043
2044        # We use the DummyProducerHandler just because we can guarantee that it
2045        # doesn't end up with a body.
2046        a.requestFactory = DummyProducerHandlerProxy
2047
2048        # Send the request.
2049        requestBytes = f.clientConnectionPreface()
2050        requestBytes += buildRequestBytes(self.getRequestHeaders, [], f)
2051        a.makeConnection(b)
2052        # one byte at a time, to stress the implementation.
2053        for byte in iterbytes(requestBytes):
2054            a.dataReceived(byte)
2055
2056        # Grab the request object and the stream completion callback.
2057        stream = a.streams[1]
2058        request = stream._request.original
2059        cleanupCallback = a._streamCleanupCallbacks[1]
2060
2061        # Complete the connection immediately.
2062        request.unregisterProducer()
2063        request.finish()
2064
2065        # This should have completed the request.
2066        self.assertTrue(request.finished)
2067
2068        # Now open the flow control window a bit. This should cause no
2069        # problems.
2070        a.dataReceived(f.buildWindowUpdateFrame(streamID=1, increment=50).serialize())
2071
2072        # Check that the sending loop sends all the appropriate data.
2073        def validate(streamID):
2074            frames = framesFromBytes(b.value())
2075
2076            self.assertEqual(len(frames), 3)
2077
2078            # Check that the stream is correctly terminated.
2079            self.assertTrue("END_STREAM" in frames[-1].flags)
2080
2081            # Grab the data from the frames.
2082            dataChunks = [
2083                f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame)
2084            ]
2085            self.assertEqual(
2086                dataChunks,
2087                [b""],
2088            )
2089
2090        return cleanupCallback.addCallback(validate)
2091
2092    def test_producerUnblocked(self):
2093        """
2094        L{Request} objects that have registered producers that are not blocked
2095        behind flow control do not have their producer notified.
2096        """
2097        f = FrameFactory()
2098        b = StringTransport()
2099        a = H2Connection()
2100        a.requestFactory = DummyProducerHandlerProxy
2101
2102        # Shrink the window to 5 bytes, then send the request.
2103        requestBytes = f.clientConnectionPreface()
2104        requestBytes += f.buildSettingsFrame(
2105            {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5}
2106        ).serialize()
2107        requestBytes += buildRequestBytes(self.getRequestHeaders, [], f)
2108        a.makeConnection(b)
2109        # one byte at a time, to stress the implementation.
2110        for byte in iterbytes(requestBytes):
2111            a.dataReceived(byte)
2112
2113        # Grab the request object.
2114        stream = a.streams[1]
2115        request = stream._request.original
2116
2117        # Confirm that the stream believes the producer is producing.
2118        self.assertTrue(stream._producerProducing)
2119
2120        # Write 4 bytes to the connection, leaving space in the window.
2121        request.write(b"word")
2122
2123        # The producer should not have been paused.
2124        self.assertTrue(stream._producerProducing)
2125        self.assertEqual(request.producer.events, [])
2126
2127        # Open the flow control window by 5 bytes. This should not notify the
2128        # producer.
2129        a.dataReceived(f.buildWindowUpdateFrame(streamID=1, increment=5).serialize())
2130        self.assertTrue(stream._producerProducing)
2131        self.assertEqual(request.producer.events, [])
2132
2133        # Open the window wide complete the request.
2134        request.unregisterProducer()
2135        request.finish()
2136
2137        # Check that the sending loop sends all the appropriate data.
2138        def validate(streamID):
2139            frames = framesFromBytes(b.value())
2140
2141            # Check that the stream is correctly terminated.
2142            self.assertTrue("END_STREAM" in frames[-1].flags)
2143
2144            # Grab the data from the frames.
2145            dataChunks = [
2146                f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame)
2147            ]
2148            self.assertEqual(dataChunks, [b"word", b""])
2149
2150        return a._streamCleanupCallbacks[1].addCallback(validate)
2151
2152    def test_unnecessaryWindowUpdate(self):
2153        """
2154        When a WindowUpdate frame is received for the whole connection but no
2155        data is currently waiting, nothing exciting happens.
2156        """
2157        f = FrameFactory()
2158        b = StringTransport()
2159        a = H2Connection()
2160        a.requestFactory = DummyHTTPHandlerProxy
2161
2162        # Send the request.
2163        frames = buildRequestFrames(self.postRequestHeaders, self.postRequestData, f)
2164        frames.insert(1, f.buildWindowUpdateFrame(streamID=0, increment=5))
2165        requestBytes = f.clientConnectionPreface()
2166        requestBytes += b"".join(f.serialize() for f in frames)
2167        a.makeConnection(b)
2168        # one byte at a time, to stress the implementation.
2169        for byte in iterbytes(requestBytes):
2170            a.dataReceived(byte)
2171
2172        # Give the sending loop a chance to catch up!
2173        def validate(streamID):
2174            frames = framesFromBytes(b.value())
2175
2176            # Check that the stream is correctly terminated.
2177            self.assertTrue("END_STREAM" in frames[-1].flags)
2178
2179            # Put the Data frames together to confirm we're all good.
2180            actualResponseData = b"".join(
2181                f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame)
2182            )
2183            self.assertEqual(self.postResponseData, actualResponseData)
2184
2185        return a._streamCleanupCallbacks[1].addCallback(validate)
2186
2187    def test_unnecessaryWindowUpdateForStream(self):
2188        """
2189        When a WindowUpdate frame is received for a stream but no data is
2190        currently waiting, that stream is not marked as unblocked and the
2191        priority tree continues to assert that no stream can progress.
2192        """
2193        f = FrameFactory()
2194        transport = StringTransport()
2195        conn = H2Connection()
2196        conn.requestFactory = DummyHTTPHandlerProxy
2197
2198        # Send a request that implies a body is coming. Twisted doesn't send a
2199        # response until the entire request is received, so it won't queue any
2200        # data yet. Then, fire off a WINDOW_UPDATE frame.
2201        frames = []
2202        frames.append(f.buildHeadersFrame(headers=self.postRequestHeaders, streamID=1))
2203        frames.append(f.buildWindowUpdateFrame(streamID=1, increment=5))
2204        data = f.clientConnectionPreface()
2205        data += b"".join(f.serialize() for f in frames)
2206
2207        conn.makeConnection(transport)
2208        conn.dataReceived(data)
2209
2210        self.assertAllStreamsBlocked(conn)
2211
2212    def test_windowUpdateAfterTerminate(self):
2213        """
2214        When a WindowUpdate frame is received for a stream that has been
2215        aborted it is ignored.
2216        """
2217        f = FrameFactory()
2218        b = StringTransport()
2219        a = H2Connection()
2220        a.requestFactory = DummyHTTPHandlerProxy
2221
2222        # Send the request.
2223        frames = buildRequestFrames(self.postRequestHeaders, self.postRequestData, f)
2224        requestBytes = f.clientConnectionPreface()
2225        requestBytes += b"".join(f.serialize() for f in frames)
2226        a.makeConnection(b)
2227        # one byte at a time, to stress the implementation.
2228        for byte in iterbytes(requestBytes):
2229            a.dataReceived(byte)
2230
2231        # Abort the connection.
2232        a.streams[1].abortConnection()
2233
2234        # Send a WindowUpdate
2235        windowUpdateFrame = f.buildWindowUpdateFrame(streamID=1, increment=5)
2236        a.dataReceived(windowUpdateFrame.serialize())
2237
2238        # Give the sending loop a chance to catch up!
2239        frames = framesFromBytes(b.value())
2240
2241        # Check that the stream is terminated.
2242        self.assertTrue(isinstance(frames[-1], hyperframe.frame.RstStreamFrame))
2243
2244    def test_windowUpdateAfterComplete(self):
2245        """
2246        When a WindowUpdate frame is received for a stream that has been
2247        completed it is ignored.
2248        """
2249        f = FrameFactory()
2250        b = StringTransport()
2251        a = H2Connection()
2252        a.requestFactory = DummyHTTPHandlerProxy
2253
2254        # Send the request.
2255        frames = buildRequestFrames(self.postRequestHeaders, self.postRequestData, f)
2256        requestBytes = f.clientConnectionPreface()
2257        requestBytes += b"".join(f.serialize() for f in frames)
2258        a.makeConnection(b)
2259        # one byte at a time, to stress the implementation.
2260        for byte in iterbytes(requestBytes):
2261            a.dataReceived(byte)
2262
2263        def update_window(*args):
2264            # Send a WindowUpdate
2265            windowUpdateFrame = f.buildWindowUpdateFrame(streamID=1, increment=5)
2266            a.dataReceived(windowUpdateFrame.serialize())
2267
2268        def validate(*args):
2269            # Give the sending loop a chance to catch up!
2270            frames = framesFromBytes(b.value())
2271
2272            # Check that the stream is ended neatly.
2273            self.assertIn("END_STREAM", frames[-1].flags)
2274
2275        d = a._streamCleanupCallbacks[1].addCallback(update_window)
2276        return d.addCallback(validate)
2277
2278    def test_dataAndRstStream(self):
2279        """
2280        When a DATA frame is received at the same time as RST_STREAM,
2281        Twisted does not send WINDOW_UPDATE frames for the stream.
2282        """
2283        frameFactory = FrameFactory()
2284        transport = StringTransport()
2285        a = H2Connection()
2286        a.requestFactory = DummyHTTPHandlerProxy
2287
2288        # Send the request, but instead of the last frame send a RST_STREAM
2289        # frame instead. This needs to be very long to actually force the
2290        # WINDOW_UPDATE frames out.
2291        frameData = [b"\x00" * (2 ** 14)] * 4
2292        bodyLength = f"{sum(len(data) for data in frameData)}"
2293        headers = self.postRequestHeaders[:-1] + [("content-length", bodyLength)]
2294        frames = buildRequestFrames(
2295            headers=headers, data=frameData, frameFactory=frameFactory
2296        )
2297        del frames[-1]
2298        frames.append(
2299            frameFactory.buildRstStreamFrame(
2300                streamID=1, errorCode=h2.errors.ErrorCodes.INTERNAL_ERROR
2301            )
2302        )
2303
2304        requestBytes = frameFactory.clientConnectionPreface()
2305        requestBytes += b"".join(f.serialize() for f in frames)
2306        a.makeConnection(transport)
2307
2308        # Feed all the bytes at once. This is important: if they arrive slowly,
2309        # Twisted doesn't have any problems.
2310        a.dataReceived(requestBytes)
2311
2312        # Check the frames we got. We expect a WINDOW_UPDATE frame only for the
2313        # connection, because Twisted knew the stream was going to be reset.
2314        frames = framesFromBytes(transport.value())
2315
2316        # Check that the only WINDOW_UPDATE frame came for the connection.
2317        windowUpdateFrameIDs = [
2318            f.stream_id
2319            for f in frames
2320            if isinstance(f, hyperframe.frame.WindowUpdateFrame)
2321        ]
2322        self.assertEqual([0], windowUpdateFrameIDs)
2323
2324        # While we're here: we shouldn't have received HEADERS or DATA for this
2325        # either.
2326        headersFrames = [
2327            f for f in frames if isinstance(f, hyperframe.frame.HeadersFrame)
2328        ]
2329        dataFrames = [f for f in frames if isinstance(f, hyperframe.frame.DataFrame)]
2330        self.assertFalse(headersFrames)
2331        self.assertFalse(dataFrames)
2332
2333    def test_abortRequestWithCircuitBreaker(self):
2334        """
2335        Aborting a request associated with a paused connection that's
2336        reached its buffered control frame limit causes that
2337        connection to be aborted.
2338        """
2339        memoryReactor = MemoryReactorClock()
2340        connection = H2Connection(memoryReactor)
2341        connection.callLater = memoryReactor.callLater
2342        connection.requestFactory = DummyHTTPHandlerProxy
2343
2344        frameFactory = FrameFactory()
2345        transport = StringTransport()
2346
2347        # Establish the connection.
2348        clientConnectionPreface = frameFactory.clientConnectionPreface()
2349        connection.makeConnection(transport)
2350        connection.dataReceived(clientConnectionPreface)
2351
2352        # Send a headers frame for a stream
2353        streamID = 1
2354        headersFrameData = frameFactory.buildHeadersFrame(
2355            headers=self.postRequestHeaders, streamID=streamID
2356        ).serialize()
2357        connection.dataReceived(headersFrameData)
2358
2359        # Pause it and limit the number of outbound bytes to 1, so
2360        # that a ProtocolError aborts the connection
2361        connection.pauseProducing()
2362        connection._maxBufferedControlFrameBytes = 0
2363
2364        # Remove anything sent by the preceding frames.
2365        transport.clear()
2366
2367        # Abort the request.
2368        connection.abortRequest(streamID)
2369
2370        # No RST_STREAM frame was sent...
2371        self.assertFalse(transport.value())
2372        # ...and the transport was disconnected (abortConnection was
2373        # called)
2374        self.assertTrue(transport.disconnected)
2375
2376
class HTTP2TransportChecking(unittest.TestCase, HTTP2TestHelpers):
    """
    Tests for the way L{H2Connection} interacts with its transport: being
    registered as a producer, being paused, resumed and stopped, and passing
    the transport's host and peer addresses through to its streams.
    """

    # Headers for a simple GET request, shared by the tests in this class.
    getRequestHeaders = [
        (b":method", b"GET"),
        (b":authority", b"localhost"),
        (b":path", b"/"),
        (b":scheme", b"https"),
        (b"user-agent", b"twisted-test-code"),
        (b"custom-header", b"1"),
        (b"custom-header", b"2"),
    ]

    def test_registerProducerWithTransport(self):
        """
        L{H2Connection} can be registered with the transport as a producer.
        """
        transport = StringTransport()
        connection = H2Connection()
        connection.requestFactory = DummyHTTPHandlerProxy

        transport.registerProducer(connection, True)
        self.assertIs(transport.producer, connection)

    def test_pausingProducerPreventsDataSend(self):
        """
        L{H2Connection} can be paused by its consumer. When paused it stops
        sending data to the transport.
        """
        frameFactory = FrameFactory()
        transport = StringTransport()
        connection = H2Connection()
        connection.requestFactory = DummyHTTPHandlerProxy

        # Build the request.
        requestFrames = buildRequestFrames(self.getRequestHeaders, [], frameFactory)
        requestBytes = frameFactory.clientConnectionPreface()
        requestBytes += b"".join(frame.serialize() for frame in requestFrames)
        connection.makeConnection(transport)
        transport.registerProducer(connection, True)

        # Deliver it one byte at a time, to stress the implementation.
        for byte in iterbytes(requestBytes):
            connection.dataReceived(byte)

        # The headers will be sent immediately, but the body will be waiting
        # until the reactor gets to spin. Before it does we'll pause
        # production.
        connection.pauseProducing()

        # The rest of the test is a chain of Deferreds:
        # 1. deferLater for a moment to let the sending loop run, which should
        #    block.
        # 2. Once that fires, validate that no data has been sent yet.
        # 3. Resume production.
        # 4. Wait for the stream completion Deferred.
        # 5. Validate that the data is correct.
        cleanupCallback = connection._streamCleanupCallbacks[1]

        def validateNotSent(*args):
            sentFrames = framesFromBytes(transport.value())

            self.assertEqual(len(sentFrames), 2)
            self.assertNotIsInstance(sentFrames[-1], hyperframe.frame.DataFrame)

            # Resume production. Only the first call should matter: the
            # repeated calls are expected to be no-ops, so make several.
            for _ in range(5):
                connection.resumeProducing()
            return cleanupCallback

        def validateComplete(*args):
            sentFrames = framesFromBytes(transport.value())

            # Check that the stream is correctly terminated.
            self.assertEqual(len(sentFrames), 4)
            self.assertIn("END_STREAM", sentFrames[-1].flags)

        d = task.deferLater(reactor, 0.01, validateNotSent)
        d.addCallback(validateComplete)

        return d

    def test_stopProducing(self):
        """
        L{H2Connection} can be stopped by its consumer. Once stopped it does
        not send the response body and it records that it is no longer
        producing.
        """
        frameFactory = FrameFactory()
        transport = StringTransport()
        connection = H2Connection()
        connection.requestFactory = DummyHTTPHandlerProxy

        # Build the request.
        requestFrames = buildRequestFrames(self.getRequestHeaders, [], frameFactory)
        requestBytes = frameFactory.clientConnectionPreface()
        requestBytes += b"".join(frame.serialize() for frame in requestFrames)
        connection.makeConnection(transport)
        transport.registerProducer(connection, True)

        # Deliver it one byte at a time, to stress the implementation.
        for byte in iterbytes(requestBytes):
            connection.dataReceived(byte)

        # The headers will be sent immediately, but the body will be waiting
        # until the reactor gets to spin. Before it does we'll stop production.
        connection.stopProducing()

        sentFrames = framesFromBytes(transport.value())

        self.assertEqual(len(sentFrames), 2)
        self.assertNotIsInstance(sentFrames[-1], hyperframe.frame.DataFrame)
        self.assertFalse(connection._stillProducing)

    def test_passthroughHostAndPeer(self):
        """
        A L{H2Stream} object correctly passes through host and peer information
        from its L{H2Connection}.
        """
        hostAddress = IPv4Address("TCP", "17.52.24.8", 443)
        peerAddress = IPv4Address("TCP", "17.188.0.12", 32008)

        frameFactory = FrameFactory()
        transport = StringTransport(hostAddress=hostAddress, peerAddress=peerAddress)
        connection = H2Connection()
        connection.requestFactory = DummyHTTPHandlerProxy
        connection.makeConnection(transport)

        requestFrames = buildRequestFrames(self.getRequestHeaders, [], frameFactory)
        requestBytes = frameFactory.clientConnectionPreface()
        requestBytes += b"".join(frame.serialize() for frame in requestFrames)

        for byte in iterbytes(requestBytes):
            connection.dataReceived(byte)

        # The stream is present. Go grab the stream object.
        stream = connection.streams[1]
        self.assertEqual(stream.getHost(), hostAddress)
        self.assertEqual(stream.getPeer(), peerAddress)

        # Allow the stream to finish up, then check that the addresses are
        # still reported correctly.
        cleanupCallback = connection._streamCleanupCallbacks[1]

        def validate(*args):
            self.assertEqual(stream.getHost(), hostAddress)
            self.assertEqual(stream.getPeer(), peerAddress)

        return cleanupCallback.addCallback(validate)
2526
2527
class HTTP2SchedulingTests(unittest.TestCase, HTTP2TestHelpers):
    """
    L{H2Connection} uses reactor callbacks to schedule some of its work, most
    notably its data sending loop. These tests check that those calls are
    scheduled correctly.
    """

    def test_initiallySchedulesOneDataCall(self):
        """
        Creating a L{H2Connection} schedules exactly one call, to be run as
        soon as the reactor has time.
        """
        clock = task.Clock()
        connection = H2Connection(clock)

        pendingCalls = clock.getDelayedCalls()
        self.assertEqual(len(pendingCalls), 1)
        scheduled = pendingCalls[0]

        # The call must be due immediately, must not have run yet, and must
        # invoke the prioritised data sender with no arguments.
        self.assertTrue(scheduled.active())
        self.assertEqual(scheduled.time, 0)
        self.assertEqual(scheduled.func, connection._sendPrioritisedData)
        self.assertEqual(scheduled.args, ())
        self.assertEqual(scheduled.kw, {})
2554
2555
2556class HTTP2TimeoutTests(unittest.TestCase, HTTP2TestHelpers):
2557    """
2558    The L{H2Connection} object times out idle connections.
2559    """
2560
2561    getRequestHeaders = [
2562        (b":method", b"GET"),
2563        (b":authority", b"localhost"),
2564        (b":path", b"/"),
2565        (b":scheme", b"https"),
2566        (b"user-agent", b"twisted-test-code"),
2567        (b"custom-header", b"1"),
2568        (b"custom-header", b"2"),
2569    ]
2570
2571    # A sentinel object used to flag default timeouts
2572    _DEFAULT = object()
2573
2574    def patch_TimeoutMixin_clock(self, connection, reactor):
2575        """
2576        Unfortunately, TimeoutMixin does not allow passing an explicit reactor
2577        to test timeouts. For that reason, we need to monkeypatch the method
2578        set up by the TimeoutMixin.
2579
2580        @param connection: The HTTP/2 connection object to patch.
2581        @type connection: L{H2Connection}
2582
2583        @param reactor: The reactor whose callLater method we want.
2584        @type reactor: An object implementing
2585            L{twisted.internet.interfaces.IReactorTime}
2586        """
2587        connection.callLater = reactor.callLater
2588
2589    def initiateH2Connection(self, initialData, requestFactory):
2590        """
2591        Performs test setup by building a HTTP/2 connection object, a transport
2592        to back it, a reactor to run in, and sending in some initial data as
2593        needed.
2594
2595        @param initialData: The initial HTTP/2 data to be fed into the
2596            connection after setup.
2597        @type initialData: L{bytes}
2598
2599        @param requestFactory: The L{Request} factory to use with the
2600            connection.
2601        """
2602        reactor = task.Clock()
2603        conn = H2Connection(reactor)
2604        conn.timeOut = 100
2605        self.patch_TimeoutMixin_clock(conn, reactor)
2606
2607        transport = StringTransport()
2608        conn.requestFactory = _makeRequestProxyFactory(requestFactory)
2609        conn.makeConnection(transport)
2610
2611        # one byte at a time, to stress the implementation.
2612        for byte in iterbytes(initialData):
2613            conn.dataReceived(byte)
2614
2615        return (reactor, conn, transport)
2616
2617    def assertTimedOut(self, data, frameCount, errorCode, lastStreamID):
2618        """
2619        Confirm that the data that was sent matches what we expect from a
2620        timeout: namely, that it ends with a GOAWAY frame carrying an
2621        appropriate error code and last stream ID.
2622        """
2623        frames = framesFromBytes(data)
2624
2625        self.assertEqual(len(frames), frameCount)
2626        self.assertTrue(isinstance(frames[-1], hyperframe.frame.GoAwayFrame))
2627        self.assertEqual(frames[-1].error_code, errorCode)
2628        self.assertEqual(frames[-1].last_stream_id, lastStreamID)
2629
2630    def prepareAbortTest(self, abortTimeout=_DEFAULT):
2631        """
2632        Does the common setup for tests that want to test the aborting
2633        functionality of the HTTP/2 stack.
2634
2635        @param abortTimeout: The value to use for the abortTimeout. Defaults to
2636            whatever is set on L{H2Connection.abortTimeout}.
2637        @type abortTimeout: L{int} or L{None}
2638
2639        @return: A tuple of the reactor being used for the connection, the
2640            connection itself, and the transport.
2641        """
2642        if abortTimeout is self._DEFAULT:
2643            abortTimeout = H2Connection.abortTimeout
2644
2645        frameFactory = FrameFactory()
2646        initialData = frameFactory.clientConnectionPreface()
2647
2648        reactor, conn, transport = self.initiateH2Connection(
2649            initialData,
2650            requestFactory=DummyHTTPHandler,
2651        )
2652        conn.abortTimeout = abortTimeout
2653
2654        # Advance the clock.
2655        reactor.advance(100)
2656
2657        self.assertTimedOut(
2658            transport.value(),
2659            frameCount=2,
2660            errorCode=h2.errors.ErrorCodes.NO_ERROR,
2661            lastStreamID=0,
2662        )
2663        self.assertTrue(transport.disconnecting)
2664        self.assertFalse(transport.disconnected)
2665
2666        return reactor, conn, transport
2667
2668    def test_timeoutAfterInactivity(self):
2669        """
2670        When a L{H2Connection} does not receive any data for more than the
2671        time out interval, it closes the connection cleanly.
2672        """
2673        frameFactory = FrameFactory()
2674        initialData = frameFactory.clientConnectionPreface()
2675
2676        reactor, conn, transport = self.initiateH2Connection(
2677            initialData,
2678            requestFactory=DummyHTTPHandler,
2679        )
2680
2681        # Save the response preamble.
2682        preamble = transport.value()
2683
2684        # Advance the clock.
2685        reactor.advance(99)
2686
2687        # Everything is fine, no extra data got sent.
2688        self.assertEqual(preamble, transport.value())
2689        self.assertFalse(transport.disconnecting)
2690
2691        # Advance the clock.
2692        reactor.advance(2)
2693
2694        self.assertTimedOut(
2695            transport.value(),
2696            frameCount=2,
2697            errorCode=h2.errors.ErrorCodes.NO_ERROR,
2698            lastStreamID=0,
2699        )
2700        self.assertTrue(transport.disconnecting)
2701
2702    def test_timeoutResetByRequestData(self):
2703        """
2704        When a L{H2Connection} receives data, the timeout is reset.
2705        """
2706        # Don't send any initial data, we'll send the preamble manually.
2707        frameFactory = FrameFactory()
2708        initialData = b""
2709
2710        reactor, conn, transport = self.initiateH2Connection(
2711            initialData,
2712            requestFactory=DummyHTTPHandler,
2713        )
2714
2715        # Send one byte of the preamble every 99 'seconds'.
2716        for byte in iterbytes(frameFactory.clientConnectionPreface()):
2717            conn.dataReceived(byte)
2718
2719            # Advance the clock.
2720            reactor.advance(99)
2721
2722            # Everything is fine.
2723            self.assertFalse(transport.disconnecting)
2724
2725        # Advance the clock.
2726        reactor.advance(2)
2727
2728        self.assertTimedOut(
2729            transport.value(),
2730            frameCount=2,
2731            errorCode=h2.errors.ErrorCodes.NO_ERROR,
2732            lastStreamID=0,
2733        )
2734        self.assertTrue(transport.disconnecting)
2735
2736    def test_timeoutResetByResponseData(self):
2737        """
2738        When a L{H2Connection} sends data, the timeout is reset.
2739        """
2740        # Don't send any initial data, we'll send the preamble manually.
2741        frameFactory = FrameFactory()
2742        initialData = b""
2743        requests = []
2744
2745        frames = buildRequestFrames(self.getRequestHeaders, [], frameFactory)
2746        initialData = frameFactory.clientConnectionPreface()
2747        initialData += b"".join(f.serialize() for f in frames)
2748
2749        def saveRequest(stream, queued):
2750            req = DelayedHTTPHandler(stream, queued=queued)
2751            requests.append(req)
2752            return req
2753
2754        reactor, conn, transport = self.initiateH2Connection(
2755            initialData,
2756            requestFactory=saveRequest,
2757        )
2758
2759        conn.dataReceived(frameFactory.clientConnectionPreface())
2760
2761        # Advance the clock.
2762        reactor.advance(99)
2763        self.assertEquals(len(requests), 1)
2764
2765        for x in range(10):
2766            # It doesn't time out as it's being written...
2767            requests[0].write(b"some bytes")
2768            reactor.advance(99)
2769            self.assertFalse(transport.disconnecting)
2770
2771        # but the timer is still running, and it times out when it idles.
2772        reactor.advance(2)
2773        self.assertTimedOut(
2774            transport.value(),
2775            frameCount=13,
2776            errorCode=h2.errors.ErrorCodes.PROTOCOL_ERROR,
2777            lastStreamID=1,
2778        )
2779
2780    def test_timeoutWithProtocolErrorIfStreamsOpen(self):
2781        """
2782        When a L{H2Connection} times out with active streams, the error code
2783        returned is L{h2.errors.ErrorCodes.PROTOCOL_ERROR}.
2784        """
2785        frameFactory = FrameFactory()
2786        frames = buildRequestFrames(self.getRequestHeaders, [], frameFactory)
2787        initialData = frameFactory.clientConnectionPreface()
2788        initialData += b"".join(f.serialize() for f in frames)
2789
2790        reactor, conn, transport = self.initiateH2Connection(
2791            initialData,
2792            requestFactory=DummyProducerHandler,
2793        )
2794
2795        # Advance the clock to time out the request.
2796        reactor.advance(101)
2797
2798        self.assertTimedOut(
2799            transport.value(),
2800            frameCount=2,
2801            errorCode=h2.errors.ErrorCodes.PROTOCOL_ERROR,
2802            lastStreamID=1,
2803        )
2804        self.assertTrue(transport.disconnecting)
2805
2806    def test_noTimeoutIfConnectionLost(self):
2807        """
2808        When a L{H2Connection} loses its connection it cancels its timeout.
2809        """
2810        frameFactory = FrameFactory()
2811        frames = buildRequestFrames(self.getRequestHeaders, [], frameFactory)
2812        initialData = frameFactory.clientConnectionPreface()
2813        initialData += b"".join(f.serialize() for f in frames)
2814
2815        reactor, conn, transport = self.initiateH2Connection(
2816            initialData,
2817            requestFactory=DummyProducerHandler,
2818        )
2819
2820        sentData = transport.value()
2821        oldCallCount = len(reactor.getDelayedCalls())
2822
2823        # Now lose the connection.
2824        conn.connectionLost("reason")
2825
2826        # There should be one fewer call than there was.
2827        currentCallCount = len(reactor.getDelayedCalls())
2828        self.assertEqual(oldCallCount - 1, currentCallCount)
2829
2830        # Advancing the clock should do nothing.
2831        reactor.advance(101)
2832        self.assertEqual(transport.value(), sentData)
2833
2834    def test_timeoutEventuallyForcesConnectionClosed(self):
2835        """
2836        When a L{H2Connection} has timed the connection out, and the transport
2837        doesn't get torn down within 15 seconds, it gets forcibly closed.
2838        """
2839        reactor, conn, transport = self.prepareAbortTest()
2840
2841        # Advance the clock to see that we abort the connection.
2842        reactor.advance(14)
2843        self.assertTrue(transport.disconnecting)
2844        self.assertFalse(transport.disconnected)
2845        reactor.advance(1)
2846        self.assertTrue(transport.disconnecting)
2847        self.assertTrue(transport.disconnected)
2848
2849    def test_losingConnectionCancelsTheAbort(self):
2850        """
2851        When a L{H2Connection} has timed the connection out, getting
2852        C{connectionLost} called on it cancels the forcible connection close.
2853        """
2854        reactor, conn, transport = self.prepareAbortTest()
2855
2856        # Advance the clock, but right before the end fire connectionLost.
2857        reactor.advance(14)
2858        conn.connectionLost(None)
2859
2860        # Check that the transport isn't forcibly closed.
2861        reactor.advance(1)
2862        self.assertTrue(transport.disconnecting)
2863        self.assertFalse(transport.disconnected)
2864
2865    def test_losingConnectionWithNoAbortTimeOut(self):
2866        """
2867        When a L{H2Connection} has timed the connection out but the
2868        C{abortTimeout} is set to L{None}, the connection is never aborted.
2869        """
2870        reactor, conn, transport = self.prepareAbortTest(abortTimeout=None)
2871
2872        # Advance the clock an arbitrarily long way, and confirm it never
2873        # aborts.
2874        reactor.advance(2 ** 32)
2875        self.assertTrue(transport.disconnecting)
2876        self.assertFalse(transport.disconnected)
2877
2878    def test_connectionLostAfterForceClose(self):
2879        """
2880        If a timed out transport doesn't close after 15 seconds, the
2881        L{HTTPChannel} will forcibly close it.
2882        """
2883        reactor, conn, transport = self.prepareAbortTest()
2884
2885        # Force the follow-on forced closure.
2886        reactor.advance(15)
2887        self.assertTrue(transport.disconnecting)
2888        self.assertTrue(transport.disconnected)
2889
2890        # Now call connectionLost on the protocol. This is done by some
2891        # transports, including TCP and TLS. We don't have anything we can
2892        # assert on here: this just must not explode.
2893        conn.connectionLost(error.ConnectionDone)
2894
2895    def test_timeOutClientThatSendsOnlyInvalidFrames(self):
2896        """
2897        A client that sends only invalid frames is eventually timed out.
2898        """
2899        memoryReactor = MemoryReactorClock()
2900
2901        connection = H2Connection(memoryReactor)
2902        connection.callLater = memoryReactor.callLater
2903        connection.timeOut = 60
2904
2905        frameFactory = FrameFactory()
2906        transport = StringTransport()
2907
2908        clientConnectionPreface = frameFactory.clientConnectionPreface()
2909        connection.makeConnection(transport)
2910        connection.dataReceived(clientConnectionPreface)
2911
2912        # Send data until both the loseConnection and abortConnection
2913        # timeouts have elapsed.
2914        for _ in range(connection.timeOut + connection.abortTimeout):
2915            connection.dataReceived(frameFactory.buildRstStreamFrame(1).serialize())
2916            memoryReactor.advance(1)
2917
2918        # Invalid frames don't reset any timeouts, so the above has
2919        # forcibly disconnected us via abortConnection.
2920        self.assertTrue(transport.disconnected)
2921