1# -*- test-case-name: twisted.web.test.test_http2 -*- 2# Copyright (c) Twisted Matrix Laboratories. 3# See LICENSE for details. 4 5""" 6HTTP2 Implementation 7 8This is the basic server-side protocol implementation used by the Twisted 9Web server for HTTP2. This functionality is intended to be combined with the 10HTTP/1.1 and HTTP/1.0 functionality in twisted.web.http to provide complete 11protocol support for HTTP-type protocols. 12 13This API is currently considered private because it's in early draft form. When 14it has stabilised, it'll be made public. 15""" 16 17 18import io 19from collections import deque 20from typing import List 21 22from zope.interface import implementer 23 24import h2.config # type: ignore[import] 25import h2.connection # type: ignore[import] 26import h2.errors # type: ignore[import] 27import h2.events # type: ignore[import] 28import h2.exceptions # type: ignore[import] 29import priority # type: ignore[import] 30 31from twisted.internet._producer_helpers import _PullToPush 32from twisted.internet.defer import Deferred 33from twisted.internet.error import ConnectionLost 34from twisted.internet.interfaces import ( 35 IConsumer, 36 IProtocol, 37 IPushProducer, 38 ISSLTransport, 39 ITransport, 40) 41from twisted.internet.protocol import Protocol 42from twisted.logger import Logger 43from twisted.protocols.policies import TimeoutMixin 44from twisted.python.failure import Failure 45from twisted.web.error import ExcessiveBufferingError 46 47# This API is currently considered private. 48__all__: List[str] = [] 49 50 51_END_STREAM_SENTINEL = object() 52 53 54@implementer(IProtocol, IPushProducer) 55class H2Connection(Protocol, TimeoutMixin): 56 """ 57 A class representing a single HTTP/2 connection. 58 59 This implementation of L{IProtocol} works hand in hand with L{H2Stream}. 60 This is because we have the requirement to register multiple producers for 61 a single HTTP/2 connection, one for each stream. The standard Twisted 62 interfaces don't really allow for this, so instead there's a custom 63 interface between the two objects that allows them to work hand-in-hand here. 64 65 @ivar conn: The HTTP/2 connection state machine. 66 @type conn: L{h2.connection.H2Connection} 67 68 @ivar streams: A mapping of stream IDs to L{H2Stream} objects, used to call 69 specific methods on streams when events occur. 70 @type streams: L{dict}, mapping L{int} stream IDs to L{H2Stream} objects. 71 72 @ivar priority: A HTTP/2 priority tree used to ensure that responses are 73 prioritised appropriately. 74 @type priority: L{priority.PriorityTree} 75 76 @ivar _consumerBlocked: A flag tracking whether or not the L{IConsumer} 77 that is consuming this data has asked us to stop producing. 78 @type _consumerBlocked: L{bool} 79 80 @ivar _sendingDeferred: A L{Deferred} used to restart the data-sending loop 81 when more response data has been produced. Will not be present if there 82 is outstanding data still to send. 83 @type _consumerBlocked: A L{twisted.internet.defer.Deferred}, or L{None} 84 85 @ivar _outboundStreamQueues: A map of stream IDs to queues, used to store 86 data blocks that are yet to be sent on the connection. These are used 87 both to handle producers that do not respect L{IConsumer} but also to 88 allow priority to multiplex data appropriately. 89 @type _outboundStreamQueues: A L{dict} mapping L{int} stream IDs to 90 L{collections.deque} queues, which contain either L{bytes} objects or 91 C{_END_STREAM_SENTINEL}. 92 93 @ivar _sender: A handle to the data-sending loop, allowing it to be 94 terminated if needed. 95 @type _sender: L{twisted.internet.task.LoopingCall} 96 97 @ivar abortTimeout: The number of seconds to wait after we attempt to shut 98 the transport down cleanly to give up and forcibly terminate it. This 99 is only used when we time a connection out, to prevent errors causing 100 the FD to get leaked. If this is L{None}, we will wait forever. 101 @type abortTimeout: L{int} 102 103 @ivar _abortingCall: The L{twisted.internet.base.DelayedCall} that will be 104 used to forcibly close the transport if it doesn't close cleanly. 105 @type _abortingCall: L{twisted.internet.base.DelayedCall} 106 """ 107 108 factory = None 109 site = None 110 abortTimeout = 15 111 112 _log = Logger() 113 _abortingCall = None 114 115 def __init__(self, reactor=None): 116 config = h2.config.H2Configuration(client_side=False, header_encoding=None) 117 self.conn = h2.connection.H2Connection(config=config) 118 self.streams = {} 119 120 self.priority = priority.PriorityTree() 121 self._consumerBlocked = None 122 self._sendingDeferred = None 123 self._outboundStreamQueues = {} 124 self._streamCleanupCallbacks = {} 125 self._stillProducing = True 126 127 # Limit the number of buffered control frame (e.g. PING and 128 # SETTINGS) bytes. 129 self._maxBufferedControlFrameBytes = 1024 * 17 130 self._bufferedControlFrames = deque() 131 self._bufferedControlFrameBytes = 0 132 133 if reactor is None: 134 from twisted.internet import reactor 135 self._reactor = reactor 136 137 # Start the data sending function. 138 self._reactor.callLater(0, self._sendPrioritisedData) 139 140 # Implementation of IProtocol 141 def connectionMade(self): 142 """ 143 Called by the reactor when a connection is received. May also be called 144 by the L{twisted.web.http._GenericHTTPChannelProtocol} during upgrade 145 to HTTP/2. 146 """ 147 self.setTimeout(self.timeOut) 148 self.conn.initiate_connection() 149 self.transport.write(self.conn.data_to_send()) 150 151 def dataReceived(self, data): 152 """ 153 Called whenever a chunk of data is received from the transport. 154 155 @param data: The data received from the transport. 156 @type data: L{bytes} 157 """ 158 try: 159 events = self.conn.receive_data(data) 160 except h2.exceptions.ProtocolError: 161 stillActive = self._tryToWriteControlData() 162 if stillActive: 163 self.transport.loseConnection() 164 self.connectionLost(Failure(), _cancelTimeouts=False) 165 return 166 167 # Only reset the timeout if we've received an actual H2 168 # protocol message 169 self.resetTimeout() 170 171 for event in events: 172 if isinstance(event, h2.events.RequestReceived): 173 self._requestReceived(event) 174 elif isinstance(event, h2.events.DataReceived): 175 self._requestDataReceived(event) 176 elif isinstance(event, h2.events.StreamEnded): 177 self._requestEnded(event) 178 elif isinstance(event, h2.events.StreamReset): 179 self._requestAborted(event) 180 elif isinstance(event, h2.events.WindowUpdated): 181 self._handleWindowUpdate(event) 182 elif isinstance(event, h2.events.PriorityUpdated): 183 self._handlePriorityUpdate(event) 184 elif isinstance(event, h2.events.ConnectionTerminated): 185 self.transport.loseConnection() 186 self.connectionLost( 187 Failure(ConnectionLost("Remote peer sent GOAWAY")), 188 _cancelTimeouts=False, 189 ) 190 191 self._tryToWriteControlData() 192 193 def timeoutConnection(self): 194 """ 195 Called when the connection has been inactive for 196 L{self.timeOut<twisted.protocols.policies.TimeoutMixin.timeOut>} 197 seconds. Cleanly tears the connection down, attempting to notify the 198 peer if needed. 199 200 We override this method to add two extra bits of functionality: 201 202 - We want to log the timeout. 203 - We want to send a GOAWAY frame indicating that the connection is 204 being terminated, and whether it was clean or not. We have to do this 205 before the connection is torn down. 206 """ 207 self._log.info("Timing out client {client}", client=self.transport.getPeer()) 208 209 # Check whether there are open streams. If there are, we're going to 210 # want to use the error code PROTOCOL_ERROR. If there aren't, use 211 # NO_ERROR. 212 if self.conn.open_outbound_streams > 0 or self.conn.open_inbound_streams > 0: 213 error_code = h2.errors.ErrorCodes.PROTOCOL_ERROR 214 else: 215 error_code = h2.errors.ErrorCodes.NO_ERROR 216 217 self.conn.close_connection(error_code=error_code) 218 self.transport.write(self.conn.data_to_send()) 219 220 # Don't let the client hold this connection open too long. 221 if self.abortTimeout is not None: 222 # We use self.callLater because that's what TimeoutMixin does, even 223 # though we have a perfectly good reactor sitting around. See 224 # https://twistedmatrix.com/trac/ticket/8488. 225 self._abortingCall = self.callLater( 226 self.abortTimeout, self.forceAbortClient 227 ) 228 229 # We're done, throw the connection away. 230 self.transport.loseConnection() 231 232 def forceAbortClient(self): 233 """ 234 Called if C{abortTimeout} seconds have passed since the timeout fired, 235 and the connection still hasn't gone away. This can really only happen 236 on extremely bad connections or when clients are maliciously attempting 237 to keep connections open. 238 """ 239 self._log.info( 240 "Forcibly timing out client: {client}", client=self.transport.getPeer() 241 ) 242 # We want to lose track of the _abortingCall so that no-one tries to 243 # cancel it. 244 self._abortingCall = None 245 self.transport.abortConnection() 246 247 def connectionLost(self, reason, _cancelTimeouts=True): 248 """ 249 Called when the transport connection is lost. 250 251 Informs all outstanding response handlers that the connection 252 has been lost, and cleans up all internal state. 253 254 @param reason: See L{IProtocol.connectionLost} 255 256 @param _cancelTimeouts: Propagate the C{reason} to this 257 connection's streams but don't cancel any timers, so that 258 peers who never read the data we've written are eventually 259 timed out. 260 """ 261 self._stillProducing = False 262 if _cancelTimeouts: 263 self.setTimeout(None) 264 265 for stream in self.streams.values(): 266 stream.connectionLost(reason) 267 268 for streamID in list(self.streams.keys()): 269 self._requestDone(streamID) 270 271 # If we were going to force-close the transport, we don't have to now. 272 if _cancelTimeouts and self._abortingCall is not None: 273 self._abortingCall.cancel() 274 self._abortingCall = None 275 276 # Implementation of IPushProducer 277 # 278 # Here's how we handle IPushProducer. We have multiple outstanding 279 # H2Streams. Each of these exposes an IConsumer interface to the response 280 # handler that allows it to push data into the H2Stream. The H2Stream then 281 # writes the data into the H2Connection object. 282 # 283 # The H2Connection needs to manage these writes to account for: 284 # 285 # - flow control 286 # - priority 287 # 288 # We manage each of these in different ways. 289 # 290 # For flow control, we simply use the equivalent of the IPushProducer 291 # interface. We simply tell the H2Stream: "Hey, you can't send any data 292 # right now, sorry!". When that stream becomes unblocked, we free it up 293 # again. This allows the H2Stream to propagate this backpressure up the 294 # chain. 295 # 296 # For priority, we need to keep a backlog of data frames that we can send, 297 # and interleave them appropriately. This backlog is most sensibly kept in 298 # the H2Connection object itself. We keep one queue per stream, which is 299 # where the writes go, and then we have a loop that manages popping these 300 # streams off in priority order. 301 # 302 # Logically then, we go as follows: 303 # 304 # 1. Stream calls writeDataToStream(). This causes a DataFrame to be placed 305 # on the queue for that stream. It also informs the priority 306 # implementation that this stream is unblocked. 307 # 2. The _sendPrioritisedData() function spins in a tight loop. Each 308 # iteration it asks the priority implementation which stream should send 309 # next, and pops a data frame off that stream's queue. If, after sending 310 # that frame, there is no data left on that stream's queue, the function 311 # informs the priority implementation that the stream is blocked. 312 # 313 # If all streams are blocked, or if there are no outstanding streams, the 314 # _sendPrioritisedData function waits to be awoken when more data is ready 315 # to send. 316 # 317 # Note that all of this only applies to *data*. Headers and other control 318 # frames deliberately skip this processing as they are not subject to flow 319 # control or priority constraints. Instead, they are stored in their own buffer 320 # which is used primarily to detect excessive buffering. 321 def stopProducing(self): 322 """ 323 Stop producing data. 324 325 This tells the L{H2Connection} that its consumer has died, so it must 326 stop producing data for good. 327 """ 328 self.connectionLost(Failure(ConnectionLost("Producing stopped"))) 329 330 def pauseProducing(self): 331 """ 332 Pause producing data. 333 334 Tells the L{H2Connection} that it has produced too much data to process 335 for the time being, and to stop until resumeProducing() is called. 336 """ 337 self._consumerBlocked = Deferred() 338 # Ensure pending control data (if any) are sent first. 339 self._consumerBlocked.addCallback(self._flushBufferedControlData) 340 341 def resumeProducing(self): 342 """ 343 Resume producing data. 344 345 This tells the L{H2Connection} to re-add itself to the main loop and 346 produce more data for the consumer. 347 """ 348 if self._consumerBlocked is not None: 349 d = self._consumerBlocked 350 self._consumerBlocked = None 351 d.callback(None) 352 353 def _sendPrioritisedData(self, *args): 354 """ 355 The data sending loop. This function repeatedly calls itself, either 356 from L{Deferred}s or from 357 L{reactor.callLater<twisted.internet.interfaces.IReactorTime.callLater>} 358 359 This function sends data on streams according to the rules of HTTP/2 360 priority. It ensures that the data from each stream is interleved 361 according to the priority signalled by the client, making sure that the 362 connection is used with maximal efficiency. 363 364 This function will execute if data is available: if all data is 365 exhausted, the function will place a deferred onto the L{H2Connection} 366 object and wait until it is called to resume executing. 367 """ 368 # If producing has stopped, we're done. Don't reschedule ourselves 369 if not self._stillProducing: 370 return 371 372 stream = None 373 374 while stream is None: 375 try: 376 stream = next(self.priority) 377 except priority.DeadlockError: 378 # All streams are currently blocked or not progressing. Wait 379 # until a new one becomes available. 380 assert self._sendingDeferred is None 381 self._sendingDeferred = Deferred() 382 self._sendingDeferred.addCallback(self._sendPrioritisedData) 383 return 384 385 # Wait behind the transport. 386 if self._consumerBlocked is not None: 387 self._consumerBlocked.addCallback(self._sendPrioritisedData) 388 return 389 390 self.resetTimeout() 391 392 remainingWindow = self.conn.local_flow_control_window(stream) 393 frameData = self._outboundStreamQueues[stream].popleft() 394 maxFrameSize = min(self.conn.max_outbound_frame_size, remainingWindow) 395 396 if frameData is _END_STREAM_SENTINEL: 397 # There's no error handling here even though this can throw 398 # ProtocolError because we really shouldn't encounter this problem. 399 # If we do, that's a nasty bug. 400 self.conn.end_stream(stream) 401 self.transport.write(self.conn.data_to_send()) 402 403 # Clean up the stream 404 self._requestDone(stream) 405 else: 406 # Respect the max frame size. 407 if len(frameData) > maxFrameSize: 408 excessData = frameData[maxFrameSize:] 409 frameData = frameData[:maxFrameSize] 410 self._outboundStreamQueues[stream].appendleft(excessData) 411 412 # There's deliberately no error handling here, because this just 413 # absolutely should not happen. 414 # If for whatever reason the max frame length is zero and so we 415 # have no frame data to send, don't send any. 416 if frameData: 417 self.conn.send_data(stream, frameData) 418 self.transport.write(self.conn.data_to_send()) 419 420 # If there's no data left, this stream is now blocked. 421 if not self._outboundStreamQueues[stream]: 422 self.priority.block(stream) 423 424 # Also, if the stream's flow control window is exhausted, tell it 425 # to stop. 426 if self.remainingOutboundWindow(stream) <= 0: 427 self.streams[stream].flowControlBlocked() 428 429 self._reactor.callLater(0, self._sendPrioritisedData) 430 431 # Internal functions. 432 def _requestReceived(self, event): 433 """ 434 Internal handler for when a request has been received. 435 436 @param event: The Hyper-h2 event that encodes information about the 437 received request. 438 @type event: L{h2.events.RequestReceived} 439 """ 440 stream = H2Stream( 441 event.stream_id, 442 self, 443 event.headers, 444 self.requestFactory, 445 self.site, 446 self.factory, 447 ) 448 self.streams[event.stream_id] = stream 449 self._streamCleanupCallbacks[event.stream_id] = Deferred() 450 self._outboundStreamQueues[event.stream_id] = deque() 451 452 # Add the stream to the priority tree but immediately block it. 453 try: 454 self.priority.insert_stream(event.stream_id) 455 except priority.DuplicateStreamError: 456 # Stream already in the tree. This can happen if we received a 457 # PRIORITY frame before a HEADERS frame. Just move on: we set the 458 # stream up properly in _handlePriorityUpdate. 459 pass 460 else: 461 self.priority.block(event.stream_id) 462 463 def _requestDataReceived(self, event): 464 """ 465 Internal handler for when a chunk of data is received for a given 466 request. 467 468 @param event: The Hyper-h2 event that encodes information about the 469 received data. 470 @type event: L{h2.events.DataReceived} 471 """ 472 stream = self.streams[event.stream_id] 473 stream.receiveDataChunk(event.data, event.flow_controlled_length) 474 475 def _requestEnded(self, event): 476 """ 477 Internal handler for when a request is complete, and we expect no 478 further data for that request. 479 480 @param event: The Hyper-h2 event that encodes information about the 481 completed stream. 482 @type event: L{h2.events.StreamEnded} 483 """ 484 stream = self.streams[event.stream_id] 485 stream.requestComplete() 486 487 def _requestAborted(self, event): 488 """ 489 Internal handler for when a request is aborted by a remote peer. 490 491 @param event: The Hyper-h2 event that encodes information about the 492 reset stream. 493 @type event: L{h2.events.StreamReset} 494 """ 495 stream = self.streams[event.stream_id] 496 stream.connectionLost( 497 Failure(ConnectionLost("Stream reset with code %s" % event.error_code)) 498 ) 499 self._requestDone(event.stream_id) 500 501 def _handlePriorityUpdate(self, event): 502 """ 503 Internal handler for when a stream priority is updated. 504 505 @param event: The Hyper-h2 event that encodes information about the 506 stream reprioritization. 507 @type event: L{h2.events.PriorityUpdated} 508 """ 509 try: 510 self.priority.reprioritize( 511 stream_id=event.stream_id, 512 depends_on=event.depends_on or None, 513 weight=event.weight, 514 exclusive=event.exclusive, 515 ) 516 except priority.MissingStreamError: 517 # A PRIORITY frame arrived before the HEADERS frame that would 518 # trigger us to insert the stream into the tree. That's fine: we 519 # can create the stream here and mark it as blocked. 520 self.priority.insert_stream( 521 stream_id=event.stream_id, 522 depends_on=event.depends_on or None, 523 weight=event.weight, 524 exclusive=event.exclusive, 525 ) 526 self.priority.block(event.stream_id) 527 528 def writeHeaders(self, version, code, reason, headers, streamID): 529 """ 530 Called by L{twisted.web.http.Request} objects to write a complete set 531 of HTTP headers to a stream. 532 533 @param version: The HTTP version in use. Unused in HTTP/2. 534 @type version: L{bytes} 535 536 @param code: The HTTP status code to write. 537 @type code: L{bytes} 538 539 @param reason: The HTTP reason phrase to write. Unused in HTTP/2. 540 @type reason: L{bytes} 541 542 @param headers: The headers to write to the stream. 543 @type headers: L{twisted.web.http_headers.Headers} 544 545 @param streamID: The ID of the stream to write the headers to. 546 @type streamID: L{int} 547 """ 548 headers.insert(0, (b":status", code)) 549 550 try: 551 self.conn.send_headers(streamID, headers) 552 except h2.exceptions.StreamClosedError: 553 # Stream was closed by the client at some point. We need to not 554 # explode here: just swallow the error. That's what write() does 555 # when a connection is lost, so that's what we do too. 556 return 557 else: 558 self._tryToWriteControlData() 559 560 def writeDataToStream(self, streamID, data): 561 """ 562 May be called by L{H2Stream} objects to write response data to a given 563 stream. Writes a single data frame. 564 565 @param streamID: The ID of the stream to write the data to. 566 @type streamID: L{int} 567 568 @param data: The data chunk to write to the stream. 569 @type data: L{bytes} 570 """ 571 self._outboundStreamQueues[streamID].append(data) 572 573 # There's obviously no point unblocking this stream and the sending 574 # loop if the data can't actually be sent, so confirm that there's 575 # some room to send data. 576 if self.conn.local_flow_control_window(streamID) > 0: 577 self.priority.unblock(streamID) 578 if self._sendingDeferred is not None: 579 d = self._sendingDeferred 580 self._sendingDeferred = None 581 d.callback(streamID) 582 583 if self.remainingOutboundWindow(streamID) <= 0: 584 self.streams[streamID].flowControlBlocked() 585 586 def endRequest(self, streamID): 587 """ 588 Called by L{H2Stream} objects to signal completion of a response. 589 590 @param streamID: The ID of the stream to write the data to. 591 @type streamID: L{int} 592 """ 593 self._outboundStreamQueues[streamID].append(_END_STREAM_SENTINEL) 594 self.priority.unblock(streamID) 595 if self._sendingDeferred is not None: 596 d = self._sendingDeferred 597 self._sendingDeferred = None 598 d.callback(streamID) 599 600 def abortRequest(self, streamID): 601 """ 602 Called by L{H2Stream} objects to request early termination of a stream. 603 This emits a RstStream frame and then removes all stream state. 604 605 @param streamID: The ID of the stream to write the data to. 606 @type streamID: L{int} 607 """ 608 self.conn.reset_stream(streamID) 609 stillActive = self._tryToWriteControlData() 610 if stillActive: 611 self._requestDone(streamID) 612 613 def _requestDone(self, streamID): 614 """ 615 Called internally by the data sending loop to clean up state that was 616 being used for the stream. Called when the stream is complete. 617 618 @param streamID: The ID of the stream to clean up state for. 619 @type streamID: L{int} 620 """ 621 del self._outboundStreamQueues[streamID] 622 self.priority.remove_stream(streamID) 623 del self.streams[streamID] 624 cleanupCallback = self._streamCleanupCallbacks.pop(streamID) 625 cleanupCallback.callback(streamID) 626 627 def remainingOutboundWindow(self, streamID): 628 """ 629 Called to determine how much room is left in the send window for a 630 given stream. Allows us to handle blocking and unblocking producers. 631 632 @param streamID: The ID of the stream whose flow control window we'll 633 check. 634 @type streamID: L{int} 635 636 @return: The amount of room remaining in the send window for the given 637 stream, including the data queued to be sent. 638 @rtype: L{int} 639 """ 640 # TODO: This involves a fair bit of looping and computation for 641 # something that is called a lot. Consider caching values somewhere. 642 windowSize = self.conn.local_flow_control_window(streamID) 643 sendQueue = self._outboundStreamQueues[streamID] 644 alreadyConsumed = sum( 645 len(chunk) for chunk in sendQueue if chunk is not _END_STREAM_SENTINEL 646 ) 647 648 return windowSize - alreadyConsumed 649 650 def _handleWindowUpdate(self, event): 651 """ 652 Manage flow control windows. 653 654 Streams that are blocked on flow control will register themselves with 655 the connection. This will fire deferreds that wake those streams up and 656 allow them to continue processing. 657 658 @param event: The Hyper-h2 event that encodes information about the 659 flow control window change. 660 @type event: L{h2.events.WindowUpdated} 661 """ 662 streamID = event.stream_id 663 664 if streamID: 665 if not self._streamIsActive(streamID): 666 # We may have already cleaned up our stream state, making this 667 # a late WINDOW_UPDATE frame. That's fine: the update is 668 # unnecessary but benign. We'll ignore it. 669 return 670 671 # If we haven't got any data to send, don't unblock the stream. If 672 # we do, we'll eventually get an exception inside the 673 # _sendPrioritisedData loop some time later. 674 if self._outboundStreamQueues.get(streamID): 675 self.priority.unblock(streamID) 676 self.streams[streamID].windowUpdated() 677 else: 678 # Update strictly applies to all streams. 679 for stream in self.streams.values(): 680 stream.windowUpdated() 681 682 # If we still have data to send for this stream, unblock it. 683 if self._outboundStreamQueues.get(stream.streamID): 684 self.priority.unblock(stream.streamID) 685 686 def getPeer(self): 687 """ 688 Get the remote address of this connection. 689 690 Treat this method with caution. It is the unfortunate result of the 691 CGI and Jabber standards, but should not be considered reliable for 692 the usual host of reasons; port forwarding, proxying, firewalls, IP 693 masquerading, etc. 694 695 @return: An L{IAddress} provider. 696 """ 697 return self.transport.getPeer() 698 699 def getHost(self): 700 """ 701 Similar to getPeer, but returns an address describing this side of the 702 connection. 703 704 @return: An L{IAddress} provider. 705 """ 706 return self.transport.getHost() 707 708 def openStreamWindow(self, streamID, increment): 709 """ 710 Open the stream window by a given increment. 711 712 @param streamID: The ID of the stream whose window needs to be opened. 713 @type streamID: L{int} 714 715 @param increment: The amount by which the stream window must be 716 incremented. 717 @type increment: L{int} 718 """ 719 self.conn.acknowledge_received_data(increment, streamID) 720 self._tryToWriteControlData() 721 722 def _isSecure(self): 723 """ 724 Returns L{True} if this channel is using a secure transport. 725 726 @returns: L{True} if this channel is secure. 727 @rtype: L{bool} 728 """ 729 # A channel is secure if its transport is ISSLTransport. 730 return ISSLTransport(self.transport, None) is not None 731 732 def _send100Continue(self, streamID): 733 """ 734 Sends a 100 Continue response, used to signal to clients that further 735 processing will be performed. 736 737 @param streamID: The ID of the stream that needs the 100 Continue 738 response 739 @type streamID: L{int} 740 """ 741 headers = [(b":status", b"100")] 742 self.conn.send_headers(headers=headers, stream_id=streamID) 743 self._tryToWriteControlData() 744 745 def _respondToBadRequestAndDisconnect(self, streamID): 746 """ 747 This is a quick and dirty way of responding to bad requests. 748 749 As described by HTTP standard we should be patient and accept the 750 whole request from the client before sending a polite bad request 751 response, even in the case when clients send tons of data. 752 753 Unlike in the HTTP/1.1 case, this does not actually disconnect the 754 underlying transport: there's no need. This instead just sends a 400 755 response and terminates the stream. 756 757 @param streamID: The ID of the stream that needs the 100 Continue 758 response 759 @type streamID: L{int} 760 """ 761 headers = [(b":status", b"400")] 762 self.conn.send_headers(headers=headers, stream_id=streamID, end_stream=True) 763 stillActive = self._tryToWriteControlData() 764 if stillActive: 765 stream = self.streams[streamID] 766 stream.connectionLost(Failure(ConnectionLost("Invalid request"))) 767 self._requestDone(streamID) 768 769 def _streamIsActive(self, streamID): 770 """ 771 Checks whether Twisted has still got state for a given stream and so 772 can process events for that stream. 773 774 @param streamID: The ID of the stream that needs processing. 775 @type streamID: L{int} 776 777 @return: Whether the stream still has state allocated. 778 @rtype: L{bool} 779 """ 780 return streamID in self.streams 781 782 def _tryToWriteControlData(self): 783 """ 784 Checks whether the connection is blocked on flow control and, 785 if it isn't, writes any buffered control data. 786 787 @return: L{True} if the connection is still active and 788 L{False} if it was aborted because too many bytes have 789 been written but not consumed by the other end. 790 """ 791 bufferedBytes = self.conn.data_to_send() 792 if not bufferedBytes: 793 return True 794 795 if self._consumerBlocked is None and not self._bufferedControlFrames: 796 # The consumer isn't blocked, and we don't have any buffered frames: 797 # write this directly. 798 self.transport.write(bufferedBytes) 799 return True 800 else: 801 # Either the consumer is blocked or we have buffered frames. If the 802 # consumer is blocked, we'll write this when we unblock. If we have 803 # buffered frames, we have presumably been re-entered from 804 # transport.write, and so to avoid reordering issues we'll buffer anyway. 805 self._bufferedControlFrames.append(bufferedBytes) 806 self._bufferedControlFrameBytes += len(bufferedBytes) 807 808 if self._bufferedControlFrameBytes >= self._maxBufferedControlFrameBytes: 809 maxBuffCtrlFrameBytes = self._maxBufferedControlFrameBytes 810 self._log.error( 811 "Maximum number of control frame bytes buffered: " 812 "{bufferedControlFrameBytes} > = " 813 "{maxBufferedControlFrameBytes}. " 814 "Aborting connection to client: {client} ", 815 bufferedControlFrameBytes=self._bufferedControlFrameBytes, 816 maxBufferedControlFrameBytes=maxBuffCtrlFrameBytes, 817 client=self.transport.getPeer(), 818 ) 819 # We've exceeded a reasonable buffer size for max buffered 820 # control frames. This is a denial of service risk, so we're 821 # going to drop this connection. 822 self.transport.abortConnection() 823 self.connectionLost(Failure(ExcessiveBufferingError())) 824 return False 825 return True 826 827 def _flushBufferedControlData(self, *args): 828 """ 829 Called when the connection is marked writable again after being marked unwritable. 830 Attempts to flush buffered control data if there is any. 831 """ 832 # To respect backpressure here we send each write in order, paying attention to whether 833 # we got blocked 834 while self._consumerBlocked is None and self._bufferedControlFrames: 835 nextWrite = self._bufferedControlFrames.popleft() 836 self._bufferedControlFrameBytes -= len(nextWrite) 837 self.transport.write(nextWrite) 838 839 840@implementer(ITransport, IConsumer, IPushProducer) 841class H2Stream: 842 """ 843 A class representing a single HTTP/2 stream. 844 845 This class works hand-in-hand with L{H2Connection}. It acts to provide an 846 implementation of L{ITransport}, L{IConsumer}, and L{IProducer} that work 847 for a single HTTP/2 connection, while tightly cleaving to the interface 848 provided by those interfaces. It does this by having a tight coupling to 849 L{H2Connection}, which allows associating many of the functions of 850 L{ITransport}, L{IConsumer}, and L{IProducer} to objects on a 851 stream-specific level. 852 853 @ivar streamID: The numerical stream ID that this object corresponds to. 854 @type streamID: L{int} 855 856 @ivar producing: Whether this stream is currently allowed to produce data 857 to its consumer. 858 @type producing: L{bool} 859 860 @ivar command: The HTTP verb used on the request. 861 @type command: L{unicode} 862 863 @ivar path: The HTTP path used on the request. 864 @type path: L{unicode} 865 866 @ivar producer: The object producing the response, if any. 867 @type producer: L{IProducer} 868 869 @ivar site: The L{twisted.web.server.Site} object this stream belongs to, 870 if any. 871 @type site: L{twisted.web.server.Site} 872 873 @ivar factory: The L{twisted.web.http.HTTPFactory} object that constructed 874 this stream's parent connection. 875 @type factory: L{twisted.web.http.HTTPFactory} 876 877 @ivar _producerProducing: Whether the producer stored in producer is 878 currently producing data. 879 @type _producerProducing: L{bool} 880 881 @ivar _inboundDataBuffer: Any data that has been received from the network 882 but has not yet been received by the consumer. 883 @type _inboundDataBuffer: A L{collections.deque} containing L{bytes} 884 885 @ivar _conn: A reference to the connection this stream belongs to. 886 @type _conn: L{H2Connection} 887 888 @ivar _request: A request object that this stream corresponds to. 889 @type _request: L{twisted.web.iweb.IRequest} 890 891 @ivar _buffer: A buffer containing data produced by the producer that could 892 not be sent on the network at this time. 893 @type _buffer: L{io.BytesIO} 894 """ 895 896 # We need a transport property for t.w.h.Request, but HTTP/2 doesn't want 897 # to expose it. So we just set it to None. 898 transport = None 899 900 def __init__(self, streamID, connection, headers, requestFactory, site, factory): 901 """ 902 Initialize this HTTP/2 stream. 903 904 @param streamID: The numerical stream ID that this object corresponds 905 to. 906 @type streamID: L{int} 907 908 @param connection: The HTTP/2 connection this stream belongs to. 909 @type connection: L{H2Connection} 910 911 @param headers: The HTTP/2 request headers. 912 @type headers: A L{list} of L{tuple}s of header name and header value, 913 both as L{bytes}. 914 915 @param requestFactory: A function that builds appropriate request 916 request objects. 917 @type requestFactory: A callable that returns a 918 L{twisted.web.iweb.IRequest}. 919 920 @param site: The L{twisted.web.server.Site} object this stream belongs 921 to, if any. 922 @type site: L{twisted.web.server.Site} 923 924 @param factory: The L{twisted.web.http.HTTPFactory} object that 925 constructed this stream's parent connection. 926 @type factory: L{twisted.web.http.HTTPFactory} 927 """ 928 929 self.streamID = streamID 930 self.site = site 931 self.factory = factory 932 self.producing = True 933 self.command = None 934 self.path = None 935 self.producer = None 936 self._producerProducing = False 937 self._hasStreamingProducer = None 938 self._inboundDataBuffer = deque() 939 self._conn = connection 940 self._request = requestFactory(self, queued=False) 941 self._buffer = io.BytesIO() 942 943 self._convertHeaders(headers) 944 945 def _convertHeaders(self, headers): 946 """ 947 This method converts the HTTP/2 header set into something that looks 948 like HTTP/1.1. In particular, it strips the 'special' headers and adds 949 a Host: header. 950 951 @param headers: The HTTP/2 header set. 952 @type headers: A L{list} of L{tuple}s of header name and header value, 953 both as L{bytes}. 954 """ 955 gotLength = False 956 957 for header in headers: 958 if not header[0].startswith(b":"): 959 gotLength = _addHeaderToRequest(self._request, header) or gotLength 960 elif header[0] == b":method": 961 self.command = header[1] 962 elif header[0] == b":path": 963 self.path = header[1] 964 elif header[0] == b":authority": 965 # This is essentially the Host: header from HTTP/1.1 966 _addHeaderToRequest(self._request, (b"host", header[1])) 967 968 if not gotLength: 969 if self.command in (b"GET", b"HEAD"): 970 self._request.gotLength(0) 971 else: 972 self._request.gotLength(None) 973 974 self._request.parseCookies() 975 expectContinue = self._request.requestHeaders.getRawHeaders(b"expect") 976 if expectContinue and expectContinue[0].lower() == b"100-continue": 977 self._send100Continue() 978 979 # Methods called by the H2Connection 980 def receiveDataChunk(self, data, flowControlledLength): 981 """ 982 Called when the connection has received a chunk of data from the 983 underlying transport. If the stream has been registered with a 984 consumer, and is currently able to push data, immediately passes it 985 through. Otherwise, buffers the chunk until we can start producing. 986 987 @param data: The chunk of data that was received. 988 @type data: L{bytes} 989 990 @param flowControlledLength: The total flow controlled length of this 991 chunk, which is used when we want to re-open the window. May be 992 different to C{len(data)}. 993 @type flowControlledLength: L{int} 994 """ 995 if not self.producing: 996 # Buffer data. 997 self._inboundDataBuffer.append((data, flowControlledLength)) 998 else: 999 self._request.handleContentChunk(data) 1000 self._conn.openStreamWindow(self.streamID, flowControlledLength) 1001 1002 def requestComplete(self): 1003 """ 1004 Called by the L{H2Connection} when the all data for a request has been 1005 received. Currently, with the legacy L{twisted.web.http.Request} 1006 object, just calls requestReceived unless the producer wants us to be 1007 quiet. 1008 """ 1009 if self.producing: 1010 self._request.requestReceived(self.command, self.path, b"HTTP/2") 1011 else: 1012 self._inboundDataBuffer.append((_END_STREAM_SENTINEL, None)) 1013 1014 def connectionLost(self, reason): 1015 """ 1016 Called by the L{H2Connection} when a connection is lost or a stream is 1017 reset. 1018 1019 @param reason: The reason the connection was lost. 1020 @type reason: L{str} 1021 """ 1022 self._request.connectionLost(reason) 1023 1024 def windowUpdated(self): 1025 """ 1026 Called by the L{H2Connection} when this stream's flow control window 1027 has been opened. 1028 """ 1029 # If we don't have a producer, we have no-one to tell. 1030 if not self.producer: 1031 return 1032 1033 # If we're not blocked on flow control, we don't care. 1034 if self._producerProducing: 1035 return 1036 1037 # We check whether the stream's flow control window is actually above 1038 # 0, and then, if a producer is registered and we still have space in 1039 # the window, we unblock it. 1040 remainingWindow = self._conn.remainingOutboundWindow(self.streamID) 1041 if not remainingWindow > 0: 1042 return 1043 1044 # We have a producer and space in the window, so that producer can 1045 # start producing again! 1046 self._producerProducing = True 1047 self.producer.resumeProducing() 1048 1049 def flowControlBlocked(self): 1050 """ 1051 Called by the L{H2Connection} when this stream's flow control window 1052 has been exhausted. 1053 """ 1054 if not self.producer: 1055 return 1056 1057 if self._producerProducing: 1058 self.producer.pauseProducing() 1059 self._producerProducing = False 1060 1061 # Methods called by the consumer (usually an IRequest). 1062 def writeHeaders(self, version, code, reason, headers): 1063 """ 1064 Called by the consumer to write headers to the stream. 1065 1066 @param version: The HTTP version. 1067 @type version: L{bytes} 1068 1069 @param code: The status code. 1070 @type code: L{int} 1071 1072 @param reason: The reason phrase. Ignored in HTTP/2. 1073 @type reason: L{bytes} 1074 1075 @param headers: The HTTP response headers. 1076 @type headers: Any iterable of two-tuples of L{bytes}, representing header 1077 names and header values. 1078 """ 1079 self._conn.writeHeaders(version, code, reason, headers, self.streamID) 1080 1081 def requestDone(self, request): 1082 """ 1083 Called by a consumer to clean up whatever permanent state is in use. 1084 1085 @param request: The request calling the method. 1086 @type request: L{twisted.web.iweb.IRequest} 1087 """ 1088 self._conn.endRequest(self.streamID) 1089 1090 def _send100Continue(self): 1091 """ 1092 Sends a 100 Continue response, used to signal to clients that further 1093 processing will be performed. 1094 """ 1095 self._conn._send100Continue(self.streamID) 1096 1097 def _respondToBadRequestAndDisconnect(self): 1098 """ 1099 This is a quick and dirty way of responding to bad requests. 1100 1101 As described by HTTP standard we should be patient and accept the 1102 whole request from the client before sending a polite bad request 1103 response, even in the case when clients send tons of data. 1104 1105 Unlike in the HTTP/1.1 case, this does not actually disconnect the 1106 underlying transport: there's no need. This instead just sends a 400 1107 response and terminates the stream. 1108 """ 1109 self._conn._respondToBadRequestAndDisconnect(self.streamID) 1110 1111 # Implementation: ITransport 1112 def write(self, data): 1113 """ 1114 Write a single chunk of data into a data frame. 1115 1116 @param data: The data chunk to send. 1117 @type data: L{bytes} 1118 """ 1119 self._conn.writeDataToStream(self.streamID, data) 1120 return 1121 1122 def writeSequence(self, iovec): 1123 """ 1124 Write a sequence of chunks of data into data frames. 1125 1126 @param iovec: A sequence of chunks to send. 1127 @type iovec: An iterable of L{bytes} chunks. 1128 """ 1129 for chunk in iovec: 1130 self.write(chunk) 1131 1132 def loseConnection(self): 1133 """ 1134 Close the connection after writing all pending data. 1135 """ 1136 self._conn.endRequest(self.streamID) 1137 1138 def abortConnection(self): 1139 """ 1140 Forcefully abort the connection by sending a RstStream frame. 1141 """ 1142 self._conn.abortRequest(self.streamID) 1143 1144 def getPeer(self): 1145 """ 1146 Get information about the peer. 1147 """ 1148 return self._conn.getPeer() 1149 1150 def getHost(self): 1151 """ 1152 Similar to getPeer, but for this side of the connection. 1153 """ 1154 return self._conn.getHost() 1155 1156 def isSecure(self): 1157 """ 1158 Returns L{True} if this channel is using a secure transport. 1159 1160 @returns: L{True} if this channel is secure. 1161 @rtype: L{bool} 1162 """ 1163 return self._conn._isSecure() 1164 1165 # Implementation: IConsumer 1166 def registerProducer(self, producer, streaming): 1167 """ 1168 Register to receive data from a producer. 1169 1170 This sets self to be a consumer for a producer. When this object runs 1171 out of data (as when a send(2) call on a socket succeeds in moving the 1172 last data from a userspace buffer into a kernelspace buffer), it will 1173 ask the producer to resumeProducing(). 1174 1175 For L{IPullProducer} providers, C{resumeProducing} will be called once 1176 each time data is required. 1177 1178 For L{IPushProducer} providers, C{pauseProducing} will be called 1179 whenever the write buffer fills up and C{resumeProducing} will only be 1180 called when it empties. 1181 1182 @param producer: The producer to register. 1183 @type producer: L{IProducer} provider 1184 1185 @param streaming: L{True} if C{producer} provides L{IPushProducer}, 1186 L{False} if C{producer} provides L{IPullProducer}. 1187 @type streaming: L{bool} 1188 1189 @raise RuntimeError: If a producer is already registered. 1190 1191 @return: L{None} 1192 """ 1193 if self.producer: 1194 raise ValueError( 1195 "registering producer %s before previous one (%s) was " 1196 "unregistered" % (producer, self.producer) 1197 ) 1198 1199 if not streaming: 1200 self.hasStreamingProducer = False 1201 producer = _PullToPush(producer, self) 1202 producer.startStreaming() 1203 else: 1204 self.hasStreamingProducer = True 1205 1206 self.producer = producer 1207 self._producerProducing = True 1208 1209 def unregisterProducer(self): 1210 """ 1211 @see: L{IConsumer.unregisterProducer} 1212 """ 1213 # When the producer is unregistered, we're done. 1214 if self.producer is not None and not self.hasStreamingProducer: 1215 self.producer.stopStreaming() 1216 1217 self._producerProducing = False 1218 self.producer = None 1219 self.hasStreamingProducer = None 1220 1221 # Implementation: IPushProducer 1222 def stopProducing(self): 1223 """ 1224 @see: L{IProducer.stopProducing} 1225 """ 1226 self.producing = False 1227 self.abortConnection() 1228 1229 def pauseProducing(self): 1230 """ 1231 @see: L{IPushProducer.pauseProducing} 1232 """ 1233 self.producing = False 1234 1235 def resumeProducing(self): 1236 """ 1237 @see: L{IPushProducer.resumeProducing} 1238 """ 1239 self.producing = True 1240 consumedLength = 0 1241 1242 while self.producing and self._inboundDataBuffer: 1243 # Allow for pauseProducing to be called in response to a call to 1244 # resumeProducing. 1245 chunk, flowControlledLength = self._inboundDataBuffer.popleft() 1246 1247 if chunk is _END_STREAM_SENTINEL: 1248 self.requestComplete() 1249 else: 1250 consumedLength += flowControlledLength 1251 self._request.handleContentChunk(chunk) 1252 1253 self._conn.openStreamWindow(self.streamID, consumedLength) 1254 1255 1256def _addHeaderToRequest(request, header): 1257 """ 1258 Add a header tuple to a request header object. 1259 1260 @param request: The request to add the header tuple to. 1261 @type request: L{twisted.web.http.Request} 1262 1263 @param header: The header tuple to add to the request. 1264 @type header: A L{tuple} with two elements, the header name and header 1265 value, both as L{bytes}. 1266 1267 @return: If the header being added was the C{Content-Length} header. 1268 @rtype: L{bool} 1269 """ 1270 requestHeaders = request.requestHeaders 1271 name, value = header 1272 values = requestHeaders.getRawHeaders(name) 1273 1274 if values is not None: 1275 values.append(value) 1276 else: 1277 requestHeaders.setRawHeaders(name, [value]) 1278 1279 if name == b"content-length": 1280 request.gotLength(int(value)) 1281 return True 1282 1283 return False 1284