1# Copyright (c) Twisted Matrix Laboratories. 2# See LICENSE for details. 3 4""" 5Test HTTP/2 support. 6""" 7 8 9import itertools 10 11from zope.interface import directlyProvides, providedBy 12 13from twisted.internet import defer, error, reactor, task 14from twisted.internet.address import IPv4Address 15from twisted.internet.testing import MemoryReactorClock, StringTransport 16from twisted.python import failure 17from twisted.python.compat import iterbytes 18from twisted.test.test_internet import DummyProducer 19from twisted.trial import unittest 20from twisted.web import http 21from twisted.web.test.test_http import ( 22 DelayedHTTPHandler, 23 DelayedHTTPHandlerProxy, 24 DummyHTTPHandler, 25 DummyHTTPHandlerProxy, 26 DummyPullProducerHandlerProxy, 27 _IDeprecatedHTTPChannelToRequestInterfaceProxy, 28 _makeRequestProxyFactory, 29) 30 31skipH2 = None 32 33try: 34 # These third-party imports are guaranteed to be present if HTTP/2 support 35 # is compiled in. We do not use them in the main code: only in the tests. 36 import h2 # type: ignore[import] 37 import h2.errors # type: ignore[import] 38 import h2.exceptions # type: ignore[import] 39 import hyperframe # type: ignore[import] 40 import priority # type: ignore[import] 41 from hpack.hpack import Decoder, Encoder # type: ignore[import] 42 43 from twisted.web._http2 import H2Connection 44except ImportError: 45 skipH2 = "HTTP/2 support not enabled" 46 47 48# Define some helpers for the rest of these tests. 49class FrameFactory: 50 """ 51 A class containing lots of helper methods and state to build frames. This 52 allows test cases to easily build correct HTTP/2 frames to feed to 53 hyper-h2. 54 """ 55 56 def __init__(self): 57 self.encoder = Encoder() 58 59 def refreshEncoder(self): 60 self.encoder = Encoder() 61 62 def clientConnectionPreface(self): 63 return b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" 64 65 def buildHeadersFrame(self, headers, flags=[], streamID=1, **priorityKwargs): 66 """ 67 Builds a single valid headers frame out of the contained headers. 68 """ 69 f = hyperframe.frame.HeadersFrame(streamID) 70 f.data = self.encoder.encode(headers) 71 f.flags.add("END_HEADERS") 72 for flag in flags: 73 f.flags.add(flag) 74 75 for k, v in priorityKwargs.items(): 76 setattr(f, k, v) 77 78 return f 79 80 def buildDataFrame(self, data, flags=None, streamID=1): 81 """ 82 Builds a single data frame out of a chunk of data. 83 """ 84 flags = set(flags) if flags is not None else set() 85 f = hyperframe.frame.DataFrame(streamID) 86 f.data = data 87 f.flags = flags 88 return f 89 90 def buildSettingsFrame(self, settings, ack=False): 91 """ 92 Builds a single settings frame. 93 """ 94 f = hyperframe.frame.SettingsFrame(0) 95 if ack: 96 f.flags.add("ACK") 97 98 f.settings = settings 99 return f 100 101 def buildWindowUpdateFrame(self, streamID, increment): 102 """ 103 Builds a single WindowUpdate frame. 104 """ 105 f = hyperframe.frame.WindowUpdateFrame(streamID) 106 f.window_increment = increment 107 return f 108 109 def buildGoAwayFrame(self, lastStreamID, errorCode=0, additionalData=b""): 110 """ 111 Builds a single GOAWAY frame. 112 """ 113 f = hyperframe.frame.GoAwayFrame(0) 114 f.error_code = errorCode 115 f.last_stream_id = lastStreamID 116 f.additional_data = additionalData 117 return f 118 119 def buildRstStreamFrame(self, streamID, errorCode=0): 120 """ 121 Builds a single RST_STREAM frame. 122 """ 123 f = hyperframe.frame.RstStreamFrame(streamID) 124 f.error_code = errorCode 125 return f 126 127 def buildPriorityFrame(self, streamID, weight, dependsOn=0, exclusive=False): 128 """ 129 Builds a single priority frame. 130 """ 131 f = hyperframe.frame.PriorityFrame(streamID) 132 f.depends_on = dependsOn 133 f.stream_weight = weight 134 f.exclusive = exclusive 135 return f 136 137 def buildPushPromiseFrame(self, streamID, promisedStreamID, headers, flags=[]): 138 """ 139 Builds a single Push Promise frame. 140 """ 141 f = hyperframe.frame.PushPromiseFrame(streamID) 142 f.promised_stream_id = promisedStreamID 143 f.data = self.encoder.encode(headers) 144 f.flags = set(flags) 145 f.flags.add("END_HEADERS") 146 return f 147 148 149class FrameBuffer: 150 """ 151 A test object that converts data received from Twisted's HTTP/2 stack and 152 turns it into a sequence of hyperframe frame objects. 153 154 This is primarily used to make it easier to write and debug tests: rather 155 than have to serialize the expected frames and then do byte-level 156 comparison (which can be unclear in debugging output), this object makes it 157 possible to work with the frames directly. 158 159 It also ensures that headers are properly decompressed. 160 """ 161 162 def __init__(self): 163 self.decoder = Decoder() 164 self._data = b"" 165 166 def receiveData(self, data): 167 self._data += data 168 169 def __iter__(self): 170 return self 171 172 def next(self): 173 if len(self._data) < 9: 174 raise StopIteration() 175 176 frame, length = hyperframe.frame.Frame.parse_frame_header(self._data[:9]) 177 if len(self._data) < length + 9: 178 raise StopIteration() 179 180 frame.parse_body(memoryview(self._data[9 : 9 + length])) 181 self._data = self._data[9 + length :] 182 183 if isinstance(frame, hyperframe.frame.HeadersFrame): 184 frame.data = self.decoder.decode(frame.data, raw=True) 185 186 return frame 187 188 __next__ = next 189 190 191def buildRequestFrames(headers, data, frameFactory=None, streamID=1): 192 """ 193 Provides a sequence of HTTP/2 frames that encode a single HTTP request. 194 This should be used when you want to control the serialization yourself, 195 e.g. because you want to interleave other frames with these. If that's not 196 necessary, prefer L{buildRequestBytes}. 197 198 @param headers: The HTTP/2 headers to send. 199 @type headers: L{list} of L{tuple} of L{bytes} 200 201 @param data: The HTTP data to send. Each list entry will be sent in its own 202 frame. 203 @type data: L{list} of L{bytes} 204 205 @param frameFactory: The L{FrameFactory} that will be used to construct the 206 frames. 207 @type frameFactory: L{FrameFactory} 208 209 @param streamID: The ID of the stream on which to send the request. 210 @type streamID: L{int} 211 """ 212 if frameFactory is None: 213 frameFactory = FrameFactory() 214 215 frames = [] 216 frames.append(frameFactory.buildHeadersFrame(headers=headers, streamID=streamID)) 217 frames.extend( 218 frameFactory.buildDataFrame(chunk, streamID=streamID) for chunk in data 219 ) 220 frames[-1].flags.add("END_STREAM") 221 return frames 222 223 224def buildRequestBytes(headers, data, frameFactory=None, streamID=1): 225 """ 226 Provides the byte sequence for a collection of HTTP/2 frames representing 227 the provided request. 228 229 @param headers: The HTTP/2 headers to send. 230 @type headers: L{list} of L{tuple} of L{bytes} 231 232 @param data: The HTTP data to send. Each list entry will be sent in its own 233 frame. 234 @type data: L{list} of L{bytes} 235 236 @param frameFactory: The L{FrameFactory} that will be used to construct the 237 frames. 238 @type frameFactory: L{FrameFactory} 239 240 @param streamID: The ID of the stream on which to send the request. 241 @type streamID: L{int} 242 """ 243 frames = buildRequestFrames(headers, data, frameFactory, streamID) 244 return b"".join(f.serialize() for f in frames) 245 246 247def framesFromBytes(data): 248 """ 249 Given a sequence of bytes, decodes them into frames. 250 251 Note that this method should almost always be called only once, before 252 making some assertions. This is because decoding HTTP/2 frames is extremely 253 stateful, and this function doesn't preserve any of that state between 254 calls. 255 256 @param data: The serialized HTTP/2 frames. 257 @type data: L{bytes} 258 259 @returns: A list of HTTP/2 frames. 260 @rtype: L{list} of L{hyperframe.frame.Frame} subclasses. 261 """ 262 buffer = FrameBuffer() 263 buffer.receiveData(data) 264 return list(buffer) 265 266 267class ChunkedHTTPHandler(http.Request): 268 """ 269 A HTTP request object that writes chunks of data back to the network based 270 on the URL. 271 272 Must be called with a path /chunked/<num_chunks> 273 """ 274 275 chunkData = b"hello world!" 276 277 def process(self): 278 chunks = int(self.uri.split(b"/")[-1]) 279 self.setResponseCode(200) 280 281 for _ in range(chunks): 282 self.write(self.chunkData) 283 284 self.finish() 285 286 287ChunkedHTTPHandlerProxy = _makeRequestProxyFactory(ChunkedHTTPHandler) 288 289 290class ConsumerDummyHandler(http.Request): 291 """ 292 This is a HTTP request handler that works with the C{IPushProducer} 293 implementation in the L{H2Stream} object. No current IRequest object does 294 that, but in principle future implementations could: that codepath should 295 therefore be tested. 296 """ 297 298 def __init__(self, *args, **kwargs): 299 http.Request.__init__(self, *args, **kwargs) 300 301 # Production starts paused. 302 self.channel.pauseProducing() 303 self._requestReceived = False 304 self._data = None 305 306 def acceptData(self): 307 """ 308 Start the data pipe. 309 """ 310 self.channel.resumeProducing() 311 312 def requestReceived(self, *args, **kwargs): 313 self._requestReceived = True 314 return http.Request.requestReceived(self, *args, **kwargs) 315 316 def process(self): 317 self.setResponseCode(200) 318 self._data = self.content.read() 319 returnData = b"this is a response from a consumer dummy handler" 320 self.write(returnData) 321 self.finish() 322 323 324ConsumerDummyHandlerProxy = _makeRequestProxyFactory(ConsumerDummyHandler) 325 326 327class AbortingConsumerDummyHandler(ConsumerDummyHandler): 328 """ 329 This is a HTTP request handler that works with the C{IPushProducer} 330 implementation in the L{H2Stream} object. The difference between this and 331 the ConsumerDummyHandler is that after resuming production it immediately 332 aborts it again. 333 """ 334 335 def acceptData(self): 336 """ 337 Start and then immediately stop the data pipe. 338 """ 339 self.channel.resumeProducing() 340 self.channel.stopProducing() 341 342 343AbortingConsumerDummyHandlerProxy = _makeRequestProxyFactory( 344 AbortingConsumerDummyHandler 345) 346 347 348class DummyProducerHandler(http.Request): 349 """ 350 An HTTP request handler that registers a dummy producer to serve the body. 351 352 The owner must call C{finish} to complete the response. 353 """ 354 355 def process(self): 356 self.setResponseCode(200) 357 self.registerProducer(DummyProducer(), True) 358 359 360DummyProducerHandlerProxy = _makeRequestProxyFactory(DummyProducerHandler) 361 362 363class NotifyingRequestFactory: 364 """ 365 A L{http.Request} factory that calls L{http.Request.notifyFinish} on all 366 L{http.Request} objects before it returns them, and squirrels the resulting 367 L{defer.Deferred} away on the class for later use. This is done as early 368 as possible to ensure that we always see the result. 369 """ 370 371 def __init__(self, wrappedFactory): 372 self.results = [] 373 self._wrappedFactory = wrappedFactory 374 375 # Add interfaces provided by the factory we are wrapping. We expect 376 # this only to be INonQueuedRequestFactory, but we don't want to 377 # hard-code that rule. 378 for interface in providedBy(self._wrappedFactory): 379 directlyProvides(self, interface) 380 381 def __call__(self, *args, **kwargs): 382 req = self._wrappedFactory(*args, **kwargs) 383 self.results.append(req.notifyFinish()) 384 return _IDeprecatedHTTPChannelToRequestInterfaceProxy(req) 385 386 387NotifyingRequestFactoryProxy = _makeRequestProxyFactory(NotifyingRequestFactory) 388 389 390class HTTP2TestHelpers: 391 """ 392 A superclass that contains no tests but provides test helpers for HTTP/2 393 tests. 394 """ 395 396 if skipH2: 397 skip = skipH2 398 399 def assertAllStreamsBlocked(self, connection): 400 """ 401 Confirm that all streams are blocked: that is, the priority tree 402 believes that none of the streams have data ready to send. 403 """ 404 self.assertRaises(priority.DeadlockError, next, connection.priority) 405 406 407class HTTP2ServerTests(unittest.TestCase, HTTP2TestHelpers): 408 getRequestHeaders = [ 409 (b":method", b"GET"), 410 (b":authority", b"localhost"), 411 (b":path", b"/"), 412 (b":scheme", b"https"), 413 (b"user-agent", b"twisted-test-code"), 414 (b"custom-header", b"1"), 415 (b"custom-header", b"2"), 416 ] 417 418 postRequestHeaders = [ 419 (b":method", b"POST"), 420 (b":authority", b"localhost"), 421 (b":path", b"/post_endpoint"), 422 (b":scheme", b"https"), 423 (b"user-agent", b"twisted-test-code"), 424 (b"content-length", b"25"), 425 ] 426 427 postRequestData = [b"hello ", b"world, ", b"it's ", b"http/2!"] 428 429 getResponseHeaders = [ 430 (b":status", b"200"), 431 (b"request", b"/"), 432 (b"command", b"GET"), 433 (b"version", b"HTTP/2"), 434 (b"content-length", b"13"), 435 ] 436 437 getResponseData = b"'''\nNone\n'''\n" 438 439 postResponseHeaders = [ 440 (b":status", b"200"), 441 (b"request", b"/post_endpoint"), 442 (b"command", b"POST"), 443 (b"version", b"HTTP/2"), 444 (b"content-length", b"36"), 445 ] 446 447 postResponseData = b"'''\n25\nhello world, it's http/2!'''\n" 448 449 def connectAndReceive(self, connection, headers, body): 450 """ 451 Takes a single L{H2Connection} object and connects it to a 452 L{StringTransport} using a brand new L{FrameFactory}. 453 454 @param connection: The L{H2Connection} object to connect. 455 @type connection: L{H2Connection} 456 457 @param headers: The headers to send on the first request. 458 @type headers: L{Iterable} of L{tuple} of C{(bytes, bytes)} 459 460 @param body: Chunks of body to send, if any. 461 @type body: L{Iterable} of L{bytes} 462 463 @return: A tuple of L{FrameFactory}, L{StringTransport} 464 """ 465 frameFactory = FrameFactory() 466 transport = StringTransport() 467 468 requestBytes = frameFactory.clientConnectionPreface() 469 requestBytes += buildRequestBytes(headers, body, frameFactory) 470 471 connection.makeConnection(transport) 472 # One byte at a time, to stress the implementation. 473 for byte in iterbytes(requestBytes): 474 connection.dataReceived(byte) 475 476 return frameFactory, transport 477 478 def test_basicRequest(self): 479 """ 480 Send request over a TCP connection and confirm that we get back the 481 expected data in the order and style we expect. 482 """ 483 # This test is complex because it validates the data very closely: it 484 # specifically checks frame ordering and type. 485 connection = H2Connection() 486 connection.requestFactory = DummyHTTPHandlerProxy 487 _, transport = self.connectAndReceive(connection, self.getRequestHeaders, []) 488 489 def validate(streamID): 490 frames = framesFromBytes(transport.value()) 491 492 self.assertEqual(len(frames), 4) 493 self.assertTrue(all(f.stream_id == 1 for f in frames[1:])) 494 495 self.assertTrue(isinstance(frames[1], hyperframe.frame.HeadersFrame)) 496 self.assertTrue(isinstance(frames[2], hyperframe.frame.DataFrame)) 497 self.assertTrue(isinstance(frames[3], hyperframe.frame.DataFrame)) 498 499 self.assertEqual(dict(frames[1].data), dict(self.getResponseHeaders)) 500 self.assertEqual(frames[2].data, self.getResponseData) 501 self.assertEqual(frames[3].data, b"") 502 self.assertTrue("END_STREAM" in frames[3].flags) 503 504 return connection._streamCleanupCallbacks[1].addCallback(validate) 505 506 def test_postRequest(self): 507 """ 508 Send a POST request and confirm that the data is safely transferred. 509 """ 510 connection = H2Connection() 511 connection.requestFactory = DummyHTTPHandlerProxy 512 _, transport = self.connectAndReceive( 513 connection, self.postRequestHeaders, self.postRequestData 514 ) 515 516 def validate(streamID): 517 frames = framesFromBytes(transport.value()) 518 519 # One Settings frame, one Headers frame and two Data frames. 520 self.assertEqual(len(frames), 4) 521 self.assertTrue(all(f.stream_id == 1 for f in frames[-3:])) 522 523 self.assertTrue(isinstance(frames[-3], hyperframe.frame.HeadersFrame)) 524 self.assertTrue(isinstance(frames[-2], hyperframe.frame.DataFrame)) 525 self.assertTrue(isinstance(frames[-1], hyperframe.frame.DataFrame)) 526 527 self.assertEqual(dict(frames[-3].data), dict(self.postResponseHeaders)) 528 self.assertEqual(frames[-2].data, self.postResponseData) 529 self.assertEqual(frames[-1].data, b"") 530 self.assertTrue("END_STREAM" in frames[-1].flags) 531 532 return connection._streamCleanupCallbacks[1].addCallback(validate) 533 534 def test_postRequestNoLength(self): 535 """ 536 Send a POST request without length and confirm that the data is safely 537 transferred. 538 """ 539 postResponseHeaders = [ 540 (b":status", b"200"), 541 (b"request", b"/post_endpoint"), 542 (b"command", b"POST"), 543 (b"version", b"HTTP/2"), 544 (b"content-length", b"38"), 545 ] 546 postResponseData = b"'''\nNone\nhello world, it's http/2!'''\n" 547 548 # Strip the content-length header. 549 postRequestHeaders = [ 550 (x, y) for x, y in self.postRequestHeaders if x != b"content-length" 551 ] 552 553 connection = H2Connection() 554 connection.requestFactory = DummyHTTPHandlerProxy 555 _, transport = self.connectAndReceive( 556 connection, postRequestHeaders, self.postRequestData 557 ) 558 559 def validate(streamID): 560 frames = framesFromBytes(transport.value()) 561 562 # One Settings frame, one Headers frame, and two Data frames 563 self.assertEqual(len(frames), 4) 564 self.assertTrue(all(f.stream_id == 1 for f in frames[-3:])) 565 566 self.assertTrue(isinstance(frames[-3], hyperframe.frame.HeadersFrame)) 567 self.assertTrue(isinstance(frames[-2], hyperframe.frame.DataFrame)) 568 self.assertTrue(isinstance(frames[-1], hyperframe.frame.DataFrame)) 569 570 self.assertEqual(dict(frames[-3].data), dict(postResponseHeaders)) 571 self.assertEqual(frames[-2].data, postResponseData) 572 self.assertEqual(frames[-1].data, b"") 573 self.assertTrue("END_STREAM" in frames[-1].flags) 574 575 return connection._streamCleanupCallbacks[1].addCallback(validate) 576 577 def test_interleavedRequests(self): 578 """ 579 Many interleaved POST requests all get received and responded to 580 appropriately. 581 """ 582 # Unfortunately this test is pretty complex. 583 REQUEST_COUNT = 40 584 585 f = FrameFactory() 586 b = StringTransport() 587 a = H2Connection() 588 a.requestFactory = DummyHTTPHandlerProxy 589 590 # Stream IDs are always odd numbers. 591 streamIDs = list(range(1, REQUEST_COUNT * 2, 2)) 592 frames = [ 593 buildRequestFrames( 594 self.postRequestHeaders, self.postRequestData, f, streamID 595 ) 596 for streamID in streamIDs 597 ] 598 599 requestBytes = f.clientConnectionPreface() 600 601 # Interleave the frames. That is, send one frame from each stream at a 602 # time. This wacky line lets us do that. 603 frames = itertools.chain.from_iterable(zip(*frames)) 604 requestBytes += b"".join(frame.serialize() for frame in frames) 605 a.makeConnection(b) 606 # one byte at a time, to stress the implementation. 607 for byte in iterbytes(requestBytes): 608 a.dataReceived(byte) 609 610 def validate(results): 611 frames = framesFromBytes(b.value()) 612 613 # We expect 1 Settings frame for the connection, and then 3 frames 614 # *per stream* (1 Headers frame, 2 Data frames). This doesn't send 615 # enough data to trigger a window update. 616 self.assertEqual(len(frames), 1 + (3 * 40)) 617 618 # Let's check the data is ok. We need the non-WindowUpdate frames 619 # for each stream. 620 for streamID in streamIDs: 621 streamFrames = [ 622 f 623 for f in frames 624 if f.stream_id == streamID 625 and not isinstance(f, hyperframe.frame.WindowUpdateFrame) 626 ] 627 628 self.assertEqual(len(streamFrames), 3) 629 630 self.assertEqual( 631 dict(streamFrames[0].data), dict(self.postResponseHeaders) 632 ) 633 self.assertEqual(streamFrames[1].data, self.postResponseData) 634 self.assertEqual(streamFrames[2].data, b"") 635 self.assertTrue("END_STREAM" in streamFrames[2].flags) 636 637 return defer.DeferredList(list(a._streamCleanupCallbacks.values())).addCallback( 638 validate 639 ) 640 641 def test_sendAccordingToPriority(self): 642 """ 643 Data in responses is interleaved according to HTTP/2 priorities. 644 """ 645 # We want to start three parallel GET requests that will each return 646 # four chunks of data. These chunks will be interleaved according to 647 # HTTP/2 priorities. Stream 1 will be set to weight 64, Stream 3 to 648 # weight 32, and Stream 5 to weight 16 but dependent on Stream 1. 649 # That will cause data frames for these streams to be emitted in this 650 # order: 1, 3, 1, 1, 3, 1, 1, 3, 5, 3, 5, 3, 5, 5, 5. 651 # 652 # The reason there are so many frames is because the implementation 653 # interleaves stream completion according to priority order as well, 654 # because it is sent on a Data frame. 655 # 656 # This doesn't fully test priority, but tests *almost* enough of it to 657 # be worthwhile. 658 f = FrameFactory() 659 b = StringTransport() 660 a = H2Connection() 661 a.requestFactory = ChunkedHTTPHandlerProxy 662 getRequestHeaders = self.getRequestHeaders 663 getRequestHeaders[2] = (":path", "/chunked/4") 664 665 frames = [ 666 buildRequestFrames(getRequestHeaders, [], f, streamID) 667 for streamID in [1, 3, 5] 668 ] 669 670 # Set the priorities. The first two will use their HEADERS frame, the 671 # third will have a PRIORITY frame sent before the headers. 672 frames[0][0].flags.add("PRIORITY") 673 frames[0][0].stream_weight = 64 674 675 frames[1][0].flags.add("PRIORITY") 676 frames[1][0].stream_weight = 32 677 678 priorityFrame = f.buildPriorityFrame( 679 streamID=5, 680 weight=16, 681 dependsOn=1, 682 exclusive=True, 683 ) 684 frames[2].insert(0, priorityFrame) 685 686 frames = itertools.chain.from_iterable(frames) 687 requestBytes = f.clientConnectionPreface() 688 requestBytes += b"".join(frame.serialize() for frame in frames) 689 690 a.makeConnection(b) 691 # one byte at a time, to stress the implementation. 692 for byte in iterbytes(requestBytes): 693 a.dataReceived(byte) 694 695 def validate(results): 696 frames = framesFromBytes(b.value()) 697 698 # We expect 1 Settings frame for the connection, and then 6 frames 699 # per stream (1 Headers frame, 5 data frames), for a total of 19. 700 self.assertEqual(len(frames), 19) 701 702 streamIDs = [ 703 f.stream_id for f in frames if isinstance(f, hyperframe.frame.DataFrame) 704 ] 705 expectedOrder = [1, 3, 1, 1, 3, 1, 1, 3, 5, 3, 5, 3, 5, 5, 5] 706 self.assertEqual(streamIDs, expectedOrder) 707 708 return defer.DeferredList(list(a._streamCleanupCallbacks.values())).addCallback( 709 validate 710 ) 711 712 def test_protocolErrorTerminatesConnection(self): 713 """ 714 A protocol error from the remote peer terminates the connection. 715 """ 716 f = FrameFactory() 717 b = StringTransport() 718 a = H2Connection() 719 a.requestFactory = DummyHTTPHandlerProxy 720 721 # We're going to open a stream and then send a PUSH_PROMISE frame, 722 # which is forbidden. 723 requestBytes = f.clientConnectionPreface() 724 requestBytes += buildRequestBytes(self.getRequestHeaders, [], f) 725 requestBytes += f.buildPushPromiseFrame( 726 streamID=1, 727 promisedStreamID=2, 728 headers=self.getRequestHeaders, 729 flags=["END_HEADERS"], 730 ).serialize() 731 732 a.makeConnection(b) 733 # one byte at a time, to stress the implementation. 734 for byte in iterbytes(requestBytes): 735 a.dataReceived(byte) 736 737 # Check whether the transport got shut down: if it did, stop 738 # sending more data. 739 if b.disconnecting: 740 break 741 742 frames = framesFromBytes(b.value()) 743 744 # The send loop never gets to terminate the stream, but *some* data 745 # does get sent. We get a Settings frame, a Headers frame, and then the 746 # GoAway frame. 747 self.assertEqual(len(frames), 3) 748 self.assertTrue(isinstance(frames[-1], hyperframe.frame.GoAwayFrame)) 749 self.assertTrue(b.disconnecting) 750 751 def test_streamProducingData(self): 752 """ 753 The H2Stream data implements IPushProducer, and can have its data 754 production controlled by the Request if the Request chooses to. 755 """ 756 connection = H2Connection() 757 connection.requestFactory = ConsumerDummyHandlerProxy 758 _, transport = self.connectAndReceive( 759 connection, self.postRequestHeaders, self.postRequestData 760 ) 761 762 # At this point no data should have been received by the request *or* 763 # the response. We need to dig the request out of the tree of objects. 764 request = connection.streams[1]._request.original 765 self.assertFalse(request._requestReceived) 766 767 # We should have only received the Settings frame. It's important that 768 # the WindowUpdate frames don't land before data is delivered to the 769 # Request. 770 frames = framesFromBytes(transport.value()) 771 self.assertEqual(len(frames), 1) 772 773 # At this point, we can kick off the producing. This will force the 774 # H2Stream object to deliver the request data all at once, so check 775 # that it was delivered correctly. 776 request.acceptData() 777 self.assertTrue(request._requestReceived) 778 self.assertTrue(request._data, b"hello world, it's http/2!") 779 780 # *That* will have also caused the H2Connection object to emit almost 781 # all the data it needs. That'll be a Headers frame, as well as the 782 # original SETTINGS frame. 783 frames = framesFromBytes(transport.value()) 784 self.assertEqual(len(frames), 2) 785 786 def validate(streamID): 787 # Confirm that the response is ok. 788 frames = framesFromBytes(transport.value()) 789 790 # The only new frames here are the two Data frames. 791 self.assertEqual(len(frames), 4) 792 self.assertTrue("END_STREAM" in frames[-1].flags) 793 794 return connection._streamCleanupCallbacks[1].addCallback(validate) 795 796 def test_abortStreamProducingData(self): 797 """ 798 The H2Stream data implements IPushProducer, and can have its data 799 production controlled by the Request if the Request chooses to. 800 When the production is stopped, that causes the stream connection to 801 be lost. 802 """ 803 f = FrameFactory() 804 b = StringTransport() 805 a = H2Connection() 806 a.requestFactory = AbortingConsumerDummyHandlerProxy 807 808 # We're going to send in a POST request. 809 frames = buildRequestFrames(self.postRequestHeaders, self.postRequestData, f) 810 frames[-1].flags = set() # Remove END_STREAM flag. 811 requestBytes = f.clientConnectionPreface() 812 requestBytes += b"".join(f.serialize() for f in frames) 813 a.makeConnection(b) 814 # one byte at a time, to stress the implementation. 815 for byte in iterbytes(requestBytes): 816 a.dataReceived(byte) 817 818 # At this point no data should have been received by the request *or* 819 # the response. We need to dig the request out of the tree of objects. 820 request = a.streams[1]._request.original 821 self.assertFalse(request._requestReceived) 822 823 # Save off the cleanup deferred now, it'll be removed when the 824 # RstStream frame is sent. 825 cleanupCallback = a._streamCleanupCallbacks[1] 826 827 # At this point, we can kick off the production and immediate abort. 828 request.acceptData() 829 830 # The stream will now have been aborted. 831 def validate(streamID): 832 # Confirm that the response is ok. 833 frames = framesFromBytes(b.value()) 834 835 # We expect a Settings frame and a RstStream frame. 836 self.assertEqual(len(frames), 2) 837 self.assertTrue(isinstance(frames[-1], hyperframe.frame.RstStreamFrame)) 838 self.assertEqual(frames[-1].stream_id, 1) 839 840 return cleanupCallback.addCallback(validate) 841 842 def test_terminatedRequest(self): 843 """ 844 When a RstStream frame is received, the L{H2Connection} and L{H2Stream} 845 objects tear down the L{http.Request} and swallow all outstanding 846 writes. 847 """ 848 # Here we want to use the DummyProducerHandler primarily for the side 849 # effect it has of not writing to the connection. That means we can 850 # delay some writes until *after* the RstStream frame is received. 851 connection = H2Connection() 852 connection.requestFactory = DummyProducerHandlerProxy 853 frameFactory, transport = self.connectAndReceive( 854 connection, self.getRequestHeaders, [] 855 ) 856 857 # Get the request object. 858 request = connection.streams[1]._request.original 859 860 # Send two writes in. 861 request.write(b"first chunk") 862 request.write(b"second chunk") 863 864 # Save off the cleanup deferred now, it'll be removed when the 865 # RstStream frame is received. 866 cleanupCallback = connection._streamCleanupCallbacks[1] 867 868 # Now fire the RstStream frame. 869 connection.dataReceived( 870 frameFactory.buildRstStreamFrame(1, errorCode=1).serialize() 871 ) 872 873 # This should have cancelled the request. 874 self.assertTrue(request._disconnected) 875 self.assertTrue(request.channel is None) 876 877 # This should not raise an exception, the function will just return 878 # immediately, and do nothing 879 request.write(b"third chunk") 880 881 # Check that everything is fine. 882 # We expect that only the Settings and Headers frames will have been 883 # emitted. The two writes are lost because the delayed call never had 884 # another chance to execute before the RstStream frame got processed. 885 def validate(streamID): 886 frames = framesFromBytes(transport.value()) 887 888 self.assertEqual(len(frames), 2) 889 self.assertEqual(frames[1].stream_id, 1) 890 891 self.assertTrue(isinstance(frames[1], hyperframe.frame.HeadersFrame)) 892 893 return cleanupCallback.addCallback(validate) 894 895 def test_terminatedConnection(self): 896 """ 897 When a GoAway frame is received, the L{H2Connection} and L{H2Stream} 898 objects tear down all outstanding L{http.Request} objects and stop all 899 writing. 900 """ 901 # Here we want to use the DummyProducerHandler primarily for the side 902 # effect it has of not writing to the connection. That means we can 903 # delay some writes until *after* the GoAway frame is received. 904 connection = H2Connection() 905 connection.requestFactory = DummyProducerHandlerProxy 906 frameFactory, transport = self.connectAndReceive( 907 connection, self.getRequestHeaders, [] 908 ) 909 910 # Get the request object. 911 request = connection.streams[1]._request.original 912 913 # Send two writes in. 914 request.write(b"first chunk") 915 request.write(b"second chunk") 916 917 # Save off the cleanup deferred now, it'll be removed when the 918 # GoAway frame is received. 919 cleanupCallback = connection._streamCleanupCallbacks[1] 920 921 # Now fire the GoAway frame. 922 connection.dataReceived( 923 frameFactory.buildGoAwayFrame(lastStreamID=0).serialize() 924 ) 925 926 # This should have cancelled the request. 927 self.assertTrue(request._disconnected) 928 self.assertTrue(request.channel is None) 929 930 # It should also have cancelled the sending loop. 931 self.assertFalse(connection._stillProducing) 932 933 # Check that everything is fine. 934 # We expect that only the Settings and Headers frames will have been 935 # emitted. The writes are lost because the callLater never had 936 # a chance to execute before the GoAway frame got processed. 937 def validate(streamID): 938 frames = framesFromBytes(transport.value()) 939 940 self.assertEqual(len(frames), 2) 941 self.assertEqual(frames[1].stream_id, 1) 942 943 self.assertTrue(isinstance(frames[1], hyperframe.frame.HeadersFrame)) 944 945 return cleanupCallback.addCallback(validate) 946 947 def test_respondWith100Continue(self): 948 """ 949 Requests containing Expect: 100-continue cause provisional 100 950 responses to be emitted. 951 """ 952 connection = H2Connection() 953 connection.requestFactory = DummyHTTPHandlerProxy 954 955 # Add Expect: 100-continue for this request. 956 headers = self.getRequestHeaders + [(b"expect", b"100-continue")] 957 958 _, transport = self.connectAndReceive(connection, headers, []) 959 960 # We expect 5 frames now: Settings, two Headers frames, and two Data 961 # frames. We're only really interested in validating the first Headers 962 # frame which contains the 100. 963 def validate(streamID): 964 frames = framesFromBytes(transport.value()) 965 966 self.assertEqual(len(frames), 5) 967 self.assertTrue(all(f.stream_id == 1 for f in frames[1:])) 968 969 self.assertTrue(isinstance(frames[1], hyperframe.frame.HeadersFrame)) 970 self.assertEqual(frames[1].data, [(b":status", b"100")]) 971 self.assertTrue("END_STREAM" in frames[-1].flags) 972 973 return connection._streamCleanupCallbacks[1].addCallback(validate) 974 975 def test_respondWith400(self): 976 """ 977 Triggering the call to L{H2Stream._respondToBadRequestAndDisconnect} 978 leads to a 400 error being sent automatically and the stream being torn 979 down. 980 """ 981 # The only "natural" way to trigger this in the current codebase is to 982 # send a multipart/form-data request that the cgi module doesn't like. 983 # That's absurdly hard, so instead we'll just call it ourselves. For 984 # this reason we use the DummyProducerHandler, which doesn't write the 985 # headers straight away. 986 connection = H2Connection() 987 connection.requestFactory = DummyProducerHandlerProxy 988 _, transport = self.connectAndReceive(connection, self.getRequestHeaders, []) 989 990 # Grab the request and the completion callback. 991 stream = connection.streams[1] 992 request = stream._request.original 993 cleanupCallback = connection._streamCleanupCallbacks[1] 994 995 # Abort the stream. 996 stream._respondToBadRequestAndDisconnect() 997 998 # This should have cancelled the request. 999 self.assertTrue(request._disconnected) 1000 self.assertTrue(request.channel is None) 1001 1002 # We expect 2 frames Settings and the 400 Headers. 1003 def validate(streamID): 1004 frames = framesFromBytes(transport.value()) 1005 1006 self.assertEqual(len(frames), 2) 1007 1008 self.assertTrue(isinstance(frames[1], hyperframe.frame.HeadersFrame)) 1009 self.assertEqual(frames[1].data, [(b":status", b"400")]) 1010 self.assertTrue("END_STREAM" in frames[-1].flags) 1011 1012 return cleanupCallback.addCallback(validate) 1013 1014 def test_loseH2StreamConnection(self): 1015 """ 1016 Calling L{Request.loseConnection} causes all data that has previously 1017 been sent to be flushed, and then the stream cleanly closed. 1018 """ 1019 # Here we again want to use the DummyProducerHandler because it doesn't 1020 # close the connection on its own. 1021 connection = H2Connection() 1022 connection.requestFactory = DummyProducerHandlerProxy 1023 _, transport = self.connectAndReceive(connection, self.getRequestHeaders, []) 1024 1025 # Grab the request. 1026 stream = connection.streams[1] 1027 request = stream._request.original 1028 1029 # Send in some writes. 1030 dataChunks = [b"hello", b"world", b"here", b"are", b"some", b"writes"] 1031 for chunk in dataChunks: 1032 request.write(chunk) 1033 1034 # Now lose the connection. 1035 request.loseConnection() 1036 1037 # Check that the data was all written out correctly and that the stream 1038 # state is cleaned up. 1039 def validate(streamID): 1040 frames = framesFromBytes(transport.value()) 1041 1042 # Settings, Headers, 7 Data frames. 1043 self.assertEqual(len(frames), 9) 1044 self.assertTrue(all(f.stream_id == 1 for f in frames[1:])) 1045 1046 self.assertTrue(isinstance(frames[1], hyperframe.frame.HeadersFrame)) 1047 self.assertTrue("END_STREAM" in frames[-1].flags) 1048 1049 receivedDataChunks = [ 1050 f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame) 1051 ] 1052 self.assertEqual( 1053 receivedDataChunks, 1054 dataChunks + [b""], 1055 ) 1056 1057 return connection._streamCleanupCallbacks[1].addCallback(validate) 1058 1059 def test_cannotRegisterTwoProducers(self): 1060 """ 1061 The L{H2Stream} object forbids registering two producers. 1062 """ 1063 connection = H2Connection() 1064 connection.requestFactory = DummyProducerHandlerProxy 1065 self.connectAndReceive(connection, self.getRequestHeaders, []) 1066 1067 # Grab the request. 1068 stream = connection.streams[1] 1069 request = stream._request.original 1070 1071 self.assertRaises(ValueError, stream.registerProducer, request, True) 1072 1073 def test_handlesPullProducer(self): 1074 """ 1075 L{Request} objects that have registered pull producers get blocked and 1076 unblocked according to HTTP/2 flow control. 1077 """ 1078 connection = H2Connection() 1079 connection.requestFactory = DummyPullProducerHandlerProxy 1080 _, transport = self.connectAndReceive(connection, self.getRequestHeaders, []) 1081 1082 # Get the producer completion deferred and ensure we call 1083 # request.finish. 1084 stream = connection.streams[1] 1085 request = stream._request.original 1086 producerComplete = request._actualProducer.result 1087 producerComplete.addCallback(lambda x: request.finish()) 1088 1089 # Check that the sending loop sends all the appropriate data. 1090 def validate(streamID): 1091 frames = framesFromBytes(transport.value()) 1092 1093 # Check that the stream is correctly terminated. 1094 self.assertTrue("END_STREAM" in frames[-1].flags) 1095 1096 # Grab the data from the frames. 1097 dataChunks = [ 1098 f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame) 1099 ] 1100 self.assertEqual( 1101 dataChunks, 1102 [b"0", b"1", b"2", b"3", b"4", b"5", b"6", b"7", b"8", b"9", b""], 1103 ) 1104 1105 return connection._streamCleanupCallbacks[1].addCallback(validate) 1106 1107 def test_isSecureWorksProperly(self): 1108 """ 1109 L{Request} objects can correctly ask isSecure on HTTP/2. 1110 """ 1111 connection = H2Connection() 1112 connection.requestFactory = DelayedHTTPHandlerProxy 1113 self.connectAndReceive(connection, self.getRequestHeaders, []) 1114 1115 request = connection.streams[1]._request.original 1116 self.assertFalse(request.isSecure()) 1117 connection.streams[1].abortConnection() 1118 1119 def test_lateCompletionWorks(self): 1120 """ 1121 L{H2Connection} correctly unblocks when a stream is ended. 1122 """ 1123 connection = H2Connection() 1124 connection.requestFactory = DelayedHTTPHandlerProxy 1125 _, transport = self.connectAndReceive(connection, self.getRequestHeaders, []) 1126 1127 # Delay a call to end request, forcing the connection to block because 1128 # it has no data to send. 1129 request = connection.streams[1]._request.original 1130 reactor.callLater(0.01, request.finish) 1131 1132 def validateComplete(*args): 1133 frames = framesFromBytes(transport.value()) 1134 1135 # Check that the stream is correctly terminated. 1136 self.assertEqual(len(frames), 3) 1137 self.assertTrue("END_STREAM" in frames[-1].flags) 1138 1139 return connection._streamCleanupCallbacks[1].addCallback(validateComplete) 1140 1141 def test_writeSequenceForChannels(self): 1142 """ 1143 L{H2Stream} objects can send a series of frames via C{writeSequence}. 1144 """ 1145 connection = H2Connection() 1146 connection.requestFactory = DelayedHTTPHandlerProxy 1147 _, transport = self.connectAndReceive(connection, self.getRequestHeaders, []) 1148 1149 stream = connection.streams[1] 1150 request = stream._request.original 1151 1152 request.setResponseCode(200) 1153 stream.writeSequence([b"Hello", b",", b"world!"]) 1154 request.finish() 1155 1156 completionDeferred = connection._streamCleanupCallbacks[1] 1157 1158 def validate(streamID): 1159 frames = framesFromBytes(transport.value()) 1160 1161 # Check that the stream is correctly terminated. 1162 self.assertTrue("END_STREAM" in frames[-1].flags) 1163 1164 # Grab the data from the frames. 1165 dataChunks = [ 1166 f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame) 1167 ] 1168 self.assertEqual(dataChunks, [b"Hello", b",", b"world!", b""]) 1169 1170 return completionDeferred.addCallback(validate) 1171 1172 def test_delayWrites(self): 1173 """ 1174 Delaying writes from L{Request} causes the L{H2Connection} to block on 1175 sending until data is available. However, data is *not* sent if there's 1176 no room in the flow control window. 1177 """ 1178 # Here we again want to use the DummyProducerHandler because it doesn't 1179 # close the connection on its own. 1180 f = FrameFactory() 1181 b = StringTransport() 1182 a = H2Connection() 1183 a.requestFactory = DelayedHTTPHandlerProxy 1184 1185 requestBytes = f.clientConnectionPreface() 1186 requestBytes += f.buildSettingsFrame( 1187 {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5} 1188 ).serialize() 1189 requestBytes += buildRequestBytes(self.getRequestHeaders, [], f) 1190 a.makeConnection(b) 1191 # one byte at a time, to stress the implementation. 1192 for byte in iterbytes(requestBytes): 1193 a.dataReceived(byte) 1194 1195 # Grab the request. 1196 stream = a.streams[1] 1197 request = stream._request.original 1198 1199 # Write the first 5 bytes. 1200 request.write(b"fiver") 1201 dataChunks = [b"here", b"are", b"some", b"writes"] 1202 1203 def write_chunks(): 1204 # Send in some writes. 1205 for chunk in dataChunks: 1206 request.write(chunk) 1207 request.finish() 1208 1209 d = task.deferLater(reactor, 0.01, write_chunks) 1210 d.addCallback( 1211 lambda *args: a.dataReceived( 1212 f.buildWindowUpdateFrame(streamID=1, increment=50).serialize() 1213 ) 1214 ) 1215 1216 # Check that the data was all written out correctly and that the stream 1217 # state is cleaned up. 1218 def validate(streamID): 1219 frames = framesFromBytes(b.value()) 1220 1221 # 2 Settings, Headers, 7 Data frames. 1222 self.assertEqual(len(frames), 9) 1223 self.assertTrue(all(f.stream_id == 1 for f in frames[2:])) 1224 1225 self.assertTrue(isinstance(frames[2], hyperframe.frame.HeadersFrame)) 1226 self.assertTrue("END_STREAM" in frames[-1].flags) 1227 1228 receivedDataChunks = [ 1229 f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame) 1230 ] 1231 self.assertEqual( 1232 receivedDataChunks, 1233 [b"fiver"] + dataChunks + [b""], 1234 ) 1235 1236 return a._streamCleanupCallbacks[1].addCallback(validate) 1237 1238 def test_resetAfterBody(self): 1239 """ 1240 A client that immediately resets after sending the body causes Twisted 1241 to send no response. 1242 """ 1243 frameFactory = FrameFactory() 1244 transport = StringTransport() 1245 a = H2Connection() 1246 a.requestFactory = DummyHTTPHandlerProxy 1247 1248 requestBytes = frameFactory.clientConnectionPreface() 1249 requestBytes += buildRequestBytes( 1250 headers=self.getRequestHeaders, data=[], frameFactory=frameFactory 1251 ) 1252 requestBytes += frameFactory.buildRstStreamFrame(streamID=1).serialize() 1253 a.makeConnection(transport) 1254 a.dataReceived(requestBytes) 1255 1256 frames = framesFromBytes(transport.value()) 1257 1258 self.assertEqual(len(frames), 1) 1259 self.assertNotIn(1, a._streamCleanupCallbacks) 1260 1261 def test_RequestRequiringFactorySiteInConstructor(self): 1262 """ 1263 A custom L{Request} subclass that requires the site and factory in the 1264 constructor is able to get them. 1265 """ 1266 d = defer.Deferred() 1267 1268 class SuperRequest(DummyHTTPHandler): 1269 def __init__(self, *args, **kwargs): 1270 DummyHTTPHandler.__init__(self, *args, **kwargs) 1271 d.callback((self.channel.site, self.channel.factory)) 1272 1273 connection = H2Connection() 1274 httpFactory = http.HTTPFactory() 1275 connection.requestFactory = _makeRequestProxyFactory(SuperRequest) 1276 1277 # Create some sentinels to look for. 1278 connection.factory = httpFactory 1279 connection.site = object() 1280 1281 self.connectAndReceive(connection, self.getRequestHeaders, []) 1282 1283 def validateFactoryAndSite(args): 1284 site, factory = args 1285 self.assertIs(site, connection.site) 1286 self.assertIs(factory, connection.factory) 1287 1288 d.addCallback(validateFactoryAndSite) 1289 1290 # We need to wait for the stream cleanup callback to drain the 1291 # response. 1292 cleanupCallback = connection._streamCleanupCallbacks[1] 1293 return defer.gatherResults([d, cleanupCallback]) 1294 1295 def test_notifyOnCompleteRequest(self): 1296 """ 1297 A request sent to a HTTP/2 connection fires the 1298 L{http.Request.notifyFinish} callback with a L{None} value. 1299 """ 1300 connection = H2Connection() 1301 connection.requestFactory = NotifyingRequestFactory(DummyHTTPHandler) 1302 _, transport = self.connectAndReceive(connection, self.getRequestHeaders, []) 1303 1304 deferreds = connection.requestFactory.results 1305 self.assertEqual(len(deferreds), 1) 1306 1307 def validate(result): 1308 self.assertIsNone(result) 1309 1310 d = deferreds[0] 1311 d.addCallback(validate) 1312 1313 # We need to wait for the stream cleanup callback to drain the 1314 # response. 1315 cleanupCallback = connection._streamCleanupCallbacks[1] 1316 return defer.gatherResults([d, cleanupCallback]) 1317 1318 def test_notifyOnResetStream(self): 1319 """ 1320 A HTTP/2 reset stream fires the L{http.Request.notifyFinish} deferred 1321 with L{ConnectionLost}. 1322 """ 1323 connection = H2Connection() 1324 connection.requestFactory = NotifyingRequestFactory(DelayedHTTPHandler) 1325 frameFactory, transport = self.connectAndReceive( 1326 connection, self.getRequestHeaders, [] 1327 ) 1328 1329 deferreds = connection.requestFactory.results 1330 self.assertEqual(len(deferreds), 1) 1331 1332 # We need this to errback with a Failure indicating the RSTSTREAM 1333 # frame. 1334 def callback(ign): 1335 self.fail("Didn't errback, called back instead") 1336 1337 def errback(reason): 1338 self.assertIsInstance(reason, failure.Failure) 1339 self.assertIs(reason.type, error.ConnectionLost) 1340 return None # Trap the error 1341 1342 d = deferreds[0] 1343 d.addCallbacks(callback, errback) 1344 1345 # Now send the RSTSTREAM frame. 1346 invalidData = frameFactory.buildRstStreamFrame(streamID=1).serialize() 1347 connection.dataReceived(invalidData) 1348 1349 return d 1350 1351 def test_failWithProtocolError(self): 1352 """ 1353 A HTTP/2 protocol error triggers the L{http.Request.notifyFinish} 1354 deferred for all outstanding requests with a Failure that contains the 1355 underlying exception. 1356 """ 1357 # We need to set up two requests concurrently so that we can validate 1358 # that these all fail. connectAndReceive will set up one: we will need 1359 # to manually send the rest. 1360 connection = H2Connection() 1361 connection.requestFactory = NotifyingRequestFactory(DelayedHTTPHandler) 1362 frameFactory, transport = self.connectAndReceive( 1363 connection, self.getRequestHeaders, [] 1364 ) 1365 1366 secondRequest = buildRequestBytes( 1367 self.getRequestHeaders, [], frameFactory=frameFactory, streamID=3 1368 ) 1369 connection.dataReceived(secondRequest) 1370 1371 # Now we want to grab the deferreds from the notifying factory. 1372 deferreds = connection.requestFactory.results 1373 self.assertEqual(len(deferreds), 2) 1374 1375 # We need these to errback with a Failure representing the 1376 # ProtocolError. 1377 def callback(ign): 1378 self.fail("Didn't errback, called back instead") 1379 1380 def errback(reason): 1381 self.assertIsInstance(reason, failure.Failure) 1382 self.assertIsInstance(reason.value, h2.exceptions.ProtocolError) 1383 return None # Trap the error 1384 1385 for d in deferreds: 1386 d.addCallbacks(callback, errback) 1387 1388 # Now trigger the protocol error. The easiest protocol error to trigger 1389 # is to send a data frame for a non-existent stream. 1390 invalidData = frameFactory.buildDataFrame(data=b"yo", streamID=0xF0).serialize() 1391 connection.dataReceived(invalidData) 1392 1393 return defer.gatherResults(deferreds) 1394 1395 def test_failOnGoaway(self): 1396 """ 1397 A HTTP/2 GoAway triggers the L{http.Request.notifyFinish} 1398 deferred for all outstanding requests with a Failure that contains a 1399 RemoteGoAway error. 1400 """ 1401 # We need to set up two requests concurrently so that we can validate 1402 # that these all fail. connectAndReceive will set up one: we will need 1403 # to manually send the rest. 1404 connection = H2Connection() 1405 connection.requestFactory = NotifyingRequestFactory(DelayedHTTPHandler) 1406 frameFactory, transport = self.connectAndReceive( 1407 connection, self.getRequestHeaders, [] 1408 ) 1409 1410 secondRequest = buildRequestBytes( 1411 self.getRequestHeaders, [], frameFactory=frameFactory, streamID=3 1412 ) 1413 connection.dataReceived(secondRequest) 1414 1415 # Now we want to grab the deferreds from the notifying factory. 1416 deferreds = connection.requestFactory.results 1417 self.assertEqual(len(deferreds), 2) 1418 1419 # We need these to errback with a Failure indicating the GOAWAY frame. 1420 def callback(ign): 1421 self.fail("Didn't errback, called back instead") 1422 1423 def errback(reason): 1424 self.assertIsInstance(reason, failure.Failure) 1425 self.assertIs(reason.type, error.ConnectionLost) 1426 return None # Trap the error 1427 1428 for d in deferreds: 1429 d.addCallbacks(callback, errback) 1430 1431 # Now send the GoAway frame. 1432 invalidData = frameFactory.buildGoAwayFrame(lastStreamID=3).serialize() 1433 connection.dataReceived(invalidData) 1434 1435 return defer.gatherResults(deferreds) 1436 1437 def test_failOnStopProducing(self): 1438 """ 1439 The transport telling the HTTP/2 connection to stop producing will 1440 fire all L{http.Request.notifyFinish} errbacks with L{error.} 1441 """ 1442 # We need to set up two requests concurrently so that we can validate 1443 # that these all fail. connectAndReceive will set up one: we will need 1444 # to manually send the rest. 1445 connection = H2Connection() 1446 connection.requestFactory = NotifyingRequestFactory(DelayedHTTPHandler) 1447 frameFactory, transport = self.connectAndReceive( 1448 connection, self.getRequestHeaders, [] 1449 ) 1450 1451 secondRequest = buildRequestBytes( 1452 self.getRequestHeaders, [], frameFactory=frameFactory, streamID=3 1453 ) 1454 connection.dataReceived(secondRequest) 1455 1456 # Now we want to grab the deferreds from the notifying factory. 1457 deferreds = connection.requestFactory.results 1458 self.assertEqual(len(deferreds), 2) 1459 1460 # We need these to errback with a Failure indicating the consumer 1461 # aborted our data production. 1462 def callback(ign): 1463 self.fail("Didn't errback, called back instead") 1464 1465 def errback(reason): 1466 self.assertIsInstance(reason, failure.Failure) 1467 self.assertIs(reason.type, error.ConnectionLost) 1468 return None # Trap the error 1469 1470 for d in deferreds: 1471 d.addCallbacks(callback, errback) 1472 1473 # Now call stopProducing. 1474 connection.stopProducing() 1475 1476 return defer.gatherResults(deferreds) 1477 1478 def test_notifyOnFast400(self): 1479 """ 1480 A HTTP/2 stream that has had _respondToBadRequestAndDisconnect called 1481 on it from a request handler calls the L{http.Request.notifyFinish} 1482 errback with L{ConnectionLost}. 1483 """ 1484 connection = H2Connection() 1485 connection.requestFactory = NotifyingRequestFactory(DelayedHTTPHandler) 1486 frameFactory, transport = self.connectAndReceive( 1487 connection, self.getRequestHeaders, [] 1488 ) 1489 1490 deferreds = connection.requestFactory.results 1491 self.assertEqual(len(deferreds), 1) 1492 1493 # We need this to errback with a Failure indicating the loss of the 1494 # connection. 1495 def callback(ign): 1496 self.fail("Didn't errback, called back instead") 1497 1498 def errback(reason): 1499 self.assertIsInstance(reason, failure.Failure) 1500 self.assertIs(reason.type, error.ConnectionLost) 1501 return None # Trap the error 1502 1503 d = deferreds[0] 1504 d.addCallbacks(callback, errback) 1505 1506 # Abort the stream. The only "natural" way to trigger this in the 1507 # current codebase is to send a multipart/form-data request that the 1508 # cgi module doesn't like. 1509 # That's absurdly hard, so instead we'll just call it ourselves. For 1510 # this reason we use the DummyProducerHandler, which doesn't write the 1511 # headers straight away. 1512 stream = connection.streams[1] 1513 stream._respondToBadRequestAndDisconnect() 1514 1515 return d 1516 1517 def test_fast400WithCircuitBreaker(self): 1518 """ 1519 A HTTP/2 stream that has had _respondToBadRequestAndDisconnect 1520 called on it does not write control frame data if its 1521 transport is paused and its control frame limit has been 1522 reached. 1523 """ 1524 # Set the connection up. 1525 memoryReactor = MemoryReactorClock() 1526 connection = H2Connection(memoryReactor) 1527 connection.callLater = memoryReactor.callLater 1528 # Use the DelayedHTTPHandler to prevent the connection from 1529 # writing any response bytes after receiving a request that 1530 # establishes the stream. 1531 connection.requestFactory = DelayedHTTPHandler 1532 1533 streamID = 1 1534 1535 frameFactory = FrameFactory() 1536 transport = StringTransport() 1537 1538 # Establish the connection 1539 clientConnectionPreface = frameFactory.clientConnectionPreface() 1540 connection.makeConnection(transport) 1541 connection.dataReceived(clientConnectionPreface) 1542 # Establish the stream. 1543 connection.dataReceived( 1544 buildRequestBytes( 1545 self.getRequestHeaders, [], frameFactory, streamID=streamID 1546 ) 1547 ) 1548 1549 # Pause the connection and limit the number of outbound bytes 1550 # to 0, so that attempting to send the 400 aborts the 1551 # connection. 1552 connection.pauseProducing() 1553 connection._maxBufferedControlFrameBytes = 0 1554 1555 connection._respondToBadRequestAndDisconnect(streamID) 1556 1557 self.assertTrue(transport.disconnected) 1558 1559 def test_bufferingAutomaticFrameData(self): 1560 """ 1561 If a the L{H2Connection} has been paused by the transport, it will 1562 not write automatic frame data triggered by writes. 1563 """ 1564 # Set the connection up. 1565 connection = H2Connection() 1566 connection.requestFactory = DummyHTTPHandlerProxy 1567 frameFactory = FrameFactory() 1568 transport = StringTransport() 1569 1570 clientConnectionPreface = frameFactory.clientConnectionPreface() 1571 connection.makeConnection(transport) 1572 connection.dataReceived(clientConnectionPreface) 1573 1574 # Now we're going to pause the producer. 1575 connection.pauseProducing() 1576 1577 # Now we're going to send a bunch of empty SETTINGS frames. This 1578 # should not cause writes. 1579 for _ in range(0, 100): 1580 connection.dataReceived(frameFactory.buildSettingsFrame({}).serialize()) 1581 1582 frames = framesFromBytes(transport.value()) 1583 self.assertEqual(len(frames), 1) 1584 1585 # Re-enable the transport. 1586 connection.resumeProducing() 1587 frames = framesFromBytes(transport.value()) 1588 self.assertEqual(len(frames), 101) 1589 1590 def test_bufferingAutomaticFrameDataWithCircuitBreaker(self): 1591 """ 1592 If the L{H2Connection} has been paused by the transport, it will 1593 not write automatic frame data triggered by writes. If this buffer 1594 gets too large, the connection will be dropped. 1595 """ 1596 # Set the connection up. 1597 connection = H2Connection() 1598 connection.requestFactory = DummyHTTPHandlerProxy 1599 frameFactory = FrameFactory() 1600 transport = StringTransport() 1601 1602 clientConnectionPreface = frameFactory.clientConnectionPreface() 1603 connection.makeConnection(transport) 1604 connection.dataReceived(clientConnectionPreface) 1605 1606 # Now we're going to pause the producer. 1607 connection.pauseProducing() 1608 1609 # Now we're going to limit the outstanding buffered bytes to 1610 # 100 bytes. 1611 connection._maxBufferedControlFrameBytes = 100 1612 1613 # Now we're going to send 11 empty SETTINGS frames. This 1614 # should not cause writes, or a close. 1615 self.assertFalse(transport.disconnecting) 1616 for _ in range(0, 11): 1617 connection.dataReceived(frameFactory.buildSettingsFrame({}).serialize()) 1618 self.assertFalse(transport.disconnecting) 1619 1620 # Send a last settings frame, which will push us over the buffer limit. 1621 connection.dataReceived(frameFactory.buildSettingsFrame({}).serialize()) 1622 self.assertTrue(transport.disconnected) 1623 1624 def test_bufferingContinuesIfProducerIsPausedOnWrite(self): 1625 """ 1626 If the L{H2Connection} has buffered control frames, is unpaused, and then 1627 paused while unbuffering, it persists the buffer and stops trying to write. 1628 """ 1629 1630 class AutoPausingStringTransport(StringTransport): 1631 def write(self, *args, **kwargs): 1632 StringTransport.write(self, *args, **kwargs) 1633 self.producer.pauseProducing() 1634 1635 # Set the connection up. 1636 connection = H2Connection() 1637 connection.requestFactory = DummyHTTPHandlerProxy 1638 frameFactory = FrameFactory() 1639 transport = AutoPausingStringTransport() 1640 transport.registerProducer(connection, True) 1641 1642 clientConnectionPreface = frameFactory.clientConnectionPreface() 1643 connection.makeConnection(transport) 1644 connection.dataReceived(clientConnectionPreface) 1645 1646 # The connection should already be paused. 1647 self.assertIsNotNone(connection._consumerBlocked) 1648 frames = framesFromBytes(transport.value()) 1649 self.assertEqual(len(frames), 1) 1650 self.assertEqual(connection._bufferedControlFrameBytes, 0) 1651 1652 # Now we're going to send 11 empty SETTINGS frames. This should produce 1653 # no output, but some buffered settings ACKs. 1654 for _ in range(0, 11): 1655 connection.dataReceived(frameFactory.buildSettingsFrame({}).serialize()) 1656 1657 frames = framesFromBytes(transport.value()) 1658 self.assertEqual(len(frames), 1) 1659 self.assertEqual(connection._bufferedControlFrameBytes, 9 * 11) 1660 1661 # Ok, now we're going to unpause the producer. This should write only one of the 1662 # SETTINGS ACKs, as the connection gets repaused. 1663 connection.resumeProducing() 1664 1665 frames = framesFromBytes(transport.value()) 1666 self.assertEqual(len(frames), 2) 1667 self.assertEqual(connection._bufferedControlFrameBytes, 9 * 10) 1668 1669 def test_circuitBreakerAbortsAfterProtocolError(self): 1670 """ 1671 A client that triggers a L{h2.exceptions.ProtocolError} over a 1672 paused connection that's reached its buffered control frame 1673 limit causes that connection to be aborted. 1674 """ 1675 memoryReactor = MemoryReactorClock() 1676 connection = H2Connection(memoryReactor) 1677 connection.callLater = memoryReactor.callLater 1678 1679 frameFactory = FrameFactory() 1680 transport = StringTransport() 1681 1682 # Establish the connection. 1683 clientConnectionPreface = frameFactory.clientConnectionPreface() 1684 connection.makeConnection(transport) 1685 connection.dataReceived(clientConnectionPreface) 1686 1687 # Pause it and limit the number of outbound bytes to 0, so 1688 # that a ProtocolError aborts the connection 1689 connection.pauseProducing() 1690 connection._maxBufferedControlFrameBytes = 0 1691 1692 # Trigger a ProtocolError with a data frame that refers to an 1693 # unknown stream. 1694 invalidData = frameFactory.buildDataFrame(data=b"yo", streamID=0xF0).serialize() 1695 1696 # The frame should have aborted the connection. 1697 connection.dataReceived(invalidData) 1698 self.assertTrue(transport.disconnected) 1699 1700 1701class H2FlowControlTests(unittest.TestCase, HTTP2TestHelpers): 1702 """ 1703 Tests that ensure that we handle HTTP/2 flow control limits appropriately. 1704 """ 1705 1706 getRequestHeaders = [ 1707 (b":method", b"GET"), 1708 (b":authority", b"localhost"), 1709 (b":path", b"/"), 1710 (b":scheme", b"https"), 1711 (b"user-agent", b"twisted-test-code"), 1712 ] 1713 1714 getResponseData = b"'''\nNone\n'''\n" 1715 1716 postRequestHeaders = [ 1717 (b":method", b"POST"), 1718 (b":authority", b"localhost"), 1719 (b":path", b"/post_endpoint"), 1720 (b":scheme", b"https"), 1721 (b"user-agent", b"twisted-test-code"), 1722 (b"content-length", b"25"), 1723 ] 1724 1725 postRequestData = [b"hello ", b"world, ", b"it's ", b"http/2!"] 1726 1727 postResponseData = b"'''\n25\nhello world, it's http/2!'''\n" 1728 1729 def test_bufferExcessData(self): 1730 """ 1731 When a L{Request} object is not using C{IProducer} to generate data and 1732 so is not having backpressure exerted on it, the L{H2Stream} object 1733 will buffer data until the flow control window is opened. 1734 """ 1735 f = FrameFactory() 1736 b = StringTransport() 1737 a = H2Connection() 1738 a.requestFactory = DummyHTTPHandlerProxy 1739 1740 # Shrink the window to 5 bytes, then send the request. 1741 requestBytes = f.clientConnectionPreface() 1742 requestBytes += f.buildSettingsFrame( 1743 {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5} 1744 ).serialize() 1745 requestBytes += buildRequestBytes(self.getRequestHeaders, [], f) 1746 a.makeConnection(b) 1747 # one byte at a time, to stress the implementation. 1748 for byte in iterbytes(requestBytes): 1749 a.dataReceived(byte) 1750 1751 # Send in WindowUpdate frames that open the window one byte at a time, 1752 # to repeatedly temporarily unbuffer data. 5 bytes will have already 1753 # been sent. 1754 bonusFrames = len(self.getResponseData) - 5 1755 for _ in range(bonusFrames): 1756 frame = f.buildWindowUpdateFrame(streamID=1, increment=1) 1757 a.dataReceived(frame.serialize()) 1758 1759 # Give the sending loop a chance to catch up! 1760 def validate(streamID): 1761 frames = framesFromBytes(b.value()) 1762 1763 # Check that the stream is correctly terminated. 1764 self.assertTrue("END_STREAM" in frames[-1].flags) 1765 1766 # Put the Data frames together to confirm we're all good. 1767 actualResponseData = b"".join( 1768 f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame) 1769 ) 1770 self.assertEqual(self.getResponseData, actualResponseData) 1771 1772 return a._streamCleanupCallbacks[1].addCallback(validate) 1773 1774 def test_producerBlockingUnblocking(self): 1775 """ 1776 L{Request} objects that have registered producers get blocked and 1777 unblocked according to HTTP/2 flow control. 1778 """ 1779 f = FrameFactory() 1780 b = StringTransport() 1781 a = H2Connection() 1782 a.requestFactory = DummyProducerHandlerProxy 1783 1784 # Shrink the window to 5 bytes, then send the request. 1785 requestBytes = f.clientConnectionPreface() 1786 requestBytes += f.buildSettingsFrame( 1787 {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5} 1788 ).serialize() 1789 requestBytes += buildRequestBytes(self.getRequestHeaders, [], f) 1790 a.makeConnection(b) 1791 # one byte at a time, to stress the implementation. 1792 for byte in iterbytes(requestBytes): 1793 a.dataReceived(byte) 1794 1795 # Grab the request object. 1796 stream = a.streams[1] 1797 request = stream._request.original 1798 1799 # Confirm that the stream believes the producer is producing. 1800 self.assertTrue(stream._producerProducing) 1801 1802 # Write 10 bytes to the connection. 1803 request.write(b"helloworld") 1804 1805 # The producer should have been paused. 1806 self.assertFalse(stream._producerProducing) 1807 self.assertEqual(request.producer.events, ["pause"]) 1808 1809 # Open the flow control window by 5 bytes. This should not unpause the 1810 # producer. 1811 a.dataReceived(f.buildWindowUpdateFrame(streamID=1, increment=5).serialize()) 1812 self.assertFalse(stream._producerProducing) 1813 self.assertEqual(request.producer.events, ["pause"]) 1814 1815 # Open the connection window by 5 bytes as well. This should also not 1816 # unpause the producer. 1817 a.dataReceived(f.buildWindowUpdateFrame(streamID=0, increment=5).serialize()) 1818 self.assertFalse(stream._producerProducing) 1819 self.assertEqual(request.producer.events, ["pause"]) 1820 1821 # Open it by five more bytes. This should unpause the producer. 1822 a.dataReceived(f.buildWindowUpdateFrame(streamID=1, increment=5).serialize()) 1823 self.assertTrue(stream._producerProducing) 1824 self.assertEqual(request.producer.events, ["pause", "resume"]) 1825 1826 # Write another 10 bytes, which should force us to pause again. When 1827 # written this chunk will be sent as one lot, simply because of the 1828 # fact that the sending loop is not currently running. 1829 request.write(b"helloworld") 1830 self.assertFalse(stream._producerProducing) 1831 self.assertEqual(request.producer.events, ["pause", "resume", "pause"]) 1832 1833 # Open the window wide and then complete the request. 1834 a.dataReceived(f.buildWindowUpdateFrame(streamID=1, increment=50).serialize()) 1835 self.assertTrue(stream._producerProducing) 1836 self.assertEqual( 1837 request.producer.events, ["pause", "resume", "pause", "resume"] 1838 ) 1839 request.unregisterProducer() 1840 request.finish() 1841 1842 # Check that the sending loop sends all the appropriate data. 1843 def validate(streamID): 1844 frames = framesFromBytes(b.value()) 1845 1846 # Check that the stream is correctly terminated. 1847 self.assertTrue("END_STREAM" in frames[-1].flags) 1848 1849 # Grab the data from the frames. 1850 dataChunks = [ 1851 f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame) 1852 ] 1853 self.assertEqual(dataChunks, [b"helloworld", b"helloworld", b""]) 1854 1855 return a._streamCleanupCallbacks[1].addCallback(validate) 1856 1857 def test_flowControlExact(self): 1858 """ 1859 Exactly filling the flow control window still blocks producers. 1860 """ 1861 f = FrameFactory() 1862 b = StringTransport() 1863 a = H2Connection() 1864 a.requestFactory = DummyProducerHandlerProxy 1865 1866 # Shrink the window to 5 bytes, then send the request. 1867 requestBytes = f.clientConnectionPreface() 1868 requestBytes += f.buildSettingsFrame( 1869 {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5} 1870 ).serialize() 1871 requestBytes += buildRequestBytes(self.getRequestHeaders, [], f) 1872 a.makeConnection(b) 1873 # one byte at a time, to stress the implementation. 1874 for byte in iterbytes(requestBytes): 1875 a.dataReceived(byte) 1876 1877 # Grab the request object. 1878 stream = a.streams[1] 1879 request = stream._request.original 1880 1881 # Confirm that the stream believes the producer is producing. 1882 self.assertTrue(stream._producerProducing) 1883 1884 # Write 10 bytes to the connection. This should block the producer 1885 # immediately. 1886 request.write(b"helloworld") 1887 self.assertFalse(stream._producerProducing) 1888 self.assertEqual(request.producer.events, ["pause"]) 1889 1890 # Despite the producer being blocked, write one more byte. This should 1891 # not get sent or force any other data to be sent. 1892 request.write(b"h") 1893 1894 # Open the window wide and then complete the request. We do this by 1895 # means of callLater to ensure that the sending loop has time to run. 1896 def window_open(): 1897 a.dataReceived( 1898 f.buildWindowUpdateFrame(streamID=1, increment=50).serialize() 1899 ) 1900 self.assertTrue(stream._producerProducing) 1901 self.assertEqual(request.producer.events, ["pause", "resume"]) 1902 request.unregisterProducer() 1903 request.finish() 1904 1905 windowDefer = task.deferLater(reactor, 0, window_open) 1906 1907 # Check that the sending loop sends all the appropriate data. 1908 def validate(streamID): 1909 frames = framesFromBytes(b.value()) 1910 1911 # Check that the stream is correctly terminated. 1912 self.assertTrue("END_STREAM" in frames[-1].flags) 1913 1914 # Grab the data from the frames. 1915 dataChunks = [ 1916 f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame) 1917 ] 1918 self.assertEqual(dataChunks, [b"hello", b"world", b"h", b""]) 1919 1920 validateDefer = a._streamCleanupCallbacks[1].addCallback(validate) 1921 return defer.DeferredList([windowDefer, validateDefer]) 1922 1923 def test_endingBlockedStream(self): 1924 """ 1925 L{Request} objects that end a stream that is currently blocked behind 1926 flow control can still end the stream and get cleaned up. 1927 """ 1928 f = FrameFactory() 1929 b = StringTransport() 1930 a = H2Connection() 1931 a.requestFactory = DummyProducerHandlerProxy 1932 1933 # Shrink the window to 5 bytes, then send the request. 1934 requestBytes = f.clientConnectionPreface() 1935 requestBytes += f.buildSettingsFrame( 1936 {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5} 1937 ).serialize() 1938 requestBytes += buildRequestBytes(self.getRequestHeaders, [], f) 1939 a.makeConnection(b) 1940 # one byte at a time, to stress the implementation. 1941 for byte in iterbytes(requestBytes): 1942 a.dataReceived(byte) 1943 1944 # Grab the request object. 1945 stream = a.streams[1] 1946 request = stream._request.original 1947 1948 # Confirm that the stream believes the producer is producing. 1949 self.assertTrue(stream._producerProducing) 1950 1951 # Write 10 bytes to the connection, then complete the connection. 1952 request.write(b"helloworld") 1953 request.unregisterProducer() 1954 request.finish() 1955 1956 # This should have completed the request. 1957 self.assertTrue(request.finished) 1958 1959 # Open the window wide and then complete the request. 1960 reactor.callLater( 1961 0, 1962 a.dataReceived, 1963 f.buildWindowUpdateFrame(streamID=1, increment=50).serialize(), 1964 ) 1965 1966 # Check that the sending loop sends all the appropriate data. 1967 def validate(streamID): 1968 frames = framesFromBytes(b.value()) 1969 1970 # Check that the stream is correctly terminated. 1971 self.assertTrue("END_STREAM" in frames[-1].flags) 1972 1973 # Grab the data from the frames. 1974 dataChunks = [ 1975 f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame) 1976 ] 1977 self.assertEqual(dataChunks, [b"hello", b"world", b""]) 1978 1979 return a._streamCleanupCallbacks[1].addCallback(validate) 1980 1981 def test_responseWithoutBody(self): 1982 """ 1983 We safely handle responses without bodies. 1984 """ 1985 f = FrameFactory() 1986 b = StringTransport() 1987 a = H2Connection() 1988 1989 # We use the DummyProducerHandler just because we can guarantee that it 1990 # doesn't end up with a body. 1991 a.requestFactory = DummyProducerHandlerProxy 1992 1993 # Send the request. 1994 requestBytes = f.clientConnectionPreface() 1995 requestBytes += buildRequestBytes(self.getRequestHeaders, [], f) 1996 a.makeConnection(b) 1997 # one byte at a time, to stress the implementation. 1998 for byte in iterbytes(requestBytes): 1999 a.dataReceived(byte) 2000 2001 # Grab the request object and the stream completion callback. 2002 stream = a.streams[1] 2003 request = stream._request.original 2004 cleanupCallback = a._streamCleanupCallbacks[1] 2005 2006 # Complete the connection immediately. 2007 request.unregisterProducer() 2008 request.finish() 2009 2010 # This should have completed the request. 2011 self.assertTrue(request.finished) 2012 2013 # Check that the sending loop sends all the appropriate data. 2014 def validate(streamID): 2015 frames = framesFromBytes(b.value()) 2016 2017 self.assertEqual(len(frames), 3) 2018 2019 # Check that the stream is correctly terminated. 2020 self.assertTrue("END_STREAM" in frames[-1].flags) 2021 2022 # Grab the data from the frames. 2023 dataChunks = [ 2024 f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame) 2025 ] 2026 self.assertEqual( 2027 dataChunks, 2028 [b""], 2029 ) 2030 2031 return cleanupCallback.addCallback(validate) 2032 2033 def test_windowUpdateForCompleteStream(self): 2034 """ 2035 WindowUpdate frames received after we've completed the stream are 2036 safely handled. 2037 """ 2038 # To test this with the data sending loop working the way it does, we 2039 # need to send *no* body on the response. That's unusual, but fine. 2040 f = FrameFactory() 2041 b = StringTransport() 2042 a = H2Connection() 2043 2044 # We use the DummyProducerHandler just because we can guarantee that it 2045 # doesn't end up with a body. 2046 a.requestFactory = DummyProducerHandlerProxy 2047 2048 # Send the request. 2049 requestBytes = f.clientConnectionPreface() 2050 requestBytes += buildRequestBytes(self.getRequestHeaders, [], f) 2051 a.makeConnection(b) 2052 # one byte at a time, to stress the implementation. 2053 for byte in iterbytes(requestBytes): 2054 a.dataReceived(byte) 2055 2056 # Grab the request object and the stream completion callback. 2057 stream = a.streams[1] 2058 request = stream._request.original 2059 cleanupCallback = a._streamCleanupCallbacks[1] 2060 2061 # Complete the connection immediately. 2062 request.unregisterProducer() 2063 request.finish() 2064 2065 # This should have completed the request. 2066 self.assertTrue(request.finished) 2067 2068 # Now open the flow control window a bit. This should cause no 2069 # problems. 2070 a.dataReceived(f.buildWindowUpdateFrame(streamID=1, increment=50).serialize()) 2071 2072 # Check that the sending loop sends all the appropriate data. 2073 def validate(streamID): 2074 frames = framesFromBytes(b.value()) 2075 2076 self.assertEqual(len(frames), 3) 2077 2078 # Check that the stream is correctly terminated. 2079 self.assertTrue("END_STREAM" in frames[-1].flags) 2080 2081 # Grab the data from the frames. 2082 dataChunks = [ 2083 f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame) 2084 ] 2085 self.assertEqual( 2086 dataChunks, 2087 [b""], 2088 ) 2089 2090 return cleanupCallback.addCallback(validate) 2091 2092 def test_producerUnblocked(self): 2093 """ 2094 L{Request} objects that have registered producers that are not blocked 2095 behind flow control do not have their producer notified. 2096 """ 2097 f = FrameFactory() 2098 b = StringTransport() 2099 a = H2Connection() 2100 a.requestFactory = DummyProducerHandlerProxy 2101 2102 # Shrink the window to 5 bytes, then send the request. 2103 requestBytes = f.clientConnectionPreface() 2104 requestBytes += f.buildSettingsFrame( 2105 {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5} 2106 ).serialize() 2107 requestBytes += buildRequestBytes(self.getRequestHeaders, [], f) 2108 a.makeConnection(b) 2109 # one byte at a time, to stress the implementation. 2110 for byte in iterbytes(requestBytes): 2111 a.dataReceived(byte) 2112 2113 # Grab the request object. 2114 stream = a.streams[1] 2115 request = stream._request.original 2116 2117 # Confirm that the stream believes the producer is producing. 2118 self.assertTrue(stream._producerProducing) 2119 2120 # Write 4 bytes to the connection, leaving space in the window. 2121 request.write(b"word") 2122 2123 # The producer should not have been paused. 2124 self.assertTrue(stream._producerProducing) 2125 self.assertEqual(request.producer.events, []) 2126 2127 # Open the flow control window by 5 bytes. This should not notify the 2128 # producer. 2129 a.dataReceived(f.buildWindowUpdateFrame(streamID=1, increment=5).serialize()) 2130 self.assertTrue(stream._producerProducing) 2131 self.assertEqual(request.producer.events, []) 2132 2133 # Open the window wide complete the request. 2134 request.unregisterProducer() 2135 request.finish() 2136 2137 # Check that the sending loop sends all the appropriate data. 2138 def validate(streamID): 2139 frames = framesFromBytes(b.value()) 2140 2141 # Check that the stream is correctly terminated. 2142 self.assertTrue("END_STREAM" in frames[-1].flags) 2143 2144 # Grab the data from the frames. 2145 dataChunks = [ 2146 f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame) 2147 ] 2148 self.assertEqual(dataChunks, [b"word", b""]) 2149 2150 return a._streamCleanupCallbacks[1].addCallback(validate) 2151 2152 def test_unnecessaryWindowUpdate(self): 2153 """ 2154 When a WindowUpdate frame is received for the whole connection but no 2155 data is currently waiting, nothing exciting happens. 2156 """ 2157 f = FrameFactory() 2158 b = StringTransport() 2159 a = H2Connection() 2160 a.requestFactory = DummyHTTPHandlerProxy 2161 2162 # Send the request. 2163 frames = buildRequestFrames(self.postRequestHeaders, self.postRequestData, f) 2164 frames.insert(1, f.buildWindowUpdateFrame(streamID=0, increment=5)) 2165 requestBytes = f.clientConnectionPreface() 2166 requestBytes += b"".join(f.serialize() for f in frames) 2167 a.makeConnection(b) 2168 # one byte at a time, to stress the implementation. 2169 for byte in iterbytes(requestBytes): 2170 a.dataReceived(byte) 2171 2172 # Give the sending loop a chance to catch up! 2173 def validate(streamID): 2174 frames = framesFromBytes(b.value()) 2175 2176 # Check that the stream is correctly terminated. 2177 self.assertTrue("END_STREAM" in frames[-1].flags) 2178 2179 # Put the Data frames together to confirm we're all good. 2180 actualResponseData = b"".join( 2181 f.data for f in frames if isinstance(f, hyperframe.frame.DataFrame) 2182 ) 2183 self.assertEqual(self.postResponseData, actualResponseData) 2184 2185 return a._streamCleanupCallbacks[1].addCallback(validate) 2186 2187 def test_unnecessaryWindowUpdateForStream(self): 2188 """ 2189 When a WindowUpdate frame is received for a stream but no data is 2190 currently waiting, that stream is not marked as unblocked and the 2191 priority tree continues to assert that no stream can progress. 2192 """ 2193 f = FrameFactory() 2194 transport = StringTransport() 2195 conn = H2Connection() 2196 conn.requestFactory = DummyHTTPHandlerProxy 2197 2198 # Send a request that implies a body is coming. Twisted doesn't send a 2199 # response until the entire request is received, so it won't queue any 2200 # data yet. Then, fire off a WINDOW_UPDATE frame. 2201 frames = [] 2202 frames.append(f.buildHeadersFrame(headers=self.postRequestHeaders, streamID=1)) 2203 frames.append(f.buildWindowUpdateFrame(streamID=1, increment=5)) 2204 data = f.clientConnectionPreface() 2205 data += b"".join(f.serialize() for f in frames) 2206 2207 conn.makeConnection(transport) 2208 conn.dataReceived(data) 2209 2210 self.assertAllStreamsBlocked(conn) 2211 2212 def test_windowUpdateAfterTerminate(self): 2213 """ 2214 When a WindowUpdate frame is received for a stream that has been 2215 aborted it is ignored. 2216 """ 2217 f = FrameFactory() 2218 b = StringTransport() 2219 a = H2Connection() 2220 a.requestFactory = DummyHTTPHandlerProxy 2221 2222 # Send the request. 2223 frames = buildRequestFrames(self.postRequestHeaders, self.postRequestData, f) 2224 requestBytes = f.clientConnectionPreface() 2225 requestBytes += b"".join(f.serialize() for f in frames) 2226 a.makeConnection(b) 2227 # one byte at a time, to stress the implementation. 2228 for byte in iterbytes(requestBytes): 2229 a.dataReceived(byte) 2230 2231 # Abort the connection. 2232 a.streams[1].abortConnection() 2233 2234 # Send a WindowUpdate 2235 windowUpdateFrame = f.buildWindowUpdateFrame(streamID=1, increment=5) 2236 a.dataReceived(windowUpdateFrame.serialize()) 2237 2238 # Give the sending loop a chance to catch up! 2239 frames = framesFromBytes(b.value()) 2240 2241 # Check that the stream is terminated. 2242 self.assertTrue(isinstance(frames[-1], hyperframe.frame.RstStreamFrame)) 2243 2244 def test_windowUpdateAfterComplete(self): 2245 """ 2246 When a WindowUpdate frame is received for a stream that has been 2247 completed it is ignored. 2248 """ 2249 f = FrameFactory() 2250 b = StringTransport() 2251 a = H2Connection() 2252 a.requestFactory = DummyHTTPHandlerProxy 2253 2254 # Send the request. 2255 frames = buildRequestFrames(self.postRequestHeaders, self.postRequestData, f) 2256 requestBytes = f.clientConnectionPreface() 2257 requestBytes += b"".join(f.serialize() for f in frames) 2258 a.makeConnection(b) 2259 # one byte at a time, to stress the implementation. 2260 for byte in iterbytes(requestBytes): 2261 a.dataReceived(byte) 2262 2263 def update_window(*args): 2264 # Send a WindowUpdate 2265 windowUpdateFrame = f.buildWindowUpdateFrame(streamID=1, increment=5) 2266 a.dataReceived(windowUpdateFrame.serialize()) 2267 2268 def validate(*args): 2269 # Give the sending loop a chance to catch up! 2270 frames = framesFromBytes(b.value()) 2271 2272 # Check that the stream is ended neatly. 2273 self.assertIn("END_STREAM", frames[-1].flags) 2274 2275 d = a._streamCleanupCallbacks[1].addCallback(update_window) 2276 return d.addCallback(validate) 2277 2278 def test_dataAndRstStream(self): 2279 """ 2280 When a DATA frame is received at the same time as RST_STREAM, 2281 Twisted does not send WINDOW_UPDATE frames for the stream. 2282 """ 2283 frameFactory = FrameFactory() 2284 transport = StringTransport() 2285 a = H2Connection() 2286 a.requestFactory = DummyHTTPHandlerProxy 2287 2288 # Send the request, but instead of the last frame send a RST_STREAM 2289 # frame instead. This needs to be very long to actually force the 2290 # WINDOW_UPDATE frames out. 2291 frameData = [b"\x00" * (2 ** 14)] * 4 2292 bodyLength = f"{sum(len(data) for data in frameData)}" 2293 headers = self.postRequestHeaders[:-1] + [("content-length", bodyLength)] 2294 frames = buildRequestFrames( 2295 headers=headers, data=frameData, frameFactory=frameFactory 2296 ) 2297 del frames[-1] 2298 frames.append( 2299 frameFactory.buildRstStreamFrame( 2300 streamID=1, errorCode=h2.errors.ErrorCodes.INTERNAL_ERROR 2301 ) 2302 ) 2303 2304 requestBytes = frameFactory.clientConnectionPreface() 2305 requestBytes += b"".join(f.serialize() for f in frames) 2306 a.makeConnection(transport) 2307 2308 # Feed all the bytes at once. This is important: if they arrive slowly, 2309 # Twisted doesn't have any problems. 2310 a.dataReceived(requestBytes) 2311 2312 # Check the frames we got. We expect a WINDOW_UPDATE frame only for the 2313 # connection, because Twisted knew the stream was going to be reset. 2314 frames = framesFromBytes(transport.value()) 2315 2316 # Check that the only WINDOW_UPDATE frame came for the connection. 2317 windowUpdateFrameIDs = [ 2318 f.stream_id 2319 for f in frames 2320 if isinstance(f, hyperframe.frame.WindowUpdateFrame) 2321 ] 2322 self.assertEqual([0], windowUpdateFrameIDs) 2323 2324 # While we're here: we shouldn't have received HEADERS or DATA for this 2325 # either. 2326 headersFrames = [ 2327 f for f in frames if isinstance(f, hyperframe.frame.HeadersFrame) 2328 ] 2329 dataFrames = [f for f in frames if isinstance(f, hyperframe.frame.DataFrame)] 2330 self.assertFalse(headersFrames) 2331 self.assertFalse(dataFrames) 2332 2333 def test_abortRequestWithCircuitBreaker(self): 2334 """ 2335 Aborting a request associated with a paused connection that's 2336 reached its buffered control frame limit causes that 2337 connection to be aborted. 2338 """ 2339 memoryReactor = MemoryReactorClock() 2340 connection = H2Connection(memoryReactor) 2341 connection.callLater = memoryReactor.callLater 2342 connection.requestFactory = DummyHTTPHandlerProxy 2343 2344 frameFactory = FrameFactory() 2345 transport = StringTransport() 2346 2347 # Establish the connection. 2348 clientConnectionPreface = frameFactory.clientConnectionPreface() 2349 connection.makeConnection(transport) 2350 connection.dataReceived(clientConnectionPreface) 2351 2352 # Send a headers frame for a stream 2353 streamID = 1 2354 headersFrameData = frameFactory.buildHeadersFrame( 2355 headers=self.postRequestHeaders, streamID=streamID 2356 ).serialize() 2357 connection.dataReceived(headersFrameData) 2358 2359 # Pause it and limit the number of outbound bytes to 1, so 2360 # that a ProtocolError aborts the connection 2361 connection.pauseProducing() 2362 connection._maxBufferedControlFrameBytes = 0 2363 2364 # Remove anything sent by the preceding frames. 2365 transport.clear() 2366 2367 # Abort the request. 2368 connection.abortRequest(streamID) 2369 2370 # No RST_STREAM frame was sent... 2371 self.assertFalse(transport.value()) 2372 # ...and the transport was disconnected (abortConnection was 2373 # called) 2374 self.assertTrue(transport.disconnected) 2375 2376 2377class HTTP2TransportChecking(unittest.TestCase, HTTP2TestHelpers): 2378 getRequestHeaders = [ 2379 (b":method", b"GET"), 2380 (b":authority", b"localhost"), 2381 (b":path", b"/"), 2382 (b":scheme", b"https"), 2383 (b"user-agent", b"twisted-test-code"), 2384 (b"custom-header", b"1"), 2385 (b"custom-header", b"2"), 2386 ] 2387 2388 def test_registerProducerWithTransport(self): 2389 """ 2390 L{H2Connection} can be registered with the transport as a producer. 2391 """ 2392 b = StringTransport() 2393 a = H2Connection() 2394 a.requestFactory = DummyHTTPHandlerProxy 2395 2396 b.registerProducer(a, True) 2397 self.assertTrue(b.producer is a) 2398 2399 def test_pausingProducerPreventsDataSend(self): 2400 """ 2401 L{H2Connection} can be paused by its consumer. When paused it stops 2402 sending data to the transport. 2403 """ 2404 f = FrameFactory() 2405 b = StringTransport() 2406 a = H2Connection() 2407 a.requestFactory = DummyHTTPHandlerProxy 2408 2409 # Send the request. 2410 frames = buildRequestFrames(self.getRequestHeaders, [], f) 2411 requestBytes = f.clientConnectionPreface() 2412 requestBytes += b"".join(f.serialize() for f in frames) 2413 a.makeConnection(b) 2414 b.registerProducer(a, True) 2415 2416 # one byte at a time, to stress the implementation. 2417 for byte in iterbytes(requestBytes): 2418 a.dataReceived(byte) 2419 2420 # The headers will be sent immediately, but the body will be waiting 2421 # until the reactor gets to spin. Before it does we'll pause 2422 # production. 2423 a.pauseProducing() 2424 2425 # Now we want to build up a whole chain of Deferreds. We want to 2426 # 1. deferLater for a moment to let the sending loop run, which should 2427 # block. 2428 # 2. After that deferred fires, we want to validate that no data has 2429 # been sent yet. 2430 # 3. Then we want to resume the production. 2431 # 4. Then, we want to wait for the stream completion deferred. 2432 # 5. Validate that the data is correct. 2433 cleanupCallback = a._streamCleanupCallbacks[1] 2434 2435 def validateNotSent(*args): 2436 frames = framesFromBytes(b.value()) 2437 2438 self.assertEqual(len(frames), 2) 2439 self.assertFalse(isinstance(frames[-1], hyperframe.frame.DataFrame)) 2440 a.resumeProducing() 2441 2442 # Resume producing is a no-op, so let's call it a bunch more times. 2443 a.resumeProducing() 2444 a.resumeProducing() 2445 a.resumeProducing() 2446 a.resumeProducing() 2447 return cleanupCallback 2448 2449 def validateComplete(*args): 2450 frames = framesFromBytes(b.value()) 2451 2452 # Check that the stream is correctly terminated. 2453 self.assertEqual(len(frames), 4) 2454 self.assertTrue("END_STREAM" in frames[-1].flags) 2455 2456 d = task.deferLater(reactor, 0.01, validateNotSent) 2457 d.addCallback(validateComplete) 2458 2459 return d 2460 2461 def test_stopProducing(self): 2462 """ 2463 L{H2Connection} can be stopped by its producer. That causes it to lose 2464 its transport. 2465 """ 2466 f = FrameFactory() 2467 b = StringTransport() 2468 a = H2Connection() 2469 a.requestFactory = DummyHTTPHandlerProxy 2470 2471 # Send the request. 2472 frames = buildRequestFrames(self.getRequestHeaders, [], f) 2473 requestBytes = f.clientConnectionPreface() 2474 requestBytes += b"".join(f.serialize() for f in frames) 2475 a.makeConnection(b) 2476 b.registerProducer(a, True) 2477 2478 # one byte at a time, to stress the implementation. 2479 for byte in iterbytes(requestBytes): 2480 a.dataReceived(byte) 2481 2482 # The headers will be sent immediately, but the body will be waiting 2483 # until the reactor gets to spin. Before it does we'll stop production. 2484 a.stopProducing() 2485 2486 frames = framesFromBytes(b.value()) 2487 2488 self.assertEqual(len(frames), 2) 2489 self.assertFalse(isinstance(frames[-1], hyperframe.frame.DataFrame)) 2490 self.assertFalse(a._stillProducing) 2491 2492 def test_passthroughHostAndPeer(self): 2493 """ 2494 A L{H2Stream} object correctly passes through host and peer information 2495 from its L{H2Connection}. 2496 """ 2497 hostAddress = IPv4Address("TCP", "17.52.24.8", 443) 2498 peerAddress = IPv4Address("TCP", "17.188.0.12", 32008) 2499 2500 frameFactory = FrameFactory() 2501 transport = StringTransport(hostAddress=hostAddress, peerAddress=peerAddress) 2502 connection = H2Connection() 2503 connection.requestFactory = DummyHTTPHandlerProxy 2504 connection.makeConnection(transport) 2505 2506 frames = buildRequestFrames(self.getRequestHeaders, [], frameFactory) 2507 requestBytes = frameFactory.clientConnectionPreface() 2508 requestBytes += b"".join(frame.serialize() for frame in frames) 2509 2510 for byte in iterbytes(requestBytes): 2511 connection.dataReceived(byte) 2512 2513 # The stream is present. Go grab the stream object. 2514 stream = connection.streams[1] 2515 self.assertEqual(stream.getHost(), hostAddress) 2516 self.assertEqual(stream.getPeer(), peerAddress) 2517 2518 # Allow the stream to finish up and check the result. 2519 cleanupCallback = connection._streamCleanupCallbacks[1] 2520 2521 def validate(*args): 2522 self.assertEqual(stream.getHost(), hostAddress) 2523 self.assertEqual(stream.getPeer(), peerAddress) 2524 2525 return cleanupCallback.addCallback(validate) 2526 2527 2528class HTTP2SchedulingTests(unittest.TestCase, HTTP2TestHelpers): 2529 """ 2530 The H2Connection object schedules certain events (mostly its data sending 2531 loop) using callbacks from the reactor. These tests validate that the calls 2532 are scheduled correctly. 2533 """ 2534 2535 def test_initiallySchedulesOneDataCall(self): 2536 """ 2537 When a H2Connection is established it schedules one call to be run as 2538 soon as the reactor has time. 2539 """ 2540 reactor = task.Clock() 2541 a = H2Connection(reactor) 2542 2543 calls = reactor.getDelayedCalls() 2544 self.assertEqual(len(calls), 1) 2545 call = calls[0] 2546 2547 # Validate that the call is scheduled for right now, but hasn't run, 2548 # and that it's correct. 2549 self.assertTrue(call.active()) 2550 self.assertEqual(call.time, 0) 2551 self.assertEqual(call.func, a._sendPrioritisedData) 2552 self.assertEqual(call.args, ()) 2553 self.assertEqual(call.kw, {}) 2554 2555 2556class HTTP2TimeoutTests(unittest.TestCase, HTTP2TestHelpers): 2557 """ 2558 The L{H2Connection} object times out idle connections. 2559 """ 2560 2561 getRequestHeaders = [ 2562 (b":method", b"GET"), 2563 (b":authority", b"localhost"), 2564 (b":path", b"/"), 2565 (b":scheme", b"https"), 2566 (b"user-agent", b"twisted-test-code"), 2567 (b"custom-header", b"1"), 2568 (b"custom-header", b"2"), 2569 ] 2570 2571 # A sentinel object used to flag default timeouts 2572 _DEFAULT = object() 2573 2574 def patch_TimeoutMixin_clock(self, connection, reactor): 2575 """ 2576 Unfortunately, TimeoutMixin does not allow passing an explicit reactor 2577 to test timeouts. For that reason, we need to monkeypatch the method 2578 set up by the TimeoutMixin. 2579 2580 @param connection: The HTTP/2 connection object to patch. 2581 @type connection: L{H2Connection} 2582 2583 @param reactor: The reactor whose callLater method we want. 2584 @type reactor: An object implementing 2585 L{twisted.internet.interfaces.IReactorTime} 2586 """ 2587 connection.callLater = reactor.callLater 2588 2589 def initiateH2Connection(self, initialData, requestFactory): 2590 """ 2591 Performs test setup by building a HTTP/2 connection object, a transport 2592 to back it, a reactor to run in, and sending in some initial data as 2593 needed. 2594 2595 @param initialData: The initial HTTP/2 data to be fed into the 2596 connection after setup. 2597 @type initialData: L{bytes} 2598 2599 @param requestFactory: The L{Request} factory to use with the 2600 connection. 2601 """ 2602 reactor = task.Clock() 2603 conn = H2Connection(reactor) 2604 conn.timeOut = 100 2605 self.patch_TimeoutMixin_clock(conn, reactor) 2606 2607 transport = StringTransport() 2608 conn.requestFactory = _makeRequestProxyFactory(requestFactory) 2609 conn.makeConnection(transport) 2610 2611 # one byte at a time, to stress the implementation. 2612 for byte in iterbytes(initialData): 2613 conn.dataReceived(byte) 2614 2615 return (reactor, conn, transport) 2616 2617 def assertTimedOut(self, data, frameCount, errorCode, lastStreamID): 2618 """ 2619 Confirm that the data that was sent matches what we expect from a 2620 timeout: namely, that it ends with a GOAWAY frame carrying an 2621 appropriate error code and last stream ID. 2622 """ 2623 frames = framesFromBytes(data) 2624 2625 self.assertEqual(len(frames), frameCount) 2626 self.assertTrue(isinstance(frames[-1], hyperframe.frame.GoAwayFrame)) 2627 self.assertEqual(frames[-1].error_code, errorCode) 2628 self.assertEqual(frames[-1].last_stream_id, lastStreamID) 2629 2630 def prepareAbortTest(self, abortTimeout=_DEFAULT): 2631 """ 2632 Does the common setup for tests that want to test the aborting 2633 functionality of the HTTP/2 stack. 2634 2635 @param abortTimeout: The value to use for the abortTimeout. Defaults to 2636 whatever is set on L{H2Connection.abortTimeout}. 2637 @type abortTimeout: L{int} or L{None} 2638 2639 @return: A tuple of the reactor being used for the connection, the 2640 connection itself, and the transport. 2641 """ 2642 if abortTimeout is self._DEFAULT: 2643 abortTimeout = H2Connection.abortTimeout 2644 2645 frameFactory = FrameFactory() 2646 initialData = frameFactory.clientConnectionPreface() 2647 2648 reactor, conn, transport = self.initiateH2Connection( 2649 initialData, 2650 requestFactory=DummyHTTPHandler, 2651 ) 2652 conn.abortTimeout = abortTimeout 2653 2654 # Advance the clock. 2655 reactor.advance(100) 2656 2657 self.assertTimedOut( 2658 transport.value(), 2659 frameCount=2, 2660 errorCode=h2.errors.ErrorCodes.NO_ERROR, 2661 lastStreamID=0, 2662 ) 2663 self.assertTrue(transport.disconnecting) 2664 self.assertFalse(transport.disconnected) 2665 2666 return reactor, conn, transport 2667 2668 def test_timeoutAfterInactivity(self): 2669 """ 2670 When a L{H2Connection} does not receive any data for more than the 2671 time out interval, it closes the connection cleanly. 2672 """ 2673 frameFactory = FrameFactory() 2674 initialData = frameFactory.clientConnectionPreface() 2675 2676 reactor, conn, transport = self.initiateH2Connection( 2677 initialData, 2678 requestFactory=DummyHTTPHandler, 2679 ) 2680 2681 # Save the response preamble. 2682 preamble = transport.value() 2683 2684 # Advance the clock. 2685 reactor.advance(99) 2686 2687 # Everything is fine, no extra data got sent. 2688 self.assertEqual(preamble, transport.value()) 2689 self.assertFalse(transport.disconnecting) 2690 2691 # Advance the clock. 2692 reactor.advance(2) 2693 2694 self.assertTimedOut( 2695 transport.value(), 2696 frameCount=2, 2697 errorCode=h2.errors.ErrorCodes.NO_ERROR, 2698 lastStreamID=0, 2699 ) 2700 self.assertTrue(transport.disconnecting) 2701 2702 def test_timeoutResetByRequestData(self): 2703 """ 2704 When a L{H2Connection} receives data, the timeout is reset. 2705 """ 2706 # Don't send any initial data, we'll send the preamble manually. 2707 frameFactory = FrameFactory() 2708 initialData = b"" 2709 2710 reactor, conn, transport = self.initiateH2Connection( 2711 initialData, 2712 requestFactory=DummyHTTPHandler, 2713 ) 2714 2715 # Send one byte of the preamble every 99 'seconds'. 2716 for byte in iterbytes(frameFactory.clientConnectionPreface()): 2717 conn.dataReceived(byte) 2718 2719 # Advance the clock. 2720 reactor.advance(99) 2721 2722 # Everything is fine. 2723 self.assertFalse(transport.disconnecting) 2724 2725 # Advance the clock. 2726 reactor.advance(2) 2727 2728 self.assertTimedOut( 2729 transport.value(), 2730 frameCount=2, 2731 errorCode=h2.errors.ErrorCodes.NO_ERROR, 2732 lastStreamID=0, 2733 ) 2734 self.assertTrue(transport.disconnecting) 2735 2736 def test_timeoutResetByResponseData(self): 2737 """ 2738 When a L{H2Connection} sends data, the timeout is reset. 2739 """ 2740 # Don't send any initial data, we'll send the preamble manually. 2741 frameFactory = FrameFactory() 2742 initialData = b"" 2743 requests = [] 2744 2745 frames = buildRequestFrames(self.getRequestHeaders, [], frameFactory) 2746 initialData = frameFactory.clientConnectionPreface() 2747 initialData += b"".join(f.serialize() for f in frames) 2748 2749 def saveRequest(stream, queued): 2750 req = DelayedHTTPHandler(stream, queued=queued) 2751 requests.append(req) 2752 return req 2753 2754 reactor, conn, transport = self.initiateH2Connection( 2755 initialData, 2756 requestFactory=saveRequest, 2757 ) 2758 2759 conn.dataReceived(frameFactory.clientConnectionPreface()) 2760 2761 # Advance the clock. 2762 reactor.advance(99) 2763 self.assertEquals(len(requests), 1) 2764 2765 for x in range(10): 2766 # It doesn't time out as it's being written... 2767 requests[0].write(b"some bytes") 2768 reactor.advance(99) 2769 self.assertFalse(transport.disconnecting) 2770 2771 # but the timer is still running, and it times out when it idles. 2772 reactor.advance(2) 2773 self.assertTimedOut( 2774 transport.value(), 2775 frameCount=13, 2776 errorCode=h2.errors.ErrorCodes.PROTOCOL_ERROR, 2777 lastStreamID=1, 2778 ) 2779 2780 def test_timeoutWithProtocolErrorIfStreamsOpen(self): 2781 """ 2782 When a L{H2Connection} times out with active streams, the error code 2783 returned is L{h2.errors.ErrorCodes.PROTOCOL_ERROR}. 2784 """ 2785 frameFactory = FrameFactory() 2786 frames = buildRequestFrames(self.getRequestHeaders, [], frameFactory) 2787 initialData = frameFactory.clientConnectionPreface() 2788 initialData += b"".join(f.serialize() for f in frames) 2789 2790 reactor, conn, transport = self.initiateH2Connection( 2791 initialData, 2792 requestFactory=DummyProducerHandler, 2793 ) 2794 2795 # Advance the clock to time out the request. 2796 reactor.advance(101) 2797 2798 self.assertTimedOut( 2799 transport.value(), 2800 frameCount=2, 2801 errorCode=h2.errors.ErrorCodes.PROTOCOL_ERROR, 2802 lastStreamID=1, 2803 ) 2804 self.assertTrue(transport.disconnecting) 2805 2806 def test_noTimeoutIfConnectionLost(self): 2807 """ 2808 When a L{H2Connection} loses its connection it cancels its timeout. 2809 """ 2810 frameFactory = FrameFactory() 2811 frames = buildRequestFrames(self.getRequestHeaders, [], frameFactory) 2812 initialData = frameFactory.clientConnectionPreface() 2813 initialData += b"".join(f.serialize() for f in frames) 2814 2815 reactor, conn, transport = self.initiateH2Connection( 2816 initialData, 2817 requestFactory=DummyProducerHandler, 2818 ) 2819 2820 sentData = transport.value() 2821 oldCallCount = len(reactor.getDelayedCalls()) 2822 2823 # Now lose the connection. 2824 conn.connectionLost("reason") 2825 2826 # There should be one fewer call than there was. 2827 currentCallCount = len(reactor.getDelayedCalls()) 2828 self.assertEqual(oldCallCount - 1, currentCallCount) 2829 2830 # Advancing the clock should do nothing. 2831 reactor.advance(101) 2832 self.assertEqual(transport.value(), sentData) 2833 2834 def test_timeoutEventuallyForcesConnectionClosed(self): 2835 """ 2836 When a L{H2Connection} has timed the connection out, and the transport 2837 doesn't get torn down within 15 seconds, it gets forcibly closed. 2838 """ 2839 reactor, conn, transport = self.prepareAbortTest() 2840 2841 # Advance the clock to see that we abort the connection. 2842 reactor.advance(14) 2843 self.assertTrue(transport.disconnecting) 2844 self.assertFalse(transport.disconnected) 2845 reactor.advance(1) 2846 self.assertTrue(transport.disconnecting) 2847 self.assertTrue(transport.disconnected) 2848 2849 def test_losingConnectionCancelsTheAbort(self): 2850 """ 2851 When a L{H2Connection} has timed the connection out, getting 2852 C{connectionLost} called on it cancels the forcible connection close. 2853 """ 2854 reactor, conn, transport = self.prepareAbortTest() 2855 2856 # Advance the clock, but right before the end fire connectionLost. 2857 reactor.advance(14) 2858 conn.connectionLost(None) 2859 2860 # Check that the transport isn't forcibly closed. 2861 reactor.advance(1) 2862 self.assertTrue(transport.disconnecting) 2863 self.assertFalse(transport.disconnected) 2864 2865 def test_losingConnectionWithNoAbortTimeOut(self): 2866 """ 2867 When a L{H2Connection} has timed the connection out but the 2868 C{abortTimeout} is set to L{None}, the connection is never aborted. 2869 """ 2870 reactor, conn, transport = self.prepareAbortTest(abortTimeout=None) 2871 2872 # Advance the clock an arbitrarily long way, and confirm it never 2873 # aborts. 2874 reactor.advance(2 ** 32) 2875 self.assertTrue(transport.disconnecting) 2876 self.assertFalse(transport.disconnected) 2877 2878 def test_connectionLostAfterForceClose(self): 2879 """ 2880 If a timed out transport doesn't close after 15 seconds, the 2881 L{HTTPChannel} will forcibly close it. 2882 """ 2883 reactor, conn, transport = self.prepareAbortTest() 2884 2885 # Force the follow-on forced closure. 2886 reactor.advance(15) 2887 self.assertTrue(transport.disconnecting) 2888 self.assertTrue(transport.disconnected) 2889 2890 # Now call connectionLost on the protocol. This is done by some 2891 # transports, including TCP and TLS. We don't have anything we can 2892 # assert on here: this just must not explode. 2893 conn.connectionLost(error.ConnectionDone) 2894 2895 def test_timeOutClientThatSendsOnlyInvalidFrames(self): 2896 """ 2897 A client that sends only invalid frames is eventually timed out. 2898 """ 2899 memoryReactor = MemoryReactorClock() 2900 2901 connection = H2Connection(memoryReactor) 2902 connection.callLater = memoryReactor.callLater 2903 connection.timeOut = 60 2904 2905 frameFactory = FrameFactory() 2906 transport = StringTransport() 2907 2908 clientConnectionPreface = frameFactory.clientConnectionPreface() 2909 connection.makeConnection(transport) 2910 connection.dataReceived(clientConnectionPreface) 2911 2912 # Send data until both the loseConnection and abortConnection 2913 # timeouts have elapsed. 2914 for _ in range(connection.timeOut + connection.abortTimeout): 2915 connection.dataReceived(frameFactory.buildRstStreamFrame(1).serialize()) 2916 memoryReactor.advance(1) 2917 2918 # Invalid frames don't reset any timeouts, so the above has 2919 # forcibly disconnected us via abortConnection. 2920 self.assertTrue(transport.disconnected) 2921