1# -*- coding: utf-8 -*- 2""" 3h2/connection 4~~~~~~~~~~~~~ 5 6An implementation of a HTTP/2 connection. 7""" 8import base64 9 10from enum import Enum, IntEnum 11 12from hyperframe.exceptions import InvalidPaddingError 13from hyperframe.frame import ( 14 GoAwayFrame, WindowUpdateFrame, HeadersFrame, DataFrame, PingFrame, 15 PushPromiseFrame, SettingsFrame, RstStreamFrame, PriorityFrame, 16 ContinuationFrame, AltSvcFrame 17) 18from hpack.hpack import Encoder, Decoder 19from hpack.exceptions import HPACKError 20 21from .config import H2Configuration 22from .errors import ErrorCodes, _error_code_from_int 23from .events import ( 24 WindowUpdated, RemoteSettingsChanged, PingAcknowledged, 25 SettingsAcknowledged, ConnectionTerminated, PriorityUpdated, 26 AlternativeServiceAvailable, 27) 28from .exceptions import ( 29 ProtocolError, NoSuchStreamError, FlowControlError, FrameTooLargeError, 30 TooManyStreamsError, StreamClosedError, StreamIDTooLowError, 31 NoAvailableStreamIDError, RFC1122Error, DenialOfServiceError 32) 33from .frame_buffer import FrameBuffer 34from .settings import Settings, SettingCodes 35from .stream import H2Stream, StreamClosedBy 36from .utilities import guard_increment_window 37from .windows import WindowManager 38 39try: 40 from hpack.exceptions import OversizedHeaderListError 41except ImportError: # Platform-specific: HPACK < 2.3.0 42 # If the exception doesn't exist, it cannot possibly be thrown. Define a 43 # placeholder name, but don't otherwise worry about it. 44 class OversizedHeaderListError(Exception): 45 pass 46 47 48try: 49 from hyperframe.frame import ExtensionFrame 50except ImportError: # Platform-specific: Hyperframe < 5.0.0 51 # If the frame doesn't exist, that's just fine: we'll define it ourselves 52 # and the method will just never be called. 53 class ExtensionFrame(object): 54 pass 55 56 57class ConnectionState(Enum): 58 IDLE = 0 59 CLIENT_OPEN = 1 60 SERVER_OPEN = 2 61 CLOSED = 3 62 63 64class ConnectionInputs(Enum): 65 SEND_HEADERS = 0 66 SEND_PUSH_PROMISE = 1 67 SEND_DATA = 2 68 SEND_GOAWAY = 3 69 SEND_WINDOW_UPDATE = 4 70 SEND_PING = 5 71 SEND_SETTINGS = 6 72 SEND_RST_STREAM = 7 73 SEND_PRIORITY = 8 74 RECV_HEADERS = 9 75 RECV_PUSH_PROMISE = 10 76 RECV_DATA = 11 77 RECV_GOAWAY = 12 78 RECV_WINDOW_UPDATE = 13 79 RECV_PING = 14 80 RECV_SETTINGS = 15 81 RECV_RST_STREAM = 16 82 RECV_PRIORITY = 17 83 SEND_ALTERNATIVE_SERVICE = 18 # Added in 2.3.0 84 RECV_ALTERNATIVE_SERVICE = 19 # Added in 2.3.0 85 86 87class AllowedStreamIDs(IntEnum): 88 EVEN = 0 89 ODD = 1 90 91 92class H2ConnectionStateMachine(object): 93 """ 94 A single HTTP/2 connection state machine. 95 96 This state machine, while defined in its own class, is logically part of 97 the H2Connection class also defined in this file. The state machine itself 98 maintains very little state directly, instead focusing entirely on managing 99 state transitions. 100 """ 101 # For the purposes of this state machine we treat HEADERS and their 102 # associated CONTINUATION frames as a single jumbo frame. The protocol 103 # allows/requires this by preventing other frames from being interleved in 104 # between HEADERS/CONTINUATION frames. 105 # 106 # The _transitions dictionary contains a mapping of tuples of 107 # (state, input) to tuples of (side_effect_function, end_state). This map 108 # contains all allowed transitions: anything not in this map is invalid 109 # and immediately causes a transition to ``closed``. 110 111 _transitions = { 112 # State: idle 113 (ConnectionState.IDLE, ConnectionInputs.SEND_HEADERS): 114 (None, ConnectionState.CLIENT_OPEN), 115 (ConnectionState.IDLE, ConnectionInputs.RECV_HEADERS): 116 (None, ConnectionState.SERVER_OPEN), 117 (ConnectionState.IDLE, ConnectionInputs.SEND_SETTINGS): 118 (None, ConnectionState.IDLE), 119 (ConnectionState.IDLE, ConnectionInputs.RECV_SETTINGS): 120 (None, ConnectionState.IDLE), 121 (ConnectionState.IDLE, ConnectionInputs.SEND_WINDOW_UPDATE): 122 (None, ConnectionState.IDLE), 123 (ConnectionState.IDLE, ConnectionInputs.RECV_WINDOW_UPDATE): 124 (None, ConnectionState.IDLE), 125 (ConnectionState.IDLE, ConnectionInputs.SEND_PING): 126 (None, ConnectionState.IDLE), 127 (ConnectionState.IDLE, ConnectionInputs.RECV_PING): 128 (None, ConnectionState.IDLE), 129 (ConnectionState.IDLE, ConnectionInputs.SEND_GOAWAY): 130 (None, ConnectionState.CLOSED), 131 (ConnectionState.IDLE, ConnectionInputs.RECV_GOAWAY): 132 (None, ConnectionState.CLOSED), 133 (ConnectionState.IDLE, ConnectionInputs.SEND_PRIORITY): 134 (None, ConnectionState.IDLE), 135 (ConnectionState.IDLE, ConnectionInputs.RECV_PRIORITY): 136 (None, ConnectionState.IDLE), 137 (ConnectionState.IDLE, ConnectionInputs.SEND_ALTERNATIVE_SERVICE): 138 (None, ConnectionState.SERVER_OPEN), 139 (ConnectionState.IDLE, ConnectionInputs.RECV_ALTERNATIVE_SERVICE): 140 (None, ConnectionState.CLIENT_OPEN), 141 142 # State: open, client side. 143 (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_HEADERS): 144 (None, ConnectionState.CLIENT_OPEN), 145 (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_DATA): 146 (None, ConnectionState.CLIENT_OPEN), 147 (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_GOAWAY): 148 (None, ConnectionState.CLOSED), 149 (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_WINDOW_UPDATE): 150 (None, ConnectionState.CLIENT_OPEN), 151 (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_PING): 152 (None, ConnectionState.CLIENT_OPEN), 153 (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_SETTINGS): 154 (None, ConnectionState.CLIENT_OPEN), 155 (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_PRIORITY): 156 (None, ConnectionState.CLIENT_OPEN), 157 (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_HEADERS): 158 (None, ConnectionState.CLIENT_OPEN), 159 (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PUSH_PROMISE): 160 (None, ConnectionState.CLIENT_OPEN), 161 (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_DATA): 162 (None, ConnectionState.CLIENT_OPEN), 163 (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_GOAWAY): 164 (None, ConnectionState.CLOSED), 165 (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_WINDOW_UPDATE): 166 (None, ConnectionState.CLIENT_OPEN), 167 (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PING): 168 (None, ConnectionState.CLIENT_OPEN), 169 (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_SETTINGS): 170 (None, ConnectionState.CLIENT_OPEN), 171 (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_RST_STREAM): 172 (None, ConnectionState.CLIENT_OPEN), 173 (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_RST_STREAM): 174 (None, ConnectionState.CLIENT_OPEN), 175 (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PRIORITY): 176 (None, ConnectionState.CLIENT_OPEN), 177 (ConnectionState.CLIENT_OPEN, 178 ConnectionInputs.RECV_ALTERNATIVE_SERVICE): 179 (None, ConnectionState.CLIENT_OPEN), 180 181 # State: open, server side. 182 (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_HEADERS): 183 (None, ConnectionState.SERVER_OPEN), 184 (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PUSH_PROMISE): 185 (None, ConnectionState.SERVER_OPEN), 186 (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_DATA): 187 (None, ConnectionState.SERVER_OPEN), 188 (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_GOAWAY): 189 (None, ConnectionState.CLOSED), 190 (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_WINDOW_UPDATE): 191 (None, ConnectionState.SERVER_OPEN), 192 (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PING): 193 (None, ConnectionState.SERVER_OPEN), 194 (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_SETTINGS): 195 (None, ConnectionState.SERVER_OPEN), 196 (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PRIORITY): 197 (None, ConnectionState.SERVER_OPEN), 198 (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_HEADERS): 199 (None, ConnectionState.SERVER_OPEN), 200 (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_DATA): 201 (None, ConnectionState.SERVER_OPEN), 202 (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_GOAWAY): 203 (None, ConnectionState.CLOSED), 204 (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_WINDOW_UPDATE): 205 (None, ConnectionState.SERVER_OPEN), 206 (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_PING): 207 (None, ConnectionState.SERVER_OPEN), 208 (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_SETTINGS): 209 (None, ConnectionState.SERVER_OPEN), 210 (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_PRIORITY): 211 (None, ConnectionState.SERVER_OPEN), 212 (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_RST_STREAM): 213 (None, ConnectionState.SERVER_OPEN), 214 (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_RST_STREAM): 215 (None, ConnectionState.SERVER_OPEN), 216 (ConnectionState.SERVER_OPEN, 217 ConnectionInputs.SEND_ALTERNATIVE_SERVICE): 218 (None, ConnectionState.SERVER_OPEN), 219 (ConnectionState.SERVER_OPEN, 220 ConnectionInputs.RECV_ALTERNATIVE_SERVICE): 221 (None, ConnectionState.SERVER_OPEN), 222 223 # State: closed 224 (ConnectionState.CLOSED, ConnectionInputs.SEND_GOAWAY): 225 (None, ConnectionState.CLOSED), 226 (ConnectionState.CLOSED, ConnectionInputs.RECV_GOAWAY): 227 (None, ConnectionState.CLOSED), 228 } 229 230 def __init__(self): 231 self.state = ConnectionState.IDLE 232 233 def process_input(self, input_): 234 """ 235 Process a specific input in the state machine. 236 """ 237 if not isinstance(input_, ConnectionInputs): 238 raise ValueError("Input must be an instance of ConnectionInputs") 239 240 try: 241 func, target_state = self._transitions[(self.state, input_)] 242 except KeyError: 243 old_state = self.state 244 self.state = ConnectionState.CLOSED 245 raise ProtocolError( 246 "Invalid input %s in state %s" % (input_, old_state) 247 ) 248 else: 249 self.state = target_state 250 if func is not None: # pragma: no cover 251 return func() 252 253 return [] 254 255 256class H2Connection(object): 257 """ 258 A low-level HTTP/2 connection object. This handles building and receiving 259 frames and maintains both connection and per-stream state for all streams 260 on this connection. 261 262 This wraps a HTTP/2 Connection state machine implementation, ensuring that 263 frames can only be sent/received when the connection is in a valid state. 264 It also builds stream state machines on demand to ensure that the 265 constraints of those state machines are met as well. Attempts to create 266 frames that cannot be sent will raise a ``ProtocolError``. 267 268 .. versionchanged:: 2.3.0 269 Added the ``header_encoding`` keyword argument. 270 271 .. versionchanged:: 2.5.0 272 Added the ``config`` keyword argument. Deprecated the ``client_side`` 273 and ``header_encoding`` parameters. 274 275 :param client_side: Whether this object is to be used on the client side of 276 a connection, or on the server side. Affects the logic used by the 277 state machine, the default settings values, the allowable stream IDs, 278 and several other properties. Defaults to ``True``. 279 280 .. deprecated:: 2.5.0 281 282 :type client_side: ``bool`` 283 284 :param header_encoding: Controls whether the headers emitted by this object 285 in events are transparently decoded to ``unicode`` strings, and what 286 encoding is used to do that decoding. For historical reason, this 287 defaults to ``'utf-8'``. To prevent the decoding of headers (that is, 288 to force them to be returned as bytestrings), this can be set to 289 ``False`` or the empty string. 290 291 .. deprecated:: 2.5.0 292 293 :type header_encoding: ``str`` or ``False`` 294 295 :param config: The configuration for the HTTP/2 connection. If provided, 296 supersedes the deprecated ``client_side`` and ``header_encoding`` 297 values. 298 299 .. versionadded:: 2.5.0 300 301 :type config: :class:`H2Configuration <h2.config.H2Configuration>` 302 """ 303 # The initial maximum outbound frame size. This can be changed by receiving 304 # a settings frame. 305 DEFAULT_MAX_OUTBOUND_FRAME_SIZE = 65535 306 307 # The initial maximum inbound frame size. This is somewhat arbitrarily 308 # chosen. 309 DEFAULT_MAX_INBOUND_FRAME_SIZE = 2**24 310 311 # The highest acceptable stream ID. 312 HIGHEST_ALLOWED_STREAM_ID = 2**31 - 1 313 314 # The largest acceptable window increment. 315 MAX_WINDOW_INCREMENT = 2**31 - 1 316 317 # The initial default value of SETTINGS_MAX_HEADER_LIST_SIZE. 318 DEFAULT_MAX_HEADER_LIST_SIZE = 2**16 319 320 def __init__(self, client_side=True, header_encoding='utf-8', config=None): 321 self.state_machine = H2ConnectionStateMachine() 322 self.streams = {} 323 self.highest_inbound_stream_id = 0 324 self.highest_outbound_stream_id = 0 325 self.encoder = Encoder() 326 self.decoder = Decoder() 327 328 # This won't always actually do anything: for versions of HPACK older 329 # than 2.3.0 it does nothing. However, we have to try! 330 self.decoder.max_header_list_size = self.DEFAULT_MAX_HEADER_LIST_SIZE 331 332 #: The configuration for this HTTP/2 connection object. 333 #: 334 #: .. versionadded:: 2.5.0 335 self.config = config 336 if self.config is None: 337 self.config = H2Configuration( 338 client_side=client_side, 339 header_encoding=header_encoding, 340 ) 341 342 # Objects that store settings, including defaults. 343 # 344 # We set the MAX_CONCURRENT_STREAMS value to 100 because its default is 345 # unbounded, and that's a dangerous default because it allows 346 # essentially unbounded resources to be allocated regardless of how 347 # they will be used. 100 should be suitable for the average 348 # application. This default obviously does not apply to the remote 349 # peer's settings: the remote peer controls them! 350 # 351 # We also set MAX_HEADER_LIST_SIZE to a reasonable value. This is to 352 # advertise our defence against CVE-2016-6581. However, not all 353 # versions of HPACK will let us do it. That's ok: we should at least 354 # suggest that we're not vulnerable. 355 self.local_settings = Settings( 356 client=self.config.client_side, 357 initial_values={ 358 SettingCodes.MAX_CONCURRENT_STREAMS: 100, 359 SettingCodes.MAX_HEADER_LIST_SIZE: 360 self.DEFAULT_MAX_HEADER_LIST_SIZE, 361 } 362 ) 363 self.remote_settings = Settings(client=not self.config.client_side) 364 365 # The curent value of the connection flow control windows on the 366 # connection. 367 self.outbound_flow_control_window = ( 368 self.remote_settings.initial_window_size 369 ) 370 371 #: The maximum size of a frame that can be emitted by this peer, in 372 #: bytes. 373 self.max_outbound_frame_size = self.remote_settings.max_frame_size 374 375 #: The maximum size of a frame that can be received by this peer, in 376 #: bytes. 377 self.max_inbound_frame_size = self.local_settings.max_frame_size 378 379 # Buffer for incoming data. 380 self.incoming_buffer = FrameBuffer(server=not self.config.client_side) 381 382 # A private variable to store a sequence of received header frames 383 # until completion. 384 self._header_frames = [] 385 386 # Data that needs to be sent. 387 self._data_to_send = b'' 388 389 # Keeps track of how streams are closed. 390 # Used to ensure that we don't blow up in the face of frames that were 391 # in flight when a RST_STREAM was sent. 392 # Also used to determine whether we should consider a frame received 393 # while a stream is closed as either a stream error or a connection 394 # error. 395 self._closed_streams = {} 396 397 # The flow control window manager for the connection. 398 self._inbound_flow_control_window_manager = WindowManager( 399 max_window_size=self.local_settings.initial_window_size 400 ) 401 402 # When in doubt use dict-dispatch. 403 self._frame_dispatch_table = { 404 HeadersFrame: self._receive_headers_frame, 405 PushPromiseFrame: self._receive_push_promise_frame, 406 SettingsFrame: self._receive_settings_frame, 407 DataFrame: self._receive_data_frame, 408 WindowUpdateFrame: self._receive_window_update_frame, 409 PingFrame: self._receive_ping_frame, 410 RstStreamFrame: self._receive_rst_stream_frame, 411 PriorityFrame: self._receive_priority_frame, 412 GoAwayFrame: self._receive_goaway_frame, 413 ContinuationFrame: self._receive_naked_continuation, 414 AltSvcFrame: self._receive_alt_svc_frame, 415 ExtensionFrame: self._receive_unknown_frame 416 } 417 418 def _prepare_for_sending(self, frames): 419 if not frames: 420 return 421 self._data_to_send += b''.join(f.serialize() for f in frames) 422 assert all(f.body_len <= self.max_outbound_frame_size for f in frames) 423 424 def _open_streams(self, remainder): 425 """ 426 A common method of counting number of open streams. Returns the number 427 of streams that are open *and* that have (stream ID % 2) == remainder. 428 While it iterates, also deletes any closed streams. 429 """ 430 count = 0 431 to_delete = [] 432 433 for stream_id, stream in self.streams.items(): 434 if stream.open and (stream_id % 2 == remainder): 435 count += 1 436 elif stream.closed: 437 to_delete.append(stream_id) 438 439 for stream_id in to_delete: 440 stream = self.streams.pop(stream_id) 441 self._closed_streams[stream_id] = stream.closed_by 442 443 return count 444 445 @property 446 def open_outbound_streams(self): 447 """ 448 The current number of open outbound streams. 449 """ 450 outbound_numbers = int(self.config.client_side) 451 return self._open_streams(outbound_numbers) 452 453 @property 454 def open_inbound_streams(self): 455 """ 456 The current number of open inbound streams. 457 """ 458 inbound_numbers = int(not self.config.client_side) 459 return self._open_streams(inbound_numbers) 460 461 @property 462 def header_encoding(self): 463 """ 464 Controls whether the headers emitted by this object in events are 465 transparently decoded to ``unicode`` strings, and what encoding is used 466 to do that decoding. For historical reason, this defaults to 467 ``'utf-8'``. To prevent the decoding of headers (that is, to force them 468 to be returned as bytestrings), this can be set to ``False`` or the 469 empty string. 470 471 .. versionadded:: 2.3.0 472 473 .. deprecated:: 2.5.0 474 Use :data:`config <h2.connection.H2Connection.config>` instead. 475 """ 476 return self.config.header_encoding 477 478 @header_encoding.setter 479 def header_encoding(self, value): 480 """ 481 Setter for header encoding config value. 482 """ 483 self.config.header_encoding = value 484 485 @property 486 def client_side(self): 487 """ 488 Whether this object is to be used on the client side of a connection, 489 or on the server side. Affects the logic used by the state machine, the 490 default settings values, the allowable stream IDs, and several other 491 properties. Defaults to ``True``. 492 493 .. deprecated:: 2.5.0 494 Use :data:`config <h2.connection.H2Connection.config>` instead. 495 """ 496 return self.config.client_side 497 498 @property 499 def inbound_flow_control_window(self): 500 """ 501 The size of the inbound flow control window for the connection. This is 502 rarely publicly useful: instead, use :meth:`remote_flow_control_window 503 <h2.connection.H2Connection.remote_flow_control_window>`. This 504 shortcut is largely present to provide a shortcut to this data. 505 """ 506 return self._inbound_flow_control_window_manager.current_window_size 507 508 def _begin_new_stream(self, stream_id, allowed_ids): 509 """ 510 Initiate a new stream. 511 512 .. versionchanged:: 2.0.0 513 Removed this function from the public API. 514 515 :param stream_id: The ID of the stream to open. 516 :param allowed_ids: What kind of stream ID is allowed. 517 """ 518 self.config.logger.debug( 519 "Attempting to initiate stream ID %d", stream_id 520 ) 521 outbound = self._stream_id_is_outbound(stream_id) 522 highest_stream_id = ( 523 self.highest_outbound_stream_id if outbound else 524 self.highest_inbound_stream_id 525 ) 526 527 if stream_id <= highest_stream_id: 528 raise StreamIDTooLowError(stream_id, highest_stream_id) 529 530 if (stream_id % 2) != int(allowed_ids): 531 raise ProtocolError( 532 "Invalid stream ID for peer." 533 ) 534 535 s = H2Stream( 536 stream_id, 537 config=self.config, 538 inbound_window_size=self.local_settings.initial_window_size, 539 outbound_window_size=self.remote_settings.initial_window_size 540 ) 541 self.config.logger.debug("Stream ID %d created", stream_id) 542 s.max_inbound_frame_size = self.max_inbound_frame_size 543 s.max_outbound_frame_size = self.max_outbound_frame_size 544 545 self.streams[stream_id] = s 546 self.config.logger.debug("Current streams: %s", self.streams.keys()) 547 548 if outbound: 549 self.highest_outbound_stream_id = stream_id 550 else: 551 self.highest_inbound_stream_id = stream_id 552 553 return s 554 555 def initiate_connection(self): 556 """ 557 Provides any data that needs to be sent at the start of the connection. 558 Must be called for both clients and servers. 559 """ 560 self.config.logger.debug("Initializing connection") 561 self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS) 562 if self.config.client_side: 563 preamble = b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n' 564 else: 565 preamble = b'' 566 567 f = SettingsFrame(0) 568 for setting, value in self.local_settings.items(): 569 f.settings[setting] = value 570 self.config.logger.debug( 571 "Send Settings frame: %s", self.local_settings 572 ) 573 574 self._data_to_send += preamble + f.serialize() 575 576 def initiate_upgrade_connection(self, settings_header=None): 577 """ 578 Call to initialise the connection object for use with an upgraded 579 HTTP/2 connection (i.e. a connection negotiated using the 580 ``Upgrade: h2c`` HTTP header). 581 582 This method differs from :meth:`initiate_connection 583 <h2.connection.H2Connection.initiate_connection>` in several ways. 584 Firstly, it handles the additional SETTINGS frame that is sent in the 585 ``HTTP2-Settings`` header field. When called on a client connection, 586 this method will return a bytestring that the caller can put in the 587 ``HTTP2-Settings`` field they send on their initial request. When 588 called on a server connection, the user **must** provide the value they 589 received from the client in the ``HTTP2-Settings`` header field to the 590 ``settings_header`` argument, which will be used appropriately. 591 592 Additionally, this method sets up stream 1 in a half-closed state 593 appropriate for this side of the connection, to reflect the fact that 594 the request is already complete. 595 596 Finally, this method also prepares the appropriate preamble to be sent 597 after the upgrade. 598 599 .. versionadded:: 2.3.0 600 601 :param settings_header: (optional, server-only): The value of the 602 ``HTTP2-Settings`` header field received from the client. 603 :type settings_header: ``bytes`` 604 605 :returns: For clients, a bytestring to put in the ``HTTP2-Settings``. 606 For servers, returns nothing. 607 :rtype: ``bytes`` or ``None`` 608 """ 609 self.config.logger.debug( 610 "Upgrade connection. Current settings: %s", self.local_settings 611 ) 612 613 frame_data = None 614 # Begin by getting the preamble in place. 615 self.initiate_connection() 616 617 if self.config.client_side: 618 f = SettingsFrame(0) 619 for setting, value in self.local_settings.items(): 620 f.settings[setting] = value 621 622 frame_data = f.serialize_body() 623 frame_data = base64.urlsafe_b64encode(frame_data) 624 elif settings_header: 625 # We have a settings header from the client. This needs to be 626 # applied, but we want to throw away the ACK. We do this by 627 # inserting the data into a Settings frame and then passing it to 628 # the state machine, but ignoring the return value. 629 settings_header = base64.urlsafe_b64decode(settings_header) 630 f = SettingsFrame(0) 631 f.parse_body(settings_header) 632 self._receive_settings_frame(f) 633 634 # Set up appropriate state. Stream 1 in a half-closed state: 635 # half-closed(local) for clients, half-closed(remote) for servers. 636 # Additionally, we need to set up the Connection state machine. 637 connection_input = ( 638 ConnectionInputs.SEND_HEADERS if self.config.client_side 639 else ConnectionInputs.RECV_HEADERS 640 ) 641 self.config.logger.debug("Process input %s", connection_input) 642 self.state_machine.process_input(connection_input) 643 644 # Set up stream 1. 645 self._begin_new_stream(stream_id=1, allowed_ids=AllowedStreamIDs.ODD) 646 self.streams[1].upgrade(self.config.client_side) 647 return frame_data 648 649 def _get_or_create_stream(self, stream_id, allowed_ids): 650 """ 651 Gets a stream by its stream ID. Will create one if one does not already 652 exist. Use allowed_ids to circumvent the usual stream ID rules for 653 clients and servers. 654 655 .. versionchanged:: 2.0.0 656 Removed this function from the public API. 657 """ 658 try: 659 return self.streams[stream_id] 660 except KeyError: 661 return self._begin_new_stream(stream_id, allowed_ids) 662 663 def _get_stream_by_id(self, stream_id): 664 """ 665 Gets a stream by its stream ID. Raises NoSuchStreamError if the stream 666 ID does not correspond to a known stream and is higher than the current 667 maximum: raises if it is lower than the current maximum. 668 669 .. versionchanged:: 2.0.0 670 Removed this function from the public API. 671 """ 672 try: 673 return self.streams[stream_id] 674 except KeyError: 675 outbound = self._stream_id_is_outbound(stream_id) 676 highest_stream_id = ( 677 self.highest_outbound_stream_id if outbound else 678 self.highest_inbound_stream_id 679 ) 680 681 if stream_id > highest_stream_id: 682 raise NoSuchStreamError(stream_id) 683 else: 684 raise StreamClosedError(stream_id) 685 686 def get_next_available_stream_id(self): 687 """ 688 Returns an integer suitable for use as the stream ID for the next 689 stream created by this endpoint. For server endpoints, this stream ID 690 will be even. For client endpoints, this stream ID will be odd. If no 691 stream IDs are available, raises :class:`NoAvailableStreamIDError 692 <h2.exceptions.NoAvailableStreamIDError>`. 693 694 .. warning:: The return value from this function does not change until 695 the stream ID has actually been used by sending or pushing 696 headers on that stream. For that reason, it should be 697 called as close as possible to the actual use of the 698 stream ID. 699 700 .. versionadded:: 2.0.0 701 702 :raises: :class:`NoAvailableStreamIDError 703 <h2.exceptions.NoAvailableStreamIDError>` 704 :returns: The next free stream ID this peer can use to initiate a 705 stream. 706 :rtype: ``int`` 707 """ 708 # No streams have been opened yet, so return the lowest allowed stream 709 # ID. 710 if not self.highest_outbound_stream_id: 711 next_stream_id = 1 if self.config.client_side else 2 712 else: 713 next_stream_id = self.highest_outbound_stream_id + 2 714 self.config.logger.debug( 715 "Next available stream ID %d", next_stream_id 716 ) 717 if next_stream_id > self.HIGHEST_ALLOWED_STREAM_ID: 718 raise NoAvailableStreamIDError("Exhausted allowed stream IDs") 719 720 return next_stream_id 721 722 def send_headers(self, stream_id, headers, end_stream=False, 723 priority_weight=None, priority_depends_on=None, 724 priority_exclusive=None): 725 """ 726 Send headers on a given stream. 727 728 This function can be used to send request or response headers: the kind 729 that are sent depends on whether this connection has been opened as a 730 client or server connection, and whether the stream was opened by the 731 remote peer or not. 732 733 If this is a client connection, calling ``send_headers`` will send the 734 headers as a request. It will also implicitly open the stream being 735 used. If this is a client connection and ``send_headers`` has *already* 736 been called, this will send trailers instead. 737 738 If this is a server connection, calling ``send_headers`` will send the 739 headers as a response. It is a protocol error for a server to open a 740 stream by sending headers. If this is a server connection and 741 ``send_headers`` has *already* been called, this will send trailers 742 instead. 743 744 When acting as a server, you may call ``send_headers`` any number of 745 times allowed by the following rules, in this order: 746 747 - zero or more times with ``(':status', '1XX')`` (where ``1XX`` is a 748 placeholder for any 100-level status code). 749 - once with any other status header. 750 - zero or one time for trailers. 751 752 That is, you are allowed to send as many informational responses as you 753 like, followed by one complete response and zero or one HTTP trailer 754 blocks. 755 756 Clients may send one or two header blocks: one request block, and 757 optionally one trailer block. 758 759 If it is important to send HPACK "never indexed" header fields (as 760 defined in `RFC 7451 Section 7.1.3 761 <https://tools.ietf.org/html/rfc7541#section-7.1.3>`_), the user may 762 instead provide headers using the HPACK library's :class:`HeaderTuple 763 <hpack:hpack.HeaderTuple>` and :class:`NeverIndexedHeaderTuple 764 <hpack:hpack.NeverIndexedHeaderTuple>` objects. 765 766 This method also allows users to prioritize the stream immediately, 767 by sending priority information on the HEADERS frame directly. To do 768 this, any one of ``priority_weight``, ``priority_depends_on``, or 769 ``priority_exclusive`` must be set to a value that is not ``None``. For 770 more information on the priority fields, see :meth:`prioritize 771 <h2.connection.H2Connection.prioritize>`. 772 773 .. warning:: In HTTP/2, it is mandatory that all the HTTP/2 special 774 headers (that is, ones whose header keys begin with ``:``) appear 775 at the start of the header block, before any normal headers. 776 If you pass a dictionary to the ``headers`` parameter, it is 777 unlikely that they will iterate in that order, and your connection 778 may fail. For this reason, passing a ``dict`` to ``headers`` is 779 *deprecated*, and will be removed in 3.0. 780 781 .. versionchanged:: 2.3.0 782 Added support for using :class:`HeaderTuple 783 <hpack:hpack.HeaderTuple>` objects to store headers. 784 785 .. versionchanged:: 2.4.0 786 Added the ability to provide priority keyword arguments: 787 ``priority_weight``, ``priority_depends_on``, and 788 ``priority_exclusive``. 789 790 :param stream_id: The stream ID to send the headers on. If this stream 791 does not currently exist, it will be created. 792 :type stream_id: ``int`` 793 794 :param headers: The request/response headers to send. 795 :type headers: An iterable of two tuples of bytestrings or 796 :class:`HeaderTuple <hpack:hpack.HeaderTuple>` objects. 797 798 :param end_stream: Whether this headers frame should end the stream 799 immediately (that is, whether no more data will be sent after this 800 frame). Defaults to ``False``. 801 :type end_stream: ``bool`` 802 803 :param priority_weight: Sets the priority weight of the stream. See 804 :meth:`prioritize <h2.connection.H2Connection.prioritize>` for more 805 about how this field works. Defaults to ``None``, which means that 806 no priority information will be sent. 807 :type priority_weight: ``int`` or ``None`` 808 809 :param priority_depends_on: Sets which stream this one depends on for 810 priority purposes. See :meth:`prioritize 811 <h2.connection.H2Connection.prioritize>` for more about how this 812 field works. Defaults to ``None``, which means that no priority 813 information will be sent. 814 :type priority_depends_on: ``int`` or ``None`` 815 816 :param priority_exclusive: Sets whether this stream exclusively depends 817 on the stream given in ``priority_depends_on`` for priority 818 purposes. See :meth:`prioritize 819 <h2.connection.H2Connection.prioritize>` for more about how this 820 field workds. Defaults to ``None``, which means that no priority 821 information will be sent. 822 :type priority_depends_on: ``bool`` or ``None`` 823 824 :returns: Nothing 825 """ 826 self.config.logger.debug( 827 "Send headers on stream ID %d", stream_id 828 ) 829 830 # Check we can open the stream. 831 if stream_id not in self.streams: 832 max_open_streams = self.remote_settings.max_concurrent_streams 833 if (self.open_outbound_streams + 1) > max_open_streams: 834 raise TooManyStreamsError( 835 "Max outbound streams is %d, %d open" % 836 (max_open_streams, self.open_outbound_streams) 837 ) 838 839 self.state_machine.process_input(ConnectionInputs.SEND_HEADERS) 840 stream = self._get_or_create_stream( 841 stream_id, AllowedStreamIDs(self.config.client_side) 842 ) 843 frames = stream.send_headers( 844 headers, self.encoder, end_stream 845 ) 846 847 # We may need to send priority information. 848 priority_present = ( 849 (priority_weight is not None) or 850 (priority_depends_on is not None) or 851 (priority_exclusive is not None) 852 ) 853 854 if priority_present: 855 if not self.config.client_side: 856 raise RFC1122Error("Servers SHOULD NOT prioritize streams.") 857 858 headers_frame = frames[0] 859 headers_frame.flags.add('PRIORITY') 860 frames[0] = _add_frame_priority( 861 headers_frame, 862 priority_weight, 863 priority_depends_on, 864 priority_exclusive 865 ) 866 867 self._prepare_for_sending(frames) 868 869 def send_data(self, stream_id, data, end_stream=False, pad_length=None): 870 """ 871 Send data on a given stream. 872 873 This method does no breaking up of data: if the data is larger than the 874 value returned by :meth:`local_flow_control_window 875 <h2.connection.H2Connection.local_flow_control_window>` for this stream 876 then a :class:`FlowControlError <h2.exceptions.FlowControlError>` will 877 be raised. If the data is larger than :data:`max_outbound_frame_size 878 <h2.connection.H2Connection.max_outbound_frame_size>` then a 879 :class:`FrameTooLargeError <h2.exceptions.FrameTooLargeError>` will be 880 raised. 881 882 Hyper-h2 does this to avoid buffering the data internally. If the user 883 has more data to send than hyper-h2 will allow, consider breaking it up 884 and buffering it externally. 885 886 :param stream_id: The ID of the stream on which to send the data. 887 :type stream_id: ``int`` 888 :param data: The data to send on the stream. 889 :type data: ``bytes`` 890 :param end_stream: (optional) Whether this is the last data to be sent 891 on the stream. Defaults to ``False``. 892 :type end_stream: ``bool`` 893 :param pad_length: (optional) Length of the padding to apply to the 894 data frame. Defaults to ``None`` for no use of padding. Note that 895 a value of ``0`` results in padding of length ``0`` 896 (with the "padding" flag set on the frame). 897 898 .. versionadded:: 2.6.0 899 900 :type pad_length: ``int`` 901 :returns: Nothing 902 """ 903 self.config.logger.debug( 904 "Send data on stream ID %d with len %d", stream_id, len(data) 905 ) 906 frame_size = len(data) 907 if pad_length is not None: 908 if not isinstance(pad_length, int): 909 raise TypeError("pad_length must be an int") 910 if pad_length < 0 or pad_length > 255: 911 raise ValueError("pad_length must be within range: [0, 255]") 912 # Account for padding bytes plus the 1-byte padding length field. 913 frame_size += pad_length + 1 914 self.config.logger.debug( 915 "Frame size on stream ID %d is %d", stream_id, frame_size 916 ) 917 918 if frame_size > self.local_flow_control_window(stream_id): 919 raise FlowControlError( 920 "Cannot send %d bytes, flow control window is %d." % 921 (frame_size, self.local_flow_control_window(stream_id)) 922 ) 923 elif frame_size > self.max_outbound_frame_size: 924 raise FrameTooLargeError( 925 "Cannot send frame size %d, max frame size is %d" % 926 (frame_size, self.max_outbound_frame_size) 927 ) 928 929 self.state_machine.process_input(ConnectionInputs.SEND_DATA) 930 frames = self.streams[stream_id].send_data( 931 data, end_stream, pad_length=pad_length 932 ) 933 934 self._prepare_for_sending(frames) 935 936 self.outbound_flow_control_window -= frame_size 937 self.config.logger.debug( 938 "Outbound flow control window size is %d", 939 self.outbound_flow_control_window 940 ) 941 assert self.outbound_flow_control_window >= 0 942 943 def end_stream(self, stream_id): 944 """ 945 Cleanly end a given stream. 946 947 This method ends a stream by sending an empty DATA frame on that stream 948 with the ``END_STREAM`` flag set. 949 950 :param stream_id: The ID of the stream to end. 951 :type stream_id: ``int`` 952 :returns: Nothing 953 """ 954 self.config.logger.debug("End stream ID %d", stream_id) 955 self.state_machine.process_input(ConnectionInputs.SEND_DATA) 956 frames = self.streams[stream_id].end_stream() 957 self._prepare_for_sending(frames) 958 959 def increment_flow_control_window(self, increment, stream_id=None): 960 """ 961 Increment a flow control window, optionally for a single stream. Allows 962 the remote peer to send more data. 963 964 .. versionchanged:: 2.0.0 965 Rejects attempts to increment the flow control window by out of 966 range values with a ``ValueError``. 967 968 :param increment: The amount to increment the flow control window by. 969 :type increment: ``int`` 970 :param stream_id: (optional) The ID of the stream that should have its 971 flow control window opened. If not present or ``None``, the 972 connection flow control window will be opened instead. 973 :type stream_id: ``int`` or ``None`` 974 :returns: Nothing 975 :raises: ``ValueError`` 976 """ 977 if not (1 <= increment <= self.MAX_WINDOW_INCREMENT): 978 raise ValueError( 979 "Flow control increment must be between 1 and %d" % 980 self.MAX_WINDOW_INCREMENT 981 ) 982 983 self.state_machine.process_input(ConnectionInputs.SEND_WINDOW_UPDATE) 984 985 if stream_id is not None: 986 stream = self.streams[stream_id] 987 frames = stream.increase_flow_control_window( 988 increment 989 ) 990 else: 991 self._inbound_flow_control_window_manager.window_opened(increment) 992 f = WindowUpdateFrame(0) 993 f.window_increment = increment 994 frames = [f] 995 996 self.config.logger.debug( 997 "Increase stream ID %d flow control window by %d", 998 stream_id, increment 999 ) 1000 self._prepare_for_sending(frames) 1001 1002 def push_stream(self, stream_id, promised_stream_id, request_headers): 1003 """ 1004 Push a response to the client by sending a PUSH_PROMISE frame. 1005 1006 If it is important to send HPACK "never indexed" header fields (as 1007 defined in `RFC 7451 Section 7.1.3 1008 <https://tools.ietf.org/html/rfc7541#section-7.1.3>`_), the user may 1009 instead provide headers using the HPACK library's :class:`HeaderTuple 1010 <hpack:hpack.HeaderTuple>` and :class:`NeverIndexedHeaderTuple 1011 <hpack:hpack.NeverIndexedHeaderTuple>` objects. 1012 1013 :param stream_id: The ID of the stream that this push is a response to. 1014 :type stream_id: ``int`` 1015 :param promised_stream_id: The ID of the stream that the pushed 1016 response will be sent on. 1017 :type promised_stream_id: ``int`` 1018 :param request_headers: The headers of the request that the pushed 1019 response will be responding to. 1020 :type request_headers: An iterable of two tuples of bytestrings or 1021 :class:`HeaderTuple <hpack:hpack.HeaderTuple>` objects. 1022 :returns: Nothing 1023 """ 1024 self.config.logger.debug( 1025 "Send Push Promise frame on stream ID %d", stream_id 1026 ) 1027 1028 if not self.remote_settings.enable_push: 1029 raise ProtocolError("Remote peer has disabled stream push") 1030 1031 self.state_machine.process_input(ConnectionInputs.SEND_PUSH_PROMISE) 1032 stream = self._get_stream_by_id(stream_id) 1033 1034 # We need to prevent users pushing streams in response to streams that 1035 # they themselves have already pushed: see #163 and RFC 7540 § 6.6. The 1036 # easiest way to do that is to assert that the stream_id is not even: 1037 # this shortcut works because only servers can push and the state 1038 # machine will enforce this. 1039 if (stream_id % 2) == 0: 1040 raise ProtocolError("Cannot recursively push streams.") 1041 1042 new_stream = self._begin_new_stream( 1043 promised_stream_id, AllowedStreamIDs.EVEN 1044 ) 1045 self.streams[promised_stream_id] = new_stream 1046 1047 frames = stream.push_stream_in_band( 1048 promised_stream_id, request_headers, self.encoder 1049 ) 1050 new_frames = new_stream.locally_pushed() 1051 self._prepare_for_sending(frames + new_frames) 1052 1053 def ping(self, opaque_data): 1054 """ 1055 Send a PING frame. 1056 1057 :param opaque_data: A bytestring of length 8 that will be sent in the 1058 PING frame. 1059 :returns: Nothing 1060 """ 1061 self.config.logger.debug("Send Ping frame") 1062 1063 if not isinstance(opaque_data, bytes) or len(opaque_data) != 8: 1064 raise ValueError("Invalid value for ping data: %r" % opaque_data) 1065 1066 self.state_machine.process_input(ConnectionInputs.SEND_PING) 1067 f = PingFrame(0) 1068 f.opaque_data = opaque_data 1069 self._prepare_for_sending([f]) 1070 1071 def reset_stream(self, stream_id, error_code=0): 1072 """ 1073 Reset a stream. 1074 1075 This method forcibly closes a stream by sending a RST_STREAM frame for 1076 a given stream. This is not a graceful closure. To gracefully end a 1077 stream, try the :meth:`end_stream 1078 <h2.connection.H2Connection.end_stream>` method. 1079 1080 :param stream_id: The ID of the stream to reset. 1081 :type stream_id: ``int`` 1082 :param error_code: (optional) The error code to use to reset the 1083 stream. Defaults to :data:`ErrorCodes.NO_ERROR 1084 <h2.errors.ErrorCodes.NO_ERROR>`. 1085 :type error_code: ``int`` 1086 :returns: Nothing 1087 """ 1088 self.config.logger.debug("Reset stream ID %d", stream_id) 1089 self.state_machine.process_input(ConnectionInputs.SEND_RST_STREAM) 1090 stream = self._get_stream_by_id(stream_id) 1091 frames = stream.reset_stream(error_code) 1092 self._prepare_for_sending(frames) 1093 1094 def close_connection(self, error_code=0, additional_data=None, 1095 last_stream_id=None): 1096 1097 """ 1098 Close a connection, emitting a GOAWAY frame. 1099 1100 .. versionchanged:: 2.4.0 1101 Added ``additional_data`` and ``last_stream_id`` arguments. 1102 1103 :param error_code: (optional) The error code to send in the GOAWAY 1104 frame. 1105 :param additional_data: (optional) Additional debug data indicating 1106 a reason for closing the connection. Must be a bytestring. 1107 :param last_stream_id: (optional) The last stream which was processed 1108 by the sender. Defaults to ``highest_inbound_stream_id``. 1109 :returns: Nothing 1110 """ 1111 self.config.logger.debug("Close connection") 1112 self.state_machine.process_input(ConnectionInputs.SEND_GOAWAY) 1113 1114 # Additional_data must be bytes 1115 if additional_data is not None: 1116 assert isinstance(additional_data, bytes) 1117 1118 if last_stream_id is None: 1119 last_stream_id = self.highest_inbound_stream_id 1120 1121 f = GoAwayFrame( 1122 stream_id=0, 1123 last_stream_id=last_stream_id, 1124 error_code=error_code, 1125 additional_data=(additional_data or b'') 1126 ) 1127 self._prepare_for_sending([f]) 1128 1129 def update_settings(self, new_settings): 1130 """ 1131 Update the local settings. This will prepare and emit the appropriate 1132 SETTINGS frame. 1133 1134 :param new_settings: A dictionary of {setting: new value} 1135 """ 1136 self.config.logger.debug( 1137 "Update connection settings to %s", new_settings 1138 ) 1139 self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS) 1140 self.local_settings.update(new_settings) 1141 s = SettingsFrame(0) 1142 s.settings = new_settings 1143 self._prepare_for_sending([s]) 1144 1145 def advertise_alternative_service(self, 1146 field_value, 1147 origin=None, 1148 stream_id=None): 1149 """ 1150 Notify a client about an available Alternative Service. 1151 1152 An Alternative Service is defined in `RFC 7838 1153 <https://tools.ietf.org/html/rfc7838>`_. An Alternative Service 1154 notification informs a client that a given origin is also available 1155 elsewhere. 1156 1157 Alternative Services can be advertised in two ways. Firstly, they can 1158 be advertised explicitly: that is, a server can say "origin X is also 1159 available at Y". To advertise like this, set the ``origin`` argument 1160 and not the ``stream_id`` argument. Alternatively, they can be 1161 advertised implicitly: that is, a server can say "the origin you're 1162 contacting on stream X is also available at Y". To advertise like this, 1163 set the ``stream_id`` argument and not the ``origin`` argument. 1164 1165 The explicit method of advertising can be done as long as the 1166 connection is active. The implicit method can only be done after the 1167 client has sent the request headers and before the server has sent the 1168 response headers: outside of those points, Hyper-h2 will forbid sending 1169 the Alternative Service advertisement by raising a ProtocolError. 1170 1171 The ``field_value`` parameter is specified in RFC 7838. Hyper-h2 does 1172 not validate or introspect this argument: the user is required to 1173 ensure that it's well-formed. ``field_value`` corresponds to RFC 7838's 1174 "Alternative Service Field Value". 1175 1176 .. note:: It is strongly preferred to use the explicit method of 1177 advertising Alternative Services. The implicit method of 1178 advertising Alternative Services has a number of subtleties 1179 and can lead to inconsistencies between the server and 1180 client. Hyper-h2 allows both mechanisms, but caution is 1181 strongly advised. 1182 1183 .. versionadded:: 2.3.0 1184 1185 :param field_value: The RFC 7838 Alternative Service Field Value. This 1186 argument is not introspected by Hyper-h2: the user is responsible 1187 for ensuring that it is well-formed. 1188 :type field_value: ``bytes`` 1189 1190 :param origin: The origin/authority to which the Alternative Service 1191 being advertised applies. Must not be provided at the same time as 1192 ``stream_id``. 1193 :type origin: ``bytes`` or ``None`` 1194 1195 :param stream_id: The ID of the stream which was sent to the authority 1196 for which this Alternative Service advertisement applies. Must not 1197 be provided at the same time as ``origin``. 1198 :type stream_id: ``int`` or ``None`` 1199 1200 :returns: Nothing. 1201 """ 1202 if not isinstance(field_value, bytes): 1203 raise ValueError("Field must be bytestring.") 1204 1205 if origin is not None and stream_id is not None: 1206 raise ValueError("Must not provide both origin and stream_id") 1207 1208 self.state_machine.process_input( 1209 ConnectionInputs.SEND_ALTERNATIVE_SERVICE 1210 ) 1211 1212 if origin is not None: 1213 # This ALTSVC is sent on stream zero. 1214 f = AltSvcFrame(stream_id=0) 1215 f.origin = origin 1216 f.field = field_value 1217 frames = [f] 1218 else: 1219 stream = self._get_stream_by_id(stream_id) 1220 frames = stream.advertise_alternative_service(field_value) 1221 1222 self._prepare_for_sending(frames) 1223 1224 def prioritize(self, stream_id, weight=None, depends_on=None, 1225 exclusive=None): 1226 """ 1227 Notify a server about the priority of a stream. 1228 1229 Stream priorities are a form of guidance to a remote server: they 1230 inform the server about how important a given response is, so that the 1231 server may allocate its resources (e.g. bandwidth, CPU time, etc.) 1232 accordingly. This exists to allow clients to ensure that the most 1233 important data arrives earlier, while less important data does not 1234 starve out the more important data. 1235 1236 Stream priorities are explained in depth in `RFC 7540 Section 5.3 1237 <https://tools.ietf.org/html/rfc7540#section-5.3>`_. 1238 1239 This method updates the priority information of a single stream. It may 1240 be called well before a stream is actively in use, or well after a 1241 stream is closed. 1242 1243 .. warning:: RFC 7540 allows for servers to change the priority of 1244 streams. However, hyper-h2 **does not** allow server 1245 stacks to do this. This is because most clients do not 1246 adequately know how to respond when provided conflicting 1247 priority information, and relatively little utility is 1248 provided by making that functionality available. 1249 1250 .. note:: hyper-h2 **does not** maintain any information about the 1251 RFC 7540 priority tree. That means that hyper-h2 does not 1252 prevent incautious users from creating invalid priority 1253 trees, particularly by creating priority loops. While some 1254 basic error checking is provided by hyper-h2, users are 1255 strongly recommended to understand their prioritisation 1256 strategies before using the priority tools here. 1257 1258 .. note:: Priority information is strictly advisory. Servers are 1259 allowed to disregard it entirely. Avoid relying on the idea 1260 that your priority signaling will definitely be obeyed. 1261 1262 .. versionadded:: 2.4.0 1263 1264 :param stream_id: The ID of the stream to prioritize. 1265 :type stream_id: ``int`` 1266 1267 :param weight: The weight to give the stream. Defaults to ``16``, the 1268 default weight of any stream. May be any value between ``1`` and 1269 ``256`` inclusive. The relative weight of a stream indicates what 1270 proportion of available resources will be allocated to that 1271 stream. 1272 :type weight: ``int`` 1273 1274 :param depends_on: The ID of the stream on which this stream depends. 1275 This stream will only be progressed if it is impossible to 1276 progress the parent stream (the one on which this one depends). 1277 Passing the value ``0`` means that this stream does not depend on 1278 any other. Defaults to ``0``. 1279 :type depends_on: ``int`` 1280 1281 :param exclusive: Whether this stream is an exclusive dependency of its 1282 "parent" stream (i.e. the stream given by ``depends_on``). If a 1283 stream is an exclusive dependency of another, that means that all 1284 previously-set children of the parent are moved to become children 1285 of the new exclusively-dependent stream. Defaults to ``False``. 1286 :type exclusive: ``bool`` 1287 """ 1288 if not self.config.client_side: 1289 raise RFC1122Error("Servers SHOULD NOT prioritize streams.") 1290 1291 self.state_machine.process_input( 1292 ConnectionInputs.SEND_PRIORITY 1293 ) 1294 1295 frame = PriorityFrame(stream_id) 1296 frame = _add_frame_priority(frame, weight, depends_on, exclusive) 1297 1298 self._prepare_for_sending([frame]) 1299 1300 def local_flow_control_window(self, stream_id): 1301 """ 1302 Returns the maximum amount of data that can be sent on stream 1303 ``stream_id``. 1304 1305 This value will never be larger than the total data that can be sent on 1306 the connection: even if the given stream allows more data, the 1307 connection window provides a logical maximum to the amount of data that 1308 can be sent. 1309 1310 The maximum data that can be sent in a single data frame on a stream 1311 is either this value, or the maximum frame size, whichever is 1312 *smaller*. 1313 1314 :param stream_id: The ID of the stream whose flow control window is 1315 being queried. 1316 :type stream_id: ``int`` 1317 :returns: The amount of data in bytes that can be sent on the stream 1318 before the flow control window is exhausted. 1319 :rtype: ``int`` 1320 """ 1321 stream = self._get_stream_by_id(stream_id) 1322 return min( 1323 self.outbound_flow_control_window, 1324 stream.outbound_flow_control_window 1325 ) 1326 1327 def remote_flow_control_window(self, stream_id): 1328 """ 1329 Returns the maximum amount of data the remote peer can send on stream 1330 ``stream_id``. 1331 1332 This value will never be larger than the total data that can be sent on 1333 the connection: even if the given stream allows more data, the 1334 connection window provides a logical maximum to the amount of data that 1335 can be sent. 1336 1337 The maximum data that can be sent in a single data frame on a stream 1338 is either this value, or the maximum frame size, whichever is 1339 *smaller*. 1340 1341 :param stream_id: The ID of the stream whose flow control window is 1342 being queried. 1343 :type stream_id: ``int`` 1344 :returns: The amount of data in bytes that can be received on the 1345 stream before the flow control window is exhausted. 1346 :rtype: ``int`` 1347 """ 1348 stream = self._get_stream_by_id(stream_id) 1349 return min( 1350 self.inbound_flow_control_window, 1351 stream.inbound_flow_control_window 1352 ) 1353 1354 def acknowledge_received_data(self, acknowledged_size, stream_id): 1355 """ 1356 Inform the :class:`H2Connection <h2.connection.H2Connection>` that a 1357 certain number of flow-controlled bytes have been processed, and that 1358 the space should be handed back to the remote peer at an opportune 1359 time. 1360 1361 .. versionadded:: 2.5.0 1362 1363 :param acknowledged_size: The total *flow-controlled size* of the data 1364 that has been processed. Note that this must include the amount of 1365 padding that was sent with that data. 1366 :type acknowledged_size: ``int`` 1367 :param stream_id: The ID of the stream on which this data was received. 1368 :type stream_id: ``int`` 1369 :returns: Nothing 1370 :rtype: ``None`` 1371 """ 1372 self.config.logger.debug( 1373 "Ack received data on stream ID %d with size %d", 1374 stream_id, acknowledged_size 1375 ) 1376 if stream_id <= 0: 1377 raise ValueError( 1378 "Stream ID %d is not valid for acknowledge_received_data" % 1379 stream_id 1380 ) 1381 if acknowledged_size < 0: 1382 raise ValueError("Cannot acknowledge negative data") 1383 1384 frames = [] 1385 1386 conn_manager = self._inbound_flow_control_window_manager 1387 conn_increment = conn_manager.process_bytes(acknowledged_size) 1388 if conn_increment: 1389 f = WindowUpdateFrame(0) 1390 f.window_increment = conn_increment 1391 frames.append(f) 1392 1393 try: 1394 stream = self._get_stream_by_id(stream_id) 1395 except StreamClosedError: 1396 # The stream is already gone. We're not worried about incrementing 1397 # the window in this case. 1398 pass 1399 else: 1400 # No point incrementing the windows of closed streams. 1401 if stream.open: 1402 frames.extend( 1403 stream.acknowledge_received_data(acknowledged_size) 1404 ) 1405 1406 self._prepare_for_sending(frames) 1407 1408 def data_to_send(self, amt=None): 1409 """ 1410 Returns some data for sending out of the internal data buffer. 1411 1412 This method is analogous to ``read`` on a file-like object, but it 1413 doesn't block. Instead, it returns as much data as the user asks for, 1414 or less if that much data is not available. It does not perform any 1415 I/O, and so uses a different name. 1416 1417 :param amt: (optional) The maximum amount of data to return. If not 1418 set, or set to ``None``, will return as much data as possible. 1419 :type amt: ``int`` 1420 :returns: A bytestring containing the data to send on the wire. 1421 :rtype: ``bytes`` 1422 """ 1423 if amt is None: 1424 data = self._data_to_send 1425 self._data_to_send = b'' 1426 return data 1427 else: 1428 data = self._data_to_send[:amt] 1429 self._data_to_send = self._data_to_send[amt:] 1430 return data 1431 1432 def clear_outbound_data_buffer(self): 1433 """ 1434 Clears the outbound data buffer, such that if this call was immediately 1435 followed by a call to 1436 :meth:`data_to_send <h2.connection.H2Connection.data_to_send>`, that 1437 call would return no data. 1438 1439 This method should not normally be used, but is made available to avoid 1440 exposing implementation details. 1441 """ 1442 self._data_to_send = b'' 1443 1444 def _acknowledge_settings(self): 1445 """ 1446 Acknowledge settings that have been received. 1447 1448 .. versionchanged:: 2.0.0 1449 Removed from public API, removed useless ``event`` parameter, made 1450 automatic. 1451 1452 :returns: Nothing 1453 """ 1454 self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS) 1455 1456 changes = self.remote_settings.acknowledge() 1457 1458 if SettingCodes.INITIAL_WINDOW_SIZE in changes: 1459 setting = changes[SettingCodes.INITIAL_WINDOW_SIZE] 1460 self._flow_control_change_from_settings( 1461 setting.original_value, 1462 setting.new_value, 1463 ) 1464 1465 # HEADER_TABLE_SIZE changes by the remote part affect our encoder: cf. 1466 # RFC 7540 Section 6.5.2. 1467 if SettingCodes.HEADER_TABLE_SIZE in changes: 1468 setting = changes[SettingCodes.HEADER_TABLE_SIZE] 1469 self.encoder.header_table_size = setting.new_value 1470 1471 if SettingCodes.MAX_FRAME_SIZE in changes: 1472 setting = changes[SettingCodes.MAX_FRAME_SIZE] 1473 self.max_outbound_frame_size = setting.new_value 1474 for stream in self.streams.values(): 1475 stream.max_outbound_frame_size = setting.new_value 1476 1477 f = SettingsFrame(0) 1478 f.flags.add('ACK') 1479 return [f] 1480 1481 def _flow_control_change_from_settings(self, old_value, new_value): 1482 """ 1483 Update flow control windows in response to a change in the value of 1484 SETTINGS_INITIAL_WINDOW_SIZE. 1485 1486 When this setting is changed, it automatically updates all flow control 1487 windows by the delta in the settings values. Note that it does not 1488 increment the *connection* flow control window, per section 6.9.2 of 1489 RFC 7540. 1490 """ 1491 delta = new_value - old_value 1492 1493 for stream in self.streams.values(): 1494 stream.outbound_flow_control_window = guard_increment_window( 1495 stream.outbound_flow_control_window, 1496 delta 1497 ) 1498 1499 def _inbound_flow_control_change_from_settings(self, old_value, new_value): 1500 """ 1501 Update remote flow control windows in response to a change in the value 1502 of SETTINGS_INITIAL_WINDOW_SIZE. 1503 1504 When this setting is changed, it automatically updates all remote flow 1505 control windows by the delta in the settings values. 1506 """ 1507 delta = new_value - old_value 1508 1509 for stream in self.streams.values(): 1510 stream._inbound_flow_control_change_from_settings(delta) 1511 1512 def receive_data(self, data): 1513 """ 1514 Pass some received HTTP/2 data to the connection for handling. 1515 1516 :param data: The data received from the remote peer on the network. 1517 :type data: ``bytes`` 1518 :returns: A list of events that the remote peer triggered by sending 1519 this data. 1520 """ 1521 self.config.logger.debug( 1522 "Process received data on connection. Received data: %r", data 1523 ) 1524 1525 events = [] 1526 self.incoming_buffer.add_data(data) 1527 self.incoming_buffer.max_frame_size = self.max_inbound_frame_size 1528 1529 try: 1530 for frame in self.incoming_buffer: 1531 events.extend(self._receive_frame(frame)) 1532 except InvalidPaddingError: 1533 self._terminate_connection(ErrorCodes.PROTOCOL_ERROR) 1534 raise ProtocolError("Received frame with invalid padding.") 1535 except ProtocolError as e: 1536 # For whatever reason, receiving the frame caused a protocol error. 1537 # We should prepare to emit a GoAway frame before throwing the 1538 # exception up further. No need for an event: the exception will 1539 # do fine. 1540 self._terminate_connection(e.error_code) 1541 raise 1542 1543 return events 1544 1545 def _receive_frame(self, frame): 1546 """ 1547 Handle a frame received on the connection. 1548 1549 .. versionchanged:: 2.0.0 1550 Removed from the public API. 1551 """ 1552 try: 1553 # I don't love using __class__ here, maybe reconsider it. 1554 frames, events = self._frame_dispatch_table[frame.__class__](frame) 1555 except StreamClosedError as e: 1556 # If the stream was closed by RST_STREAM, we just send a RST_STREAM 1557 # to the remote peer. Otherwise, this is a connection error, and so 1558 # we will re-raise to trigger one. 1559 if self._stream_is_closed_by_reset(e.stream_id): 1560 f = RstStreamFrame(e.stream_id) 1561 f.error_code = e.error_code 1562 self._prepare_for_sending([f]) 1563 events = e._events 1564 else: 1565 raise 1566 except StreamIDTooLowError as e: 1567 # The stream ID seems invalid. This may happen when the closed 1568 # stream has been cleaned up, or when the remote peer has opened a 1569 # new stream with a higher stream ID than this one, forcing it 1570 # closed implicitly. 1571 # 1572 # Check how the stream was closed: depending on the mechanism, it 1573 # is either a stream error or a connection error. 1574 if self._stream_is_closed_by_reset(e.stream_id): 1575 # Closed by RST_STREAM is a stream error. 1576 f = RstStreamFrame(e.stream_id) 1577 f.error_code = ErrorCodes.STREAM_CLOSED 1578 self._prepare_for_sending([f]) 1579 events = [] 1580 elif self._stream_is_closed_by_end(e.stream_id): 1581 # Closed by END_STREAM is a connection error. 1582 raise StreamClosedError(e.stream_id) 1583 else: 1584 # Closed implicitly, also a connection error, but of type 1585 # PROTOCOL_ERROR. 1586 raise 1587 else: 1588 self._prepare_for_sending(frames) 1589 1590 return events 1591 1592 def _terminate_connection(self, error_code): 1593 """ 1594 Terminate the connection early. Used in error handling blocks to send 1595 GOAWAY frames. 1596 """ 1597 f = GoAwayFrame(0) 1598 f.last_stream_id = self.highest_inbound_stream_id 1599 f.error_code = error_code 1600 self.state_machine.process_input(ConnectionInputs.SEND_GOAWAY) 1601 self._prepare_for_sending([f]) 1602 1603 def _receive_headers_frame(self, frame): 1604 """ 1605 Receive a headers frame on the connection. 1606 """ 1607 # If necessary, check we can open the stream. Also validate that the 1608 # stream ID is valid. 1609 if frame.stream_id not in self.streams: 1610 max_open_streams = self.local_settings.max_concurrent_streams 1611 if (self.open_inbound_streams + 1) > max_open_streams: 1612 raise TooManyStreamsError( 1613 "Max outbound streams is %d, %d open" % 1614 (max_open_streams, self.open_outbound_streams) 1615 ) 1616 1617 # Let's decode the headers. We handle headers as bytes internally up 1618 # until we hang them off the event, at which point we may optionally 1619 # convert them to unicode. 1620 headers = _decode_headers(self.decoder, frame.data) 1621 1622 events = self.state_machine.process_input( 1623 ConnectionInputs.RECV_HEADERS 1624 ) 1625 stream = self._get_or_create_stream( 1626 frame.stream_id, AllowedStreamIDs(not self.config.client_side) 1627 ) 1628 frames, stream_events = stream.receive_headers( 1629 headers, 1630 'END_STREAM' in frame.flags, 1631 self.config.header_encoding 1632 ) 1633 1634 if 'PRIORITY' in frame.flags: 1635 p_frames, p_events = self._receive_priority_frame(frame) 1636 stream_events[0].priority_updated = p_events[0] 1637 stream_events.extend(p_events) 1638 assert not p_frames 1639 1640 return frames, events + stream_events 1641 1642 def _receive_push_promise_frame(self, frame): 1643 """ 1644 Receive a push-promise frame on the connection. 1645 """ 1646 if not self.local_settings.enable_push: 1647 raise ProtocolError("Received pushed stream") 1648 1649 pushed_headers = _decode_headers(self.decoder, frame.data) 1650 1651 events = self.state_machine.process_input( 1652 ConnectionInputs.RECV_PUSH_PROMISE 1653 ) 1654 1655 try: 1656 stream = self._get_stream_by_id(frame.stream_id) 1657 except NoSuchStreamError: 1658 # We need to check if the parent stream was reset by us. If it was 1659 # then we presume that the PUSH_PROMISE was in flight when we reset 1660 # the parent stream. Rather than accept the new stream, just reset 1661 # it. 1662 # 1663 # If this was closed naturally, however, we should call this a 1664 # PROTOCOL_ERROR: pushing a stream on a naturally closed stream is 1665 # a real problem because it creates a brand new stream that the 1666 # remote peer now believes exists. 1667 if (self._stream_closed_by(frame.stream_id) == 1668 StreamClosedBy.SEND_RST_STREAM): 1669 f = RstStreamFrame(frame.promised_stream_id) 1670 f.error_code = ErrorCodes.REFUSED_STREAM 1671 return [f], events 1672 1673 raise ProtocolError("Attempted to push on closed stream.") 1674 1675 # We need to prevent peers pushing streams in response to streams that 1676 # they themselves have already pushed: see #163 and RFC 7540 § 6.6. The 1677 # easiest way to do that is to assert that the stream_id is not even: 1678 # this shortcut works because only servers can push and the state 1679 # machine will enforce this. 1680 if (frame.stream_id % 2) == 0: 1681 raise ProtocolError("Cannot recursively push streams.") 1682 1683 try: 1684 frames, stream_events = stream.receive_push_promise_in_band( 1685 frame.promised_stream_id, 1686 pushed_headers, 1687 self.config.header_encoding, 1688 ) 1689 except StreamClosedError: 1690 # The parent stream was reset by us, so we presume that 1691 # PUSH_PROMISE was in flight when we reset the parent stream. 1692 # So we just reset the new stream. 1693 f = RstStreamFrame(frame.promised_stream_id) 1694 f.error_code = ErrorCodes.REFUSED_STREAM 1695 return [f], events 1696 1697 new_stream = self._begin_new_stream( 1698 frame.promised_stream_id, AllowedStreamIDs.EVEN 1699 ) 1700 self.streams[frame.promised_stream_id] = new_stream 1701 new_stream.remotely_pushed(pushed_headers) 1702 1703 return frames, events + stream_events 1704 1705 def _receive_data_frame(self, frame): 1706 """ 1707 Receive a data frame on the connection. 1708 """ 1709 flow_controlled_length = frame.flow_controlled_length 1710 1711 events = self.state_machine.process_input( 1712 ConnectionInputs.RECV_DATA 1713 ) 1714 self._inbound_flow_control_window_manager.window_consumed( 1715 flow_controlled_length 1716 ) 1717 stream = self._get_stream_by_id(frame.stream_id) 1718 frames, stream_events = stream.receive_data( 1719 frame.data, 1720 'END_STREAM' in frame.flags, 1721 flow_controlled_length 1722 ) 1723 return frames, events + stream_events 1724 1725 def _receive_settings_frame(self, frame): 1726 """ 1727 Receive a SETTINGS frame on the connection. 1728 """ 1729 events = self.state_machine.process_input( 1730 ConnectionInputs.RECV_SETTINGS 1731 ) 1732 1733 # This is an ack of the local settings. 1734 if 'ACK' in frame.flags: 1735 changed_settings = self._local_settings_acked() 1736 ack_event = SettingsAcknowledged() 1737 ack_event.changed_settings = changed_settings 1738 events.append(ack_event) 1739 return [], events 1740 1741 # Add the new settings. 1742 self.remote_settings.update(frame.settings) 1743 events.append( 1744 RemoteSettingsChanged.from_settings( 1745 self.remote_settings, frame.settings 1746 ) 1747 ) 1748 frames = self._acknowledge_settings() 1749 1750 return frames, events 1751 1752 def _receive_window_update_frame(self, frame): 1753 """ 1754 Receive a WINDOW_UPDATE frame on the connection. 1755 """ 1756 # Validate the frame. 1757 if not (1 <= frame.window_increment <= self.MAX_WINDOW_INCREMENT): 1758 raise ProtocolError( 1759 "Flow control increment must be between 1 and %d, received %d" 1760 % (self.MAX_WINDOW_INCREMENT, frame.window_increment) 1761 ) 1762 1763 events = self.state_machine.process_input( 1764 ConnectionInputs.RECV_WINDOW_UPDATE 1765 ) 1766 1767 if frame.stream_id: 1768 stream = self._get_stream_by_id(frame.stream_id) 1769 frames, stream_events = stream.receive_window_update( 1770 frame.window_increment 1771 ) 1772 else: 1773 # Increment our local flow control window. 1774 self.outbound_flow_control_window = guard_increment_window( 1775 self.outbound_flow_control_window, 1776 frame.window_increment 1777 ) 1778 1779 # FIXME: Should we split this into one event per active stream? 1780 window_updated_event = WindowUpdated() 1781 window_updated_event.stream_id = 0 1782 window_updated_event.delta = frame.window_increment 1783 stream_events = [window_updated_event] 1784 frames = [] 1785 1786 return frames, events + stream_events 1787 1788 def _receive_ping_frame(self, frame): 1789 """ 1790 Receive a PING frame on the connection. 1791 """ 1792 events = self.state_machine.process_input( 1793 ConnectionInputs.RECV_PING 1794 ) 1795 flags = [] 1796 1797 if 'ACK' in frame.flags: 1798 evt = PingAcknowledged() 1799 evt.ping_data = frame.opaque_data 1800 events.append(evt) 1801 else: 1802 f = PingFrame(0) 1803 f.flags = {'ACK'} 1804 f.opaque_data = frame.opaque_data 1805 flags.append(f) 1806 1807 return flags, events 1808 1809 def _receive_rst_stream_frame(self, frame): 1810 """ 1811 Receive a RST_STREAM frame on the connection. 1812 """ 1813 events = self.state_machine.process_input( 1814 ConnectionInputs.RECV_RST_STREAM 1815 ) 1816 try: 1817 stream = self._get_stream_by_id(frame.stream_id) 1818 except NoSuchStreamError: 1819 # The stream is missing. That's ok, we just do nothing here. 1820 stream_frames = [] 1821 stream_events = [] 1822 else: 1823 stream_frames, stream_events = stream.stream_reset(frame) 1824 1825 return stream_frames, events + stream_events 1826 1827 def _receive_priority_frame(self, frame): 1828 """ 1829 Receive a PRIORITY frame on the connection. 1830 """ 1831 events = self.state_machine.process_input( 1832 ConnectionInputs.RECV_PRIORITY 1833 ) 1834 1835 event = PriorityUpdated() 1836 event.stream_id = frame.stream_id 1837 event.depends_on = frame.depends_on 1838 event.exclusive = frame.exclusive 1839 1840 # Weight is an integer between 1 and 256, but the byte only allows 1841 # 0 to 255: add one. 1842 event.weight = frame.stream_weight + 1 1843 1844 # A stream may not depend on itself. 1845 if event.depends_on == frame.stream_id: 1846 raise ProtocolError( 1847 "Stream %d may not depend on itself" % frame.stream_id 1848 ) 1849 events.append(event) 1850 1851 return [], events 1852 1853 def _receive_goaway_frame(self, frame): 1854 """ 1855 Receive a GOAWAY frame on the connection. 1856 """ 1857 events = self.state_machine.process_input( 1858 ConnectionInputs.RECV_GOAWAY 1859 ) 1860 1861 # Clear the outbound data buffer: we cannot send further data now. 1862 self.clear_outbound_data_buffer() 1863 1864 # Fire an appropriate ConnectionTerminated event. 1865 new_event = ConnectionTerminated() 1866 new_event.error_code = _error_code_from_int(frame.error_code) 1867 new_event.last_stream_id = frame.last_stream_id 1868 new_event.additional_data = (frame.additional_data 1869 if frame.additional_data else None) 1870 events.append(new_event) 1871 1872 return [], events 1873 1874 def _receive_naked_continuation(self, frame): 1875 """ 1876 A naked CONTINUATION frame has been received. This is always an error, 1877 but the type of error it is depends on the state of the stream and must 1878 transition the state of the stream, so we need to pass it to the 1879 appropriate stream. 1880 """ 1881 stream = self._get_stream_by_id(frame.stream_id) 1882 stream.receive_continuation() 1883 assert False, "Should not be reachable" 1884 1885 def _receive_alt_svc_frame(self, frame): 1886 """ 1887 An ALTSVC frame has been received. This frame, specified in RFC 7838, 1888 is used to advertise alternative places where the same service can be 1889 reached. 1890 1891 This frame can optionally be received either on a stream or on stream 1892 0, and its semantics are different in each case. 1893 """ 1894 events = self.state_machine.process_input( 1895 ConnectionInputs.RECV_ALTERNATIVE_SERVICE 1896 ) 1897 frames = [] 1898 1899 if frame.stream_id: 1900 # Given that it makes no sense to receive ALTSVC on a stream 1901 # before that stream has been opened with a HEADERS frame, the 1902 # ALTSVC frame cannot create a stream. If the stream is not 1903 # present, we simply ignore the frame. 1904 try: 1905 stream = self._get_stream_by_id(frame.stream_id) 1906 except (NoSuchStreamError, StreamClosedError): 1907 pass 1908 else: 1909 stream_frames, stream_events = stream.receive_alt_svc(frame) 1910 frames.extend(stream_frames) 1911 events.extend(stream_events) 1912 else: 1913 # This frame is sent on stream 0. The origin field on the frame 1914 # must be present, though if it isn't it's not a ProtocolError 1915 # (annoyingly), we just need to ignore it. 1916 if not frame.origin: 1917 return frames, events 1918 1919 # If we're a server, we want to ignore this (RFC 7838 says so). 1920 if not self.config.client_side: 1921 return frames, events 1922 1923 event = AlternativeServiceAvailable() 1924 event.origin = frame.origin 1925 event.field_value = frame.field 1926 events.append(event) 1927 1928 return frames, events 1929 1930 def _receive_unknown_frame(self, frame): 1931 """ 1932 We have received a frame that we do not understand. This is almost 1933 certainly an extension frame, though it's impossible to be entirely 1934 sure. 1935 1936 RFC 7540 § 5.5 says that we MUST ignore unknown frame types: so we 1937 do. 1938 """ 1939 # All we do here is log. 1940 self.config.logger.debug( 1941 "Received unknown extension frame (ID %d)", frame.stream_id 1942 ) 1943 return [], [] 1944 1945 def _local_settings_acked(self): 1946 """ 1947 Handle the local settings being ACKed, update internal state. 1948 """ 1949 changes = self.local_settings.acknowledge() 1950 1951 if SettingCodes.INITIAL_WINDOW_SIZE in changes: 1952 setting = changes[SettingCodes.INITIAL_WINDOW_SIZE] 1953 self._inbound_flow_control_change_from_settings( 1954 setting.original_value, 1955 setting.new_value, 1956 ) 1957 1958 if SettingCodes.MAX_HEADER_LIST_SIZE in changes: 1959 setting = changes[SettingCodes.MAX_HEADER_LIST_SIZE] 1960 self.decoder.max_header_list_size = setting.new_value 1961 1962 if SettingCodes.MAX_FRAME_SIZE in changes: 1963 setting = changes[SettingCodes.MAX_FRAME_SIZE] 1964 self.max_inbound_frame_size = setting.new_value 1965 1966 if SettingCodes.HEADER_TABLE_SIZE in changes: 1967 setting = changes[SettingCodes.HEADER_TABLE_SIZE] 1968 # This is safe across all hpack versions: some versions just won't 1969 # respect it. 1970 self.decoder.max_allowed_table_size = setting.new_value 1971 1972 return changes 1973 1974 def _stream_id_is_outbound(self, stream_id): 1975 """ 1976 Returns ``True`` if the stream ID corresponds to an outbound stream 1977 (one initiated by this peer), returns ``False`` otherwise. 1978 """ 1979 return (stream_id % 2 == int(self.config.client_side)) 1980 1981 def _stream_closed_by(self, stream_id): 1982 """ 1983 Returns how the stream was closed. 1984 1985 The return value will be either a member of 1986 ``h2.stream.StreamClosedBy`` or ``None``. If ``None``, the stream was 1987 closed implicitly by the peer opening a stream with a higher stream ID 1988 before opening this one. 1989 """ 1990 if stream_id in self.streams: 1991 return self.streams[stream_id].closed_by 1992 if stream_id in self._closed_streams: 1993 return self._closed_streams[stream_id] 1994 return None 1995 1996 def _stream_is_closed_by_reset(self, stream_id): 1997 """ 1998 Returns ``True`` if the stream was closed by sending or receiving a 1999 RST_STREAM frame. Returns ``False`` otherwise. 2000 """ 2001 return self._stream_closed_by(stream_id) in ( 2002 StreamClosedBy.RECV_RST_STREAM, StreamClosedBy.SEND_RST_STREAM 2003 ) 2004 2005 def _stream_is_closed_by_end(self, stream_id): 2006 """ 2007 Returns ``True`` if the stream was closed by sending or receiving an 2008 END_STREAM flag in a HEADERS or DATA frame. Returns ``False`` 2009 otherwise. 2010 """ 2011 return self._stream_closed_by(stream_id) in ( 2012 StreamClosedBy.RECV_END_STREAM, StreamClosedBy.SEND_END_STREAM 2013 ) 2014 2015 2016def _add_frame_priority(frame, weight=None, depends_on=None, exclusive=None): 2017 """ 2018 Adds priority data to a given frame. Does not change any flags set on that 2019 frame: if the caller is adding priority information to a HEADERS frame they 2020 must set that themselves. 2021 2022 This method also deliberately sets defaults for anything missing. 2023 2024 This method validates the input values. 2025 """ 2026 # A stream may not depend on itself. 2027 if depends_on == frame.stream_id: 2028 raise ProtocolError( 2029 "Stream %d may not depend on itself" % frame.stream_id 2030 ) 2031 2032 # Weight must be between 1 and 256. 2033 if weight is not None: 2034 if weight > 256 or weight < 1: 2035 raise ProtocolError( 2036 "Weight must be between 1 and 256, not %d" % weight 2037 ) 2038 else: 2039 # Weight is an integer between 1 and 256, but the byte only allows 2040 # 0 to 255: subtract one. 2041 weight -= 1 2042 2043 # Set defaults for anything not provided. 2044 weight = weight if weight is not None else 15 2045 depends_on = depends_on if depends_on is not None else 0 2046 exclusive = exclusive if exclusive is not None else False 2047 2048 frame.stream_weight = weight 2049 frame.depends_on = depends_on 2050 frame.exclusive = exclusive 2051 2052 return frame 2053 2054 2055def _decode_headers(decoder, encoded_header_block): 2056 """ 2057 Decode a HPACK-encoded header block, translating HPACK exceptions into 2058 sensible hyper-h2 errors. 2059 2060 This only ever returns bytestring headers: hyper-h2 may emit them as 2061 unicode later, but internally it processes them as bytestrings only. 2062 """ 2063 try: 2064 return decoder.decode(encoded_header_block, raw=True) 2065 except OversizedHeaderListError as e: 2066 # This is a symptom of a HPACK bomb attack: the user has 2067 # disregarded our requirements on how large a header block we'll 2068 # accept. 2069 raise DenialOfServiceError("Oversized header block: %s" % e) 2070 except (HPACKError, IndexError, TypeError, UnicodeDecodeError) as e: 2071 # We should only need HPACKError here, but versions of HPACK older 2072 # than 2.1.0 throw all three others as well. For maximum 2073 # compatibility, catch all of them. 2074 raise ProtocolError("Error decoding header block: %s" % e) 2075