1# -*- coding: utf-8 -*-
2"""
3h2/connection
4~~~~~~~~~~~~~
5
6An implementation of a HTTP/2 connection.
7"""
8import base64
9
10from enum import Enum, IntEnum
11
12from hyperframe.exceptions import InvalidPaddingError
13from hyperframe.frame import (
14    GoAwayFrame, WindowUpdateFrame, HeadersFrame, DataFrame, PingFrame,
15    PushPromiseFrame, SettingsFrame, RstStreamFrame, PriorityFrame,
16    ContinuationFrame, AltSvcFrame
17)
18from hpack.hpack import Encoder, Decoder
19from hpack.exceptions import HPACKError
20
21from .config import H2Configuration
22from .errors import ErrorCodes, _error_code_from_int
23from .events import (
24    WindowUpdated, RemoteSettingsChanged, PingAcknowledged,
25    SettingsAcknowledged, ConnectionTerminated, PriorityUpdated,
26    AlternativeServiceAvailable,
27)
28from .exceptions import (
29    ProtocolError, NoSuchStreamError, FlowControlError, FrameTooLargeError,
30    TooManyStreamsError, StreamClosedError, StreamIDTooLowError,
31    NoAvailableStreamIDError, RFC1122Error, DenialOfServiceError
32)
33from .frame_buffer import FrameBuffer
34from .settings import Settings, SettingCodes
35from .stream import H2Stream, StreamClosedBy
36from .utilities import guard_increment_window
37from .windows import WindowManager
38
try:
    from hpack.exceptions import OversizedHeaderListError
except ImportError:  # Platform-specific: HPACK < 2.3.0
    # An exception class that HPACK does not define can never be raised by
    # it, so a placeholder with the same name is all that is required here.
    class OversizedHeaderListError(Exception):
        """
        Placeholder used when HPACK does not provide this exception.
        """
46
47
try:
    from hyperframe.frame import ExtensionFrame
except ImportError:  # Platform-specific: Hyperframe < 5.0.0
    # Without the real frame class we supply our own stand-in. That is
    # harmless: the handler registered for it will simply never be called.
    class ExtensionFrame(object):
        """
        Placeholder used when Hyperframe does not provide this frame type.
        """
55
56
class ConnectionState(Enum):
    """
    The states that the connection state machine can be in.
    """
    # The starting state of every connection state machine.
    IDLE = 0

    # The connection is open and this endpoint is the client side.
    CLIENT_OPEN = 1

    # The connection is open and this endpoint is the server side.
    SERVER_OPEN = 2

    # Entered via GOAWAY, or after an input that was not valid for the
    # state the connection was in.
    CLOSED = 3
62
63
class ConnectionInputs(Enum):
    """
    The inputs that can be fed to the connection state machine: one for each
    kind of frame, in each direction (sent or received).
    """
    # Frames sent by this endpoint.
    SEND_HEADERS = 0
    SEND_PUSH_PROMISE = 1
    SEND_DATA = 2
    SEND_GOAWAY = 3
    SEND_WINDOW_UPDATE = 4
    SEND_PING = 5
    SEND_SETTINGS = 6
    SEND_RST_STREAM = 7
    SEND_PRIORITY = 8

    # Frames received from the remote peer.
    RECV_HEADERS = 9
    RECV_PUSH_PROMISE = 10
    RECV_DATA = 11
    RECV_GOAWAY = 12
    RECV_WINDOW_UPDATE = 13
    RECV_PING = 14
    RECV_SETTINGS = 15
    RECV_RST_STREAM = 16
    RECV_PRIORITY = 17

    # Alternative Service frames, in both directions.
    SEND_ALTERNATIVE_SERVICE = 18  # Added in 2.3.0
    RECV_ALTERNATIVE_SERVICE = 19  # Added in 2.3.0
85
86
class AllowedStreamIDs(IntEnum):
    """
    The parity a stream ID is allowed to have. Each member's integer value
    equals ``stream_id % 2`` for the stream IDs it permits.
    """
    # Even-numbered stream IDs.
    EVEN = 0

    # Odd-numbered stream IDs.
    ODD = 1
90
91
class H2ConnectionStateMachine(object):
    """
    The state machine governing a single HTTP/2 connection.

    Although it lives in its own class, this machine is logically a part of
    the H2Connection class that is also defined in this file. It stores very
    little itself (just the current state) and is concerned purely with
    validating and performing state transitions.
    """
    # HEADERS frames and the CONTINUATION frames that follow them are treated
    # here as one single large frame. The protocol allows (indeed requires)
    # this, because no other frame may be interleaved within a
    # HEADERS/CONTINUATION sequence.
    #
    # ``_transitions`` maps (state, input) tuples to
    # (side_effect_function, end_state) tuples. Every allowed transition is
    # listed here: any pair that is missing from the map is invalid and
    # immediately moves the connection to ``closed``.

    _transitions = {
        # State: idle
        (ConnectionState.IDLE, ConnectionInputs.SEND_HEADERS): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.IDLE, ConnectionInputs.RECV_HEADERS): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.IDLE, ConnectionInputs.SEND_SETTINGS): (
            None, ConnectionState.IDLE,
        ),
        (ConnectionState.IDLE, ConnectionInputs.RECV_SETTINGS): (
            None, ConnectionState.IDLE,
        ),
        (ConnectionState.IDLE, ConnectionInputs.SEND_WINDOW_UPDATE): (
            None, ConnectionState.IDLE,
        ),
        (ConnectionState.IDLE, ConnectionInputs.RECV_WINDOW_UPDATE): (
            None, ConnectionState.IDLE,
        ),
        (ConnectionState.IDLE, ConnectionInputs.SEND_PING): (
            None, ConnectionState.IDLE,
        ),
        (ConnectionState.IDLE, ConnectionInputs.RECV_PING): (
            None, ConnectionState.IDLE,
        ),
        (ConnectionState.IDLE, ConnectionInputs.SEND_GOAWAY): (
            None, ConnectionState.CLOSED,
        ),
        (ConnectionState.IDLE, ConnectionInputs.RECV_GOAWAY): (
            None, ConnectionState.CLOSED,
        ),
        (ConnectionState.IDLE, ConnectionInputs.SEND_PRIORITY): (
            None, ConnectionState.IDLE,
        ),
        (ConnectionState.IDLE, ConnectionInputs.RECV_PRIORITY): (
            None, ConnectionState.IDLE,
        ),
        (ConnectionState.IDLE, ConnectionInputs.SEND_ALTERNATIVE_SERVICE): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.IDLE, ConnectionInputs.RECV_ALTERNATIVE_SERVICE): (
            None, ConnectionState.CLIENT_OPEN,
        ),

        # State: open, client side.
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_HEADERS): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_DATA): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_GOAWAY): (
            None, ConnectionState.CLOSED,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_WINDOW_UPDATE): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_PING): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_SETTINGS): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_PRIORITY): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_HEADERS): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PUSH_PROMISE): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_DATA): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_GOAWAY): (
            None, ConnectionState.CLOSED,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_WINDOW_UPDATE): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PING): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_SETTINGS): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_RST_STREAM): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_RST_STREAM): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PRIORITY): (
            None, ConnectionState.CLIENT_OPEN,
        ),
        (
            ConnectionState.CLIENT_OPEN,
            ConnectionInputs.RECV_ALTERNATIVE_SERVICE,
        ): (None, ConnectionState.CLIENT_OPEN),

        # State: open, server side.
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_HEADERS): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PUSH_PROMISE): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_DATA): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_GOAWAY): (
            None, ConnectionState.CLOSED,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_WINDOW_UPDATE): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PING): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_SETTINGS): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PRIORITY): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_HEADERS): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_DATA): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_GOAWAY): (
            None, ConnectionState.CLOSED,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_WINDOW_UPDATE): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_PING): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_SETTINGS): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_PRIORITY): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_RST_STREAM): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_RST_STREAM): (
            None, ConnectionState.SERVER_OPEN,
        ),
        (
            ConnectionState.SERVER_OPEN,
            ConnectionInputs.SEND_ALTERNATIVE_SERVICE,
        ): (None, ConnectionState.SERVER_OPEN),
        (
            ConnectionState.SERVER_OPEN,
            ConnectionInputs.RECV_ALTERNATIVE_SERVICE,
        ): (None, ConnectionState.SERVER_OPEN),

        # State: closed
        (ConnectionState.CLOSED, ConnectionInputs.SEND_GOAWAY): (
            None, ConnectionState.CLOSED,
        ),
        (ConnectionState.CLOSED, ConnectionInputs.RECV_GOAWAY): (
            None, ConnectionState.CLOSED,
        ),
    }

    def __init__(self):
        # Every connection starts out idle.
        self.state = ConnectionState.IDLE

    def process_input(self, input_):
        """
        Feed a single input to the state machine.

        :param input_: The :class:`ConnectionInputs` member to process.
        :returns: The result of the transition's side-effect function if it
            has one, otherwise an empty list.
        :raises ValueError: If ``input_`` is not a ``ConnectionInputs``
            member.
        :raises ProtocolError: If the input is not allowed in the current
            state. The machine is moved to ``CLOSED`` before raising.
        """
        if not isinstance(input_, ConnectionInputs):
            raise ValueError("Input must be an instance of ConnectionInputs")

        try:
            transition = self._transitions[(self.state, input_)]
        except KeyError:
            # Remember the state we were in for the error message, because
            # an invalid input closes the connection before we raise.
            previous_state = self.state
            self.state = ConnectionState.CLOSED
            raise ProtocolError(
                "Invalid input %s in state %s" % (input_, previous_state)
            )

        side_effect, next_state = transition
        self.state = next_state
        if side_effect is None:
            return []

        return side_effect()  # pragma: no cover
254
255
256class H2Connection(object):
257    """
258    A low-level HTTP/2 connection object. This handles building and receiving
259    frames and maintains both connection and per-stream state for all streams
260    on this connection.
261
262    This wraps a HTTP/2 Connection state machine implementation, ensuring that
263    frames can only be sent/received when the connection is in a valid state.
264    It also builds stream state machines on demand to ensure that the
265    constraints of those state machines are met as well. Attempts to create
266    frames that cannot be sent will raise a ``ProtocolError``.
267
268    .. versionchanged:: 2.3.0
269       Added the ``header_encoding`` keyword argument.
270
271    .. versionchanged:: 2.5.0
272       Added the ``config`` keyword argument. Deprecated the ``client_side``
273       and ``header_encoding`` parameters.
274
275    :param client_side: Whether this object is to be used on the client side of
276        a connection, or on the server side. Affects the logic used by the
277        state machine, the default settings values, the allowable stream IDs,
278        and several other properties. Defaults to ``True``.
279
280        .. deprecated:: 2.5.0
281
282    :type client_side: ``bool``
283
284    :param header_encoding: Controls whether the headers emitted by this object
285        in events are transparently decoded to ``unicode`` strings, and what
        encoding is used to do that decoding. For historical reasons, this
287        defaults to ``'utf-8'``. To prevent the decoding of headers (that is,
288        to force them to be returned as bytestrings), this can be set to
289        ``False`` or the empty string.
290
291        .. deprecated:: 2.5.0
292
293    :type header_encoding: ``str`` or ``False``
294
295    :param config: The configuration for the HTTP/2 connection. If provided,
296        supersedes the deprecated ``client_side`` and ``header_encoding``
297        values.
298
299        .. versionadded:: 2.5.0
300
301    :type config: :class:`H2Configuration <h2.config.H2Configuration>`
302    """
    # The initial maximum outbound frame size. This can be changed by receiving
    # a settings frame.
    DEFAULT_MAX_OUTBOUND_FRAME_SIZE = 65535

    # The initial maximum inbound frame size. This is somewhat arbitrarily
    # chosen.
    DEFAULT_MAX_INBOUND_FRAME_SIZE = 2**24

    # The highest acceptable stream ID. ``get_next_available_stream_id``
    # raises ``NoAvailableStreamIDError`` rather than return a larger ID.
    HIGHEST_ALLOWED_STREAM_ID = 2**31 - 1

    # The largest acceptable window increment.
    MAX_WINDOW_INCREMENT = 2**31 - 1

    # The initial default value of SETTINGS_MAX_HEADER_LIST_SIZE. ``__init__``
    # applies it both to the HPACK decoder and to the local settings.
    DEFAULT_MAX_HEADER_LIST_SIZE = 2**16
319
320    def __init__(self, client_side=True, header_encoding='utf-8', config=None):
321        self.state_machine = H2ConnectionStateMachine()
322        self.streams = {}
323        self.highest_inbound_stream_id = 0
324        self.highest_outbound_stream_id = 0
325        self.encoder = Encoder()
326        self.decoder = Decoder()
327
328        # This won't always actually do anything: for versions of HPACK older
329        # than 2.3.0 it does nothing. However, we have to try!
330        self.decoder.max_header_list_size = self.DEFAULT_MAX_HEADER_LIST_SIZE
331
332        #: The configuration for this HTTP/2 connection object.
333        #:
334        #: .. versionadded:: 2.5.0
335        self.config = config
336        if self.config is None:
337            self.config = H2Configuration(
338                client_side=client_side,
339                header_encoding=header_encoding,
340            )
341
342        # Objects that store settings, including defaults.
343        #
344        # We set the MAX_CONCURRENT_STREAMS value to 100 because its default is
345        # unbounded, and that's a dangerous default because it allows
346        # essentially unbounded resources to be allocated regardless of how
347        # they will be used. 100 should be suitable for the average
348        # application. This default obviously does not apply to the remote
349        # peer's settings: the remote peer controls them!
350        #
351        # We also set MAX_HEADER_LIST_SIZE to a reasonable value. This is to
352        # advertise our defence against CVE-2016-6581. However, not all
353        # versions of HPACK will let us do it. That's ok: we should at least
354        # suggest that we're not vulnerable.
355        self.local_settings = Settings(
356            client=self.config.client_side,
357            initial_values={
358                SettingCodes.MAX_CONCURRENT_STREAMS: 100,
359                SettingCodes.MAX_HEADER_LIST_SIZE:
360                    self.DEFAULT_MAX_HEADER_LIST_SIZE,
361            }
362        )
363        self.remote_settings = Settings(client=not self.config.client_side)
364
365        # The curent value of the connection flow control windows on the
366        # connection.
367        self.outbound_flow_control_window = (
368            self.remote_settings.initial_window_size
369        )
370
371        #: The maximum size of a frame that can be emitted by this peer, in
372        #: bytes.
373        self.max_outbound_frame_size = self.remote_settings.max_frame_size
374
375        #: The maximum size of a frame that can be received by this peer, in
376        #: bytes.
377        self.max_inbound_frame_size = self.local_settings.max_frame_size
378
379        # Buffer for incoming data.
380        self.incoming_buffer = FrameBuffer(server=not self.config.client_side)
381
382        # A private variable to store a sequence of received header frames
383        # until completion.
384        self._header_frames = []
385
386        # Data that needs to be sent.
387        self._data_to_send = b''
388
389        # Keeps track of how streams are closed.
390        # Used to ensure that we don't blow up in the face of frames that were
391        # in flight when a RST_STREAM was sent.
392        # Also used to determine whether we should consider a frame received
393        # while a stream is closed as either a stream error or a connection
394        # error.
395        self._closed_streams = {}
396
397        # The flow control window manager for the connection.
398        self._inbound_flow_control_window_manager = WindowManager(
399            max_window_size=self.local_settings.initial_window_size
400        )
401
402        # When in doubt use dict-dispatch.
403        self._frame_dispatch_table = {
404            HeadersFrame: self._receive_headers_frame,
405            PushPromiseFrame: self._receive_push_promise_frame,
406            SettingsFrame: self._receive_settings_frame,
407            DataFrame: self._receive_data_frame,
408            WindowUpdateFrame: self._receive_window_update_frame,
409            PingFrame: self._receive_ping_frame,
410            RstStreamFrame: self._receive_rst_stream_frame,
411            PriorityFrame: self._receive_priority_frame,
412            GoAwayFrame: self._receive_goaway_frame,
413            ContinuationFrame: self._receive_naked_continuation,
414            AltSvcFrame: self._receive_alt_svc_frame,
415            ExtensionFrame: self._receive_unknown_frame
416        }
417
418    def _prepare_for_sending(self, frames):
419        if not frames:
420            return
421        self._data_to_send += b''.join(f.serialize() for f in frames)
422        assert all(f.body_len <= self.max_outbound_frame_size for f in frames)
423
424    def _open_streams(self, remainder):
425        """
426        A common method of counting number of open streams. Returns the number
427        of streams that are open *and* that have (stream ID % 2) == remainder.
428        While it iterates, also deletes any closed streams.
429        """
430        count = 0
431        to_delete = []
432
433        for stream_id, stream in self.streams.items():
434            if stream.open and (stream_id % 2 == remainder):
435                count += 1
436            elif stream.closed:
437                to_delete.append(stream_id)
438
439        for stream_id in to_delete:
440            stream = self.streams.pop(stream_id)
441            self._closed_streams[stream_id] = stream.closed_by
442
443        return count
444
445    @property
446    def open_outbound_streams(self):
447        """
448        The current number of open outbound streams.
449        """
450        outbound_numbers = int(self.config.client_side)
451        return self._open_streams(outbound_numbers)
452
453    @property
454    def open_inbound_streams(self):
455        """
456        The current number of open inbound streams.
457        """
458        inbound_numbers = int(not self.config.client_side)
459        return self._open_streams(inbound_numbers)
460
461    @property
462    def header_encoding(self):
463        """
464        Controls whether the headers emitted by this object in events are
465        transparently decoded to ``unicode`` strings, and what encoding is used
466        to do that decoding. For historical reason, this defaults to
467        ``'utf-8'``. To prevent the decoding of headers (that is, to force them
468        to be returned as bytestrings), this can be set to ``False`` or the
469        empty string.
470
471        .. versionadded:: 2.3.0
472
473        .. deprecated:: 2.5.0
474           Use :data:`config <h2.connection.H2Connection.config>` instead.
475        """
476        return self.config.header_encoding
477
478    @header_encoding.setter
479    def header_encoding(self, value):
480        """
481        Setter for header encoding config value.
482        """
483        self.config.header_encoding = value
484
485    @property
486    def client_side(self):
487        """
488        Whether this object is to be used on the client side of a connection,
489        or on the server side. Affects the logic used by the state machine, the
490        default settings values, the allowable stream IDs, and several other
491        properties. Defaults to ``True``.
492
493        .. deprecated:: 2.5.0
494           Use :data:`config <h2.connection.H2Connection.config>` instead.
495        """
496        return self.config.client_side
497
498    @property
499    def inbound_flow_control_window(self):
500        """
501        The size of the inbound flow control window for the connection. This is
502        rarely publicly useful: instead, use :meth:`remote_flow_control_window
503        <h2.connection.H2Connection.remote_flow_control_window>`. This
504        shortcut is largely present to provide a shortcut to this data.
505        """
506        return self._inbound_flow_control_window_manager.current_window_size
507
508    def _begin_new_stream(self, stream_id, allowed_ids):
509        """
510        Initiate a new stream.
511
512        .. versionchanged:: 2.0.0
513           Removed this function from the public API.
514
515        :param stream_id: The ID of the stream to open.
516        :param allowed_ids: What kind of stream ID is allowed.
517        """
518        self.config.logger.debug(
519            "Attempting to initiate stream ID %d", stream_id
520        )
521        outbound = self._stream_id_is_outbound(stream_id)
522        highest_stream_id = (
523            self.highest_outbound_stream_id if outbound else
524            self.highest_inbound_stream_id
525        )
526
527        if stream_id <= highest_stream_id:
528            raise StreamIDTooLowError(stream_id, highest_stream_id)
529
530        if (stream_id % 2) != int(allowed_ids):
531            raise ProtocolError(
532                "Invalid stream ID for peer."
533            )
534
535        s = H2Stream(
536            stream_id,
537            config=self.config,
538            inbound_window_size=self.local_settings.initial_window_size,
539            outbound_window_size=self.remote_settings.initial_window_size
540        )
541        self.config.logger.debug("Stream ID %d created", stream_id)
542        s.max_inbound_frame_size = self.max_inbound_frame_size
543        s.max_outbound_frame_size = self.max_outbound_frame_size
544
545        self.streams[stream_id] = s
546        self.config.logger.debug("Current streams: %s", self.streams.keys())
547
548        if outbound:
549            self.highest_outbound_stream_id = stream_id
550        else:
551            self.highest_inbound_stream_id = stream_id
552
553        return s
554
555    def initiate_connection(self):
556        """
557        Provides any data that needs to be sent at the start of the connection.
558        Must be called for both clients and servers.
559        """
560        self.config.logger.debug("Initializing connection")
561        self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS)
562        if self.config.client_side:
563            preamble = b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'
564        else:
565            preamble = b''
566
567        f = SettingsFrame(0)
568        for setting, value in self.local_settings.items():
569            f.settings[setting] = value
570        self.config.logger.debug(
571            "Send Settings frame: %s", self.local_settings
572        )
573
574        self._data_to_send += preamble + f.serialize()
575
576    def initiate_upgrade_connection(self, settings_header=None):
577        """
578        Call to initialise the connection object for use with an upgraded
579        HTTP/2 connection (i.e. a connection negotiated using the
580        ``Upgrade: h2c`` HTTP header).
581
582        This method differs from :meth:`initiate_connection
583        <h2.connection.H2Connection.initiate_connection>` in several ways.
584        Firstly, it handles the additional SETTINGS frame that is sent in the
585        ``HTTP2-Settings`` header field. When called on a client connection,
586        this method will return a bytestring that the caller can put in the
587        ``HTTP2-Settings`` field they send on their initial request. When
588        called on a server connection, the user **must** provide the value they
589        received from the client in the ``HTTP2-Settings`` header field to the
590        ``settings_header`` argument, which will be used appropriately.
591
592        Additionally, this method sets up stream 1 in a half-closed state
593        appropriate for this side of the connection, to reflect the fact that
594        the request is already complete.
595
596        Finally, this method also prepares the appropriate preamble to be sent
597        after the upgrade.
598
599        .. versionadded:: 2.3.0
600
601        :param settings_header: (optional, server-only): The value of the
602             ``HTTP2-Settings`` header field received from the client.
603        :type settings_header: ``bytes``
604
605        :returns: For clients, a bytestring to put in the ``HTTP2-Settings``.
606            For servers, returns nothing.
607        :rtype: ``bytes`` or ``None``
608        """
609        self.config.logger.debug(
610            "Upgrade connection. Current settings: %s", self.local_settings
611        )
612
613        frame_data = None
614        # Begin by getting the preamble in place.
615        self.initiate_connection()
616
617        if self.config.client_side:
618            f = SettingsFrame(0)
619            for setting, value in self.local_settings.items():
620                f.settings[setting] = value
621
622            frame_data = f.serialize_body()
623            frame_data = base64.urlsafe_b64encode(frame_data)
624        elif settings_header:
625            # We have a settings header from the client. This needs to be
626            # applied, but we want to throw away the ACK. We do this by
627            # inserting the data into a Settings frame and then passing it to
628            # the state machine, but ignoring the return value.
629            settings_header = base64.urlsafe_b64decode(settings_header)
630            f = SettingsFrame(0)
631            f.parse_body(settings_header)
632            self._receive_settings_frame(f)
633
634        # Set up appropriate state. Stream 1 in a half-closed state:
635        # half-closed(local) for clients, half-closed(remote) for servers.
636        # Additionally, we need to set up the Connection state machine.
637        connection_input = (
638            ConnectionInputs.SEND_HEADERS if self.config.client_side
639            else ConnectionInputs.RECV_HEADERS
640        )
641        self.config.logger.debug("Process input %s", connection_input)
642        self.state_machine.process_input(connection_input)
643
644        # Set up stream 1.
645        self._begin_new_stream(stream_id=1, allowed_ids=AllowedStreamIDs.ODD)
646        self.streams[1].upgrade(self.config.client_side)
647        return frame_data
648
649    def _get_or_create_stream(self, stream_id, allowed_ids):
650        """
651        Gets a stream by its stream ID. Will create one if one does not already
652        exist. Use allowed_ids to circumvent the usual stream ID rules for
653        clients and servers.
654
655        .. versionchanged:: 2.0.0
656           Removed this function from the public API.
657        """
658        try:
659            return self.streams[stream_id]
660        except KeyError:
661            return self._begin_new_stream(stream_id, allowed_ids)
662
663    def _get_stream_by_id(self, stream_id):
664        """
665        Gets a stream by its stream ID. Raises NoSuchStreamError if the stream
666        ID does not correspond to a known stream and is higher than the current
667        maximum: raises if it is lower than the current maximum.
668
669        .. versionchanged:: 2.0.0
670           Removed this function from the public API.
671        """
672        try:
673            return self.streams[stream_id]
674        except KeyError:
675            outbound = self._stream_id_is_outbound(stream_id)
676            highest_stream_id = (
677                self.highest_outbound_stream_id if outbound else
678                self.highest_inbound_stream_id
679            )
680
681            if stream_id > highest_stream_id:
682                raise NoSuchStreamError(stream_id)
683            else:
684                raise StreamClosedError(stream_id)
685
686    def get_next_available_stream_id(self):
687        """
688        Returns an integer suitable for use as the stream ID for the next
689        stream created by this endpoint. For server endpoints, this stream ID
690        will be even. For client endpoints, this stream ID will be odd. If no
691        stream IDs are available, raises :class:`NoAvailableStreamIDError
692        <h2.exceptions.NoAvailableStreamIDError>`.
693
694        .. warning:: The return value from this function does not change until
695                     the stream ID has actually been used by sending or pushing
696                     headers on that stream. For that reason, it should be
697                     called as close as possible to the actual use of the
698                     stream ID.
699
700        .. versionadded:: 2.0.0
701
702        :raises: :class:`NoAvailableStreamIDError
703            <h2.exceptions.NoAvailableStreamIDError>`
704        :returns: The next free stream ID this peer can use to initiate a
705            stream.
706        :rtype: ``int``
707        """
708        # No streams have been opened yet, so return the lowest allowed stream
709        # ID.
710        if not self.highest_outbound_stream_id:
711            next_stream_id = 1 if self.config.client_side else 2
712        else:
713            next_stream_id = self.highest_outbound_stream_id + 2
714        self.config.logger.debug(
715            "Next available stream ID %d", next_stream_id
716        )
717        if next_stream_id > self.HIGHEST_ALLOWED_STREAM_ID:
718            raise NoAvailableStreamIDError("Exhausted allowed stream IDs")
719
720        return next_stream_id
721
722    def send_headers(self, stream_id, headers, end_stream=False,
723                     priority_weight=None, priority_depends_on=None,
724                     priority_exclusive=None):
725        """
726        Send headers on a given stream.
727
728        This function can be used to send request or response headers: the kind
729        that are sent depends on whether this connection has been opened as a
730        client or server connection, and whether the stream was opened by the
731        remote peer or not.
732
733        If this is a client connection, calling ``send_headers`` will send the
734        headers as a request. It will also implicitly open the stream being
735        used. If this is a client connection and ``send_headers`` has *already*
736        been called, this will send trailers instead.
737
738        If this is a server connection, calling ``send_headers`` will send the
739        headers as a response. It is a protocol error for a server to open a
740        stream by sending headers. If this is a server connection and
741        ``send_headers`` has *already* been called, this will send trailers
742        instead.
743
744        When acting as a server, you may call ``send_headers`` any number of
745        times allowed by the following rules, in this order:
746
747        - zero or more times with ``(':status', '1XX')`` (where ``1XX`` is a
748          placeholder for any 100-level status code).
749        - once with any other status header.
750        - zero or one time for trailers.
751
752        That is, you are allowed to send as many informational responses as you
753        like, followed by one complete response and zero or one HTTP trailer
754        blocks.
755
756        Clients may send one or two header blocks: one request block, and
757        optionally one trailer block.
758
759        If it is important to send HPACK "never indexed" header fields (as
760        defined in `RFC 7451 Section 7.1.3
761        <https://tools.ietf.org/html/rfc7541#section-7.1.3>`_), the user may
762        instead provide headers using the HPACK library's :class:`HeaderTuple
763        <hpack:hpack.HeaderTuple>` and :class:`NeverIndexedHeaderTuple
764        <hpack:hpack.NeverIndexedHeaderTuple>` objects.
765
766        This method also allows users to prioritize the stream immediately,
767        by sending priority information on the HEADERS frame directly. To do
768        this, any one of ``priority_weight``, ``priority_depends_on``, or
769        ``priority_exclusive`` must be set to a value that is not ``None``. For
770        more information on the priority fields, see :meth:`prioritize
771        <h2.connection.H2Connection.prioritize>`.
772
773        .. warning:: In HTTP/2, it is mandatory that all the HTTP/2 special
774            headers (that is, ones whose header keys begin with ``:``) appear
775            at the start of the header block, before any normal headers.
776            If you pass a dictionary to the ``headers`` parameter, it is
777            unlikely that they will iterate in that order, and your connection
778            may fail. For this reason, passing a ``dict`` to ``headers`` is
779            *deprecated*, and will be removed in 3.0.
780
781        .. versionchanged:: 2.3.0
782           Added support for using :class:`HeaderTuple
783           <hpack:hpack.HeaderTuple>` objects to store headers.
784
785        .. versionchanged:: 2.4.0
786           Added the ability to provide priority keyword arguments:
787           ``priority_weight``, ``priority_depends_on``, and
788           ``priority_exclusive``.
789
790        :param stream_id: The stream ID to send the headers on. If this stream
791            does not currently exist, it will be created.
792        :type stream_id: ``int``
793
794        :param headers: The request/response headers to send.
795        :type headers: An iterable of two tuples of bytestrings or
796            :class:`HeaderTuple <hpack:hpack.HeaderTuple>` objects.
797
798        :param end_stream: Whether this headers frame should end the stream
799            immediately (that is, whether no more data will be sent after this
800            frame). Defaults to ``False``.
801        :type end_stream: ``bool``
802
803        :param priority_weight: Sets the priority weight of the stream. See
804            :meth:`prioritize <h2.connection.H2Connection.prioritize>` for more
805            about how this field works. Defaults to ``None``, which means that
806            no priority information will be sent.
807        :type priority_weight: ``int`` or ``None``
808
809        :param priority_depends_on: Sets which stream this one depends on for
810            priority purposes. See :meth:`prioritize
811            <h2.connection.H2Connection.prioritize>` for more about how this
812            field works. Defaults to ``None``, which means that no priority
813            information will be sent.
814        :type priority_depends_on: ``int`` or ``None``
815
816        :param priority_exclusive: Sets whether this stream exclusively depends
817            on the stream given in ``priority_depends_on`` for priority
818            purposes. See :meth:`prioritize
819            <h2.connection.H2Connection.prioritize>` for more about how this
820            field workds. Defaults to ``None``, which means that no priority
821            information will be sent.
822        :type priority_depends_on: ``bool`` or ``None``
823
824        :returns: Nothing
825        """
826        self.config.logger.debug(
827            "Send headers on stream ID %d", stream_id
828        )
829
830        # Check we can open the stream.
831        if stream_id not in self.streams:
832            max_open_streams = self.remote_settings.max_concurrent_streams
833            if (self.open_outbound_streams + 1) > max_open_streams:
834                raise TooManyStreamsError(
835                    "Max outbound streams is %d, %d open" %
836                    (max_open_streams, self.open_outbound_streams)
837                )
838
839        self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
840        stream = self._get_or_create_stream(
841            stream_id, AllowedStreamIDs(self.config.client_side)
842        )
843        frames = stream.send_headers(
844            headers, self.encoder, end_stream
845        )
846
847        # We may need to send priority information.
848        priority_present = (
849            (priority_weight is not None) or
850            (priority_depends_on is not None) or
851            (priority_exclusive is not None)
852        )
853
854        if priority_present:
855            if not self.config.client_side:
856                raise RFC1122Error("Servers SHOULD NOT prioritize streams.")
857
858            headers_frame = frames[0]
859            headers_frame.flags.add('PRIORITY')
860            frames[0] = _add_frame_priority(
861                headers_frame,
862                priority_weight,
863                priority_depends_on,
864                priority_exclusive
865            )
866
867        self._prepare_for_sending(frames)
868
869    def send_data(self, stream_id, data, end_stream=False, pad_length=None):
870        """
871        Send data on a given stream.
872
873        This method does no breaking up of data: if the data is larger than the
874        value returned by :meth:`local_flow_control_window
875        <h2.connection.H2Connection.local_flow_control_window>` for this stream
876        then a :class:`FlowControlError <h2.exceptions.FlowControlError>` will
877        be raised. If the data is larger than :data:`max_outbound_frame_size
878        <h2.connection.H2Connection.max_outbound_frame_size>` then a
879        :class:`FrameTooLargeError <h2.exceptions.FrameTooLargeError>` will be
880        raised.
881
882        Hyper-h2 does this to avoid buffering the data internally. If the user
883        has more data to send than hyper-h2 will allow, consider breaking it up
884        and buffering it externally.
885
886        :param stream_id: The ID of the stream on which to send the data.
887        :type stream_id: ``int``
888        :param data: The data to send on the stream.
889        :type data: ``bytes``
890        :param end_stream: (optional) Whether this is the last data to be sent
891            on the stream. Defaults to ``False``.
892        :type end_stream: ``bool``
893        :param pad_length: (optional) Length of the padding to apply to the
894            data frame. Defaults to ``None`` for no use of padding. Note that
895            a value of ``0`` results in padding of length ``0``
896            (with the "padding" flag set on the frame).
897
898            .. versionadded:: 2.6.0
899
900        :type pad_length: ``int``
901        :returns: Nothing
902        """
903        self.config.logger.debug(
904            "Send data on stream ID %d with len %d", stream_id, len(data)
905        )
906        frame_size = len(data)
907        if pad_length is not None:
908            if not isinstance(pad_length, int):
909                raise TypeError("pad_length must be an int")
910            if pad_length < 0 or pad_length > 255:
911                raise ValueError("pad_length must be within range: [0, 255]")
912            # Account for padding bytes plus the 1-byte padding length field.
913            frame_size += pad_length + 1
914        self.config.logger.debug(
915            "Frame size on stream ID %d is %d", stream_id, frame_size
916        )
917
918        if frame_size > self.local_flow_control_window(stream_id):
919            raise FlowControlError(
920                "Cannot send %d bytes, flow control window is %d." %
921                (frame_size, self.local_flow_control_window(stream_id))
922            )
923        elif frame_size > self.max_outbound_frame_size:
924            raise FrameTooLargeError(
925                "Cannot send frame size %d, max frame size is %d" %
926                (frame_size, self.max_outbound_frame_size)
927            )
928
929        self.state_machine.process_input(ConnectionInputs.SEND_DATA)
930        frames = self.streams[stream_id].send_data(
931            data, end_stream, pad_length=pad_length
932        )
933
934        self._prepare_for_sending(frames)
935
936        self.outbound_flow_control_window -= frame_size
937        self.config.logger.debug(
938            "Outbound flow control window size is %d",
939            self.outbound_flow_control_window
940        )
941        assert self.outbound_flow_control_window >= 0
942
943    def end_stream(self, stream_id):
944        """
945        Cleanly end a given stream.
946
947        This method ends a stream by sending an empty DATA frame on that stream
948        with the ``END_STREAM`` flag set.
949
950        :param stream_id: The ID of the stream to end.
951        :type stream_id: ``int``
952        :returns: Nothing
953        """
954        self.config.logger.debug("End stream ID %d", stream_id)
955        self.state_machine.process_input(ConnectionInputs.SEND_DATA)
956        frames = self.streams[stream_id].end_stream()
957        self._prepare_for_sending(frames)
958
959    def increment_flow_control_window(self, increment, stream_id=None):
960        """
961        Increment a flow control window, optionally for a single stream. Allows
962        the remote peer to send more data.
963
964        .. versionchanged:: 2.0.0
965           Rejects attempts to increment the flow control window by out of
966           range values with a ``ValueError``.
967
968        :param increment: The amount to increment the flow control window by.
969        :type increment: ``int``
970        :param stream_id: (optional) The ID of the stream that should have its
971            flow control window opened. If not present or ``None``, the
972            connection flow control window will be opened instead.
973        :type stream_id: ``int`` or ``None``
974        :returns: Nothing
975        :raises: ``ValueError``
976        """
977        if not (1 <= increment <= self.MAX_WINDOW_INCREMENT):
978            raise ValueError(
979                "Flow control increment must be between 1 and %d" %
980                self.MAX_WINDOW_INCREMENT
981            )
982
983        self.state_machine.process_input(ConnectionInputs.SEND_WINDOW_UPDATE)
984
985        if stream_id is not None:
986            stream = self.streams[stream_id]
987            frames = stream.increase_flow_control_window(
988                increment
989            )
990        else:
991            self._inbound_flow_control_window_manager.window_opened(increment)
992            f = WindowUpdateFrame(0)
993            f.window_increment = increment
994            frames = [f]
995
996        self.config.logger.debug(
997            "Increase stream ID %d flow control window by %d",
998            stream_id, increment
999        )
1000        self._prepare_for_sending(frames)
1001
1002    def push_stream(self, stream_id, promised_stream_id, request_headers):
1003        """
1004        Push a response to the client by sending a PUSH_PROMISE frame.
1005
1006        If it is important to send HPACK "never indexed" header fields (as
1007        defined in `RFC 7451 Section 7.1.3
1008        <https://tools.ietf.org/html/rfc7541#section-7.1.3>`_), the user may
1009        instead provide headers using the HPACK library's :class:`HeaderTuple
1010        <hpack:hpack.HeaderTuple>` and :class:`NeverIndexedHeaderTuple
1011        <hpack:hpack.NeverIndexedHeaderTuple>` objects.
1012
1013        :param stream_id: The ID of the stream that this push is a response to.
1014        :type stream_id: ``int``
1015        :param promised_stream_id: The ID of the stream that the pushed
1016            response will be sent on.
1017        :type promised_stream_id: ``int``
1018        :param request_headers: The headers of the request that the pushed
1019            response will be responding to.
1020        :type request_headers: An iterable of two tuples of bytestrings or
1021            :class:`HeaderTuple <hpack:hpack.HeaderTuple>` objects.
1022        :returns: Nothing
1023        """
1024        self.config.logger.debug(
1025            "Send Push Promise frame on stream ID %d", stream_id
1026        )
1027
1028        if not self.remote_settings.enable_push:
1029            raise ProtocolError("Remote peer has disabled stream push")
1030
1031        self.state_machine.process_input(ConnectionInputs.SEND_PUSH_PROMISE)
1032        stream = self._get_stream_by_id(stream_id)
1033
1034        # We need to prevent users pushing streams in response to streams that
1035        # they themselves have already pushed: see #163 and RFC 7540 § 6.6. The
1036        # easiest way to do that is to assert that the stream_id is not even:
1037        # this shortcut works because only servers can push and the state
1038        # machine will enforce this.
1039        if (stream_id % 2) == 0:
1040            raise ProtocolError("Cannot recursively push streams.")
1041
1042        new_stream = self._begin_new_stream(
1043            promised_stream_id, AllowedStreamIDs.EVEN
1044        )
1045        self.streams[promised_stream_id] = new_stream
1046
1047        frames = stream.push_stream_in_band(
1048            promised_stream_id, request_headers, self.encoder
1049        )
1050        new_frames = new_stream.locally_pushed()
1051        self._prepare_for_sending(frames + new_frames)
1052
1053    def ping(self, opaque_data):
1054        """
1055        Send a PING frame.
1056
1057        :param opaque_data: A bytestring of length 8 that will be sent in the
1058                            PING frame.
1059        :returns: Nothing
1060        """
1061        self.config.logger.debug("Send Ping frame")
1062
1063        if not isinstance(opaque_data, bytes) or len(opaque_data) != 8:
1064            raise ValueError("Invalid value for ping data: %r" % opaque_data)
1065
1066        self.state_machine.process_input(ConnectionInputs.SEND_PING)
1067        f = PingFrame(0)
1068        f.opaque_data = opaque_data
1069        self._prepare_for_sending([f])
1070
1071    def reset_stream(self, stream_id, error_code=0):
1072        """
1073        Reset a stream.
1074
1075        This method forcibly closes a stream by sending a RST_STREAM frame for
1076        a given stream. This is not a graceful closure. To gracefully end a
1077        stream, try the :meth:`end_stream
1078        <h2.connection.H2Connection.end_stream>` method.
1079
1080        :param stream_id: The ID of the stream to reset.
1081        :type stream_id: ``int``
1082        :param error_code: (optional) The error code to use to reset the
1083            stream. Defaults to :data:`ErrorCodes.NO_ERROR
1084            <h2.errors.ErrorCodes.NO_ERROR>`.
1085        :type error_code: ``int``
1086        :returns: Nothing
1087        """
1088        self.config.logger.debug("Reset stream ID %d", stream_id)
1089        self.state_machine.process_input(ConnectionInputs.SEND_RST_STREAM)
1090        stream = self._get_stream_by_id(stream_id)
1091        frames = stream.reset_stream(error_code)
1092        self._prepare_for_sending(frames)
1093
1094    def close_connection(self, error_code=0, additional_data=None,
1095                         last_stream_id=None):
1096
1097        """
1098        Close a connection, emitting a GOAWAY frame.
1099
1100        .. versionchanged:: 2.4.0
1101           Added ``additional_data`` and ``last_stream_id`` arguments.
1102
1103        :param error_code: (optional) The error code to send in the GOAWAY
1104            frame.
1105        :param additional_data: (optional) Additional debug data indicating
1106            a reason for closing the connection. Must be a bytestring.
1107        :param last_stream_id: (optional) The last stream which was processed
1108            by the sender. Defaults to ``highest_inbound_stream_id``.
1109        :returns: Nothing
1110        """
1111        self.config.logger.debug("Close connection")
1112        self.state_machine.process_input(ConnectionInputs.SEND_GOAWAY)
1113
1114        # Additional_data must be bytes
1115        if additional_data is not None:
1116            assert isinstance(additional_data, bytes)
1117
1118        if last_stream_id is None:
1119            last_stream_id = self.highest_inbound_stream_id
1120
1121        f = GoAwayFrame(
1122            stream_id=0,
1123            last_stream_id=last_stream_id,
1124            error_code=error_code,
1125            additional_data=(additional_data or b'')
1126        )
1127        self._prepare_for_sending([f])
1128
1129    def update_settings(self, new_settings):
1130        """
1131        Update the local settings. This will prepare and emit the appropriate
1132        SETTINGS frame.
1133
1134        :param new_settings: A dictionary of {setting: new value}
1135        """
1136        self.config.logger.debug(
1137            "Update connection settings to %s", new_settings
1138        )
1139        self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS)
1140        self.local_settings.update(new_settings)
1141        s = SettingsFrame(0)
1142        s.settings = new_settings
1143        self._prepare_for_sending([s])
1144
1145    def advertise_alternative_service(self,
1146                                      field_value,
1147                                      origin=None,
1148                                      stream_id=None):
1149        """
1150        Notify a client about an available Alternative Service.
1151
1152        An Alternative Service is defined in `RFC 7838
1153        <https://tools.ietf.org/html/rfc7838>`_. An Alternative Service
1154        notification informs a client that a given origin is also available
1155        elsewhere.
1156
1157        Alternative Services can be advertised in two ways. Firstly, they can
1158        be advertised explicitly: that is, a server can say "origin X is also
1159        available at Y". To advertise like this, set the ``origin`` argument
1160        and not the ``stream_id`` argument. Alternatively, they can be
1161        advertised implicitly: that is, a server can say "the origin you're
1162        contacting on stream X is also available at Y". To advertise like this,
1163        set the ``stream_id`` argument and not the ``origin`` argument.
1164
1165        The explicit method of advertising can be done as long as the
1166        connection is active. The implicit method can only be done after the
1167        client has sent the request headers and before the server has sent the
1168        response headers: outside of those points, Hyper-h2 will forbid sending
1169        the Alternative Service advertisement by raising a ProtocolError.
1170
1171        The ``field_value`` parameter is specified in RFC 7838. Hyper-h2 does
1172        not validate or introspect this argument: the user is required to
1173        ensure that it's well-formed. ``field_value`` corresponds to RFC 7838's
1174        "Alternative Service Field Value".
1175
1176        .. note:: It is strongly preferred to use the explicit method of
1177                  advertising Alternative Services. The implicit method of
1178                  advertising Alternative Services has a number of subtleties
1179                  and can lead to inconsistencies between the server and
1180                  client. Hyper-h2 allows both mechanisms, but caution is
1181                  strongly advised.
1182
1183        .. versionadded:: 2.3.0
1184
1185        :param field_value: The RFC 7838 Alternative Service Field Value. This
1186            argument is not introspected by Hyper-h2: the user is responsible
1187            for ensuring that it is well-formed.
1188        :type field_value: ``bytes``
1189
1190        :param origin: The origin/authority to which the Alternative Service
1191            being advertised applies. Must not be provided at the same time as
1192            ``stream_id``.
1193        :type origin: ``bytes`` or ``None``
1194
1195        :param stream_id: The ID of the stream which was sent to the authority
1196            for which this Alternative Service advertisement applies. Must not
1197            be provided at the same time as ``origin``.
1198        :type stream_id: ``int`` or ``None``
1199
1200        :returns: Nothing.
1201        """
1202        if not isinstance(field_value, bytes):
1203            raise ValueError("Field must be bytestring.")
1204
1205        if origin is not None and stream_id is not None:
1206            raise ValueError("Must not provide both origin and stream_id")
1207
1208        self.state_machine.process_input(
1209            ConnectionInputs.SEND_ALTERNATIVE_SERVICE
1210        )
1211
1212        if origin is not None:
1213            # This ALTSVC is sent on stream zero.
1214            f = AltSvcFrame(stream_id=0)
1215            f.origin = origin
1216            f.field = field_value
1217            frames = [f]
1218        else:
1219            stream = self._get_stream_by_id(stream_id)
1220            frames = stream.advertise_alternative_service(field_value)
1221
1222        self._prepare_for_sending(frames)
1223
1224    def prioritize(self, stream_id, weight=None, depends_on=None,
1225                   exclusive=None):
1226        """
1227        Notify a server about the priority of a stream.
1228
1229        Stream priorities are a form of guidance to a remote server: they
1230        inform the server about how important a given response is, so that the
1231        server may allocate its resources (e.g. bandwidth, CPU time, etc.)
1232        accordingly. This exists to allow clients to ensure that the most
1233        important data arrives earlier, while less important data does not
1234        starve out the more important data.
1235
1236        Stream priorities are explained in depth in `RFC 7540 Section 5.3
1237        <https://tools.ietf.org/html/rfc7540#section-5.3>`_.
1238
1239        This method updates the priority information of a single stream. It may
1240        be called well before a stream is actively in use, or well after a
1241        stream is closed.
1242
1243        .. warning:: RFC 7540 allows for servers to change the priority of
1244                     streams. However, hyper-h2 **does not** allow server
1245                     stacks to do this. This is because most clients do not
1246                     adequately know how to respond when provided conflicting
1247                     priority information, and relatively little utility is
1248                     provided by making that functionality available.
1249
1250        .. note:: hyper-h2 **does not** maintain any information about the
1251                  RFC 7540 priority tree. That means that hyper-h2 does not
1252                  prevent incautious users from creating invalid priority
1253                  trees, particularly by creating priority loops. While some
1254                  basic error checking is provided by hyper-h2, users are
1255                  strongly recommended to understand their prioritisation
1256                  strategies before using the priority tools here.
1257
1258        .. note:: Priority information is strictly advisory. Servers are
1259                  allowed to disregard it entirely. Avoid relying on the idea
1260                  that your priority signaling will definitely be obeyed.
1261
1262        .. versionadded:: 2.4.0
1263
1264        :param stream_id: The ID of the stream to prioritize.
1265        :type stream_id: ``int``
1266
1267        :param weight: The weight to give the stream. Defaults to ``16``, the
1268             default weight of any stream. May be any value between ``1`` and
1269             ``256`` inclusive. The relative weight of a stream indicates what
1270             proportion of available resources will be allocated to that
1271             stream.
1272        :type weight: ``int``
1273
1274        :param depends_on: The ID of the stream on which this stream depends.
1275             This stream will only be progressed if it is impossible to
1276             progress the parent stream (the one on which this one depends).
1277             Passing the value ``0`` means that this stream does not depend on
1278             any other. Defaults to ``0``.
1279        :type depends_on: ``int``
1280
1281        :param exclusive: Whether this stream is an exclusive dependency of its
1282            "parent" stream (i.e. the stream given by ``depends_on``). If a
1283            stream is an exclusive dependency of another, that means that all
1284            previously-set children of the parent are moved to become children
1285            of the new exclusively-dependent stream. Defaults to ``False``.
1286        :type exclusive: ``bool``
1287        """
1288        if not self.config.client_side:
1289            raise RFC1122Error("Servers SHOULD NOT prioritize streams.")
1290
1291        self.state_machine.process_input(
1292            ConnectionInputs.SEND_PRIORITY
1293        )
1294
1295        frame = PriorityFrame(stream_id)
1296        frame = _add_frame_priority(frame, weight, depends_on, exclusive)
1297
1298        self._prepare_for_sending([frame])
1299
1300    def local_flow_control_window(self, stream_id):
1301        """
1302        Returns the maximum amount of data that can be sent on stream
1303        ``stream_id``.
1304
1305        This value will never be larger than the total data that can be sent on
1306        the connection: even if the given stream allows more data, the
1307        connection window provides a logical maximum to the amount of data that
1308        can be sent.
1309
1310        The maximum data that can be sent in a single data frame on a stream
1311        is either this value, or the maximum frame size, whichever is
1312        *smaller*.
1313
1314        :param stream_id: The ID of the stream whose flow control window is
1315            being queried.
1316        :type stream_id: ``int``
1317        :returns: The amount of data in bytes that can be sent on the stream
1318            before the flow control window is exhausted.
1319        :rtype: ``int``
1320        """
1321        stream = self._get_stream_by_id(stream_id)
1322        return min(
1323            self.outbound_flow_control_window,
1324            stream.outbound_flow_control_window
1325        )
1326
1327    def remote_flow_control_window(self, stream_id):
1328        """
1329        Returns the maximum amount of data the remote peer can send on stream
1330        ``stream_id``.
1331
1332        This value will never be larger than the total data that can be sent on
1333        the connection: even if the given stream allows more data, the
1334        connection window provides a logical maximum to the amount of data that
1335        can be sent.
1336
1337        The maximum data that can be sent in a single data frame on a stream
1338        is either this value, or the maximum frame size, whichever is
1339        *smaller*.
1340
1341        :param stream_id: The ID of the stream whose flow control window is
1342            being queried.
1343        :type stream_id: ``int``
1344        :returns: The amount of data in bytes that can be received on the
1345            stream before the flow control window is exhausted.
1346        :rtype: ``int``
1347        """
1348        stream = self._get_stream_by_id(stream_id)
1349        return min(
1350            self.inbound_flow_control_window,
1351            stream.inbound_flow_control_window
1352        )
1353
1354    def acknowledge_received_data(self, acknowledged_size, stream_id):
1355        """
1356        Inform the :class:`H2Connection <h2.connection.H2Connection>` that a
1357        certain number of flow-controlled bytes have been processed, and that
1358        the space should be handed back to the remote peer at an opportune
1359        time.
1360
1361        .. versionadded:: 2.5.0
1362
1363        :param acknowledged_size: The total *flow-controlled size* of the data
1364            that has been processed. Note that this must include the amount of
1365            padding that was sent with that data.
1366        :type acknowledged_size: ``int``
1367        :param stream_id: The ID of the stream on which this data was received.
1368        :type stream_id: ``int``
1369        :returns: Nothing
1370        :rtype: ``None``
1371        """
1372        self.config.logger.debug(
1373            "Ack received data on stream ID %d with size %d",
1374            stream_id, acknowledged_size
1375        )
1376        if stream_id <= 0:
1377            raise ValueError(
1378                "Stream ID %d is not valid for acknowledge_received_data" %
1379                stream_id
1380            )
1381        if acknowledged_size < 0:
1382            raise ValueError("Cannot acknowledge negative data")
1383
1384        frames = []
1385
1386        conn_manager = self._inbound_flow_control_window_manager
1387        conn_increment = conn_manager.process_bytes(acknowledged_size)
1388        if conn_increment:
1389            f = WindowUpdateFrame(0)
1390            f.window_increment = conn_increment
1391            frames.append(f)
1392
1393        try:
1394            stream = self._get_stream_by_id(stream_id)
1395        except StreamClosedError:
1396            # The stream is already gone. We're not worried about incrementing
1397            # the window in this case.
1398            pass
1399        else:
1400            # No point incrementing the windows of closed streams.
1401            if stream.open:
1402                frames.extend(
1403                    stream.acknowledge_received_data(acknowledged_size)
1404                )
1405
1406        self._prepare_for_sending(frames)
1407
1408    def data_to_send(self, amt=None):
1409        """
1410        Returns some data for sending out of the internal data buffer.
1411
1412        This method is analogous to ``read`` on a file-like object, but it
1413        doesn't block. Instead, it returns as much data as the user asks for,
1414        or less if that much data is not available. It does not perform any
1415        I/O, and so uses a different name.
1416
1417        :param amt: (optional) The maximum amount of data to return. If not
1418            set, or set to ``None``, will return as much data as possible.
1419        :type amt: ``int``
1420        :returns: A bytestring containing the data to send on the wire.
1421        :rtype: ``bytes``
1422        """
1423        if amt is None:
1424            data = self._data_to_send
1425            self._data_to_send = b''
1426            return data
1427        else:
1428            data = self._data_to_send[:amt]
1429            self._data_to_send = self._data_to_send[amt:]
1430            return data
1431
1432    def clear_outbound_data_buffer(self):
1433        """
1434        Clears the outbound data buffer, such that if this call was immediately
1435        followed by a call to
1436        :meth:`data_to_send <h2.connection.H2Connection.data_to_send>`, that
1437        call would return no data.
1438
1439        This method should not normally be used, but is made available to avoid
1440        exposing implementation details.
1441        """
1442        self._data_to_send = b''
1443
1444    def _acknowledge_settings(self):
1445        """
1446        Acknowledge settings that have been received.
1447
1448        .. versionchanged:: 2.0.0
1449           Removed from public API, removed useless ``event`` parameter, made
1450           automatic.
1451
1452        :returns: Nothing
1453        """
1454        self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS)
1455
1456        changes = self.remote_settings.acknowledge()
1457
1458        if SettingCodes.INITIAL_WINDOW_SIZE in changes:
1459            setting = changes[SettingCodes.INITIAL_WINDOW_SIZE]
1460            self._flow_control_change_from_settings(
1461                setting.original_value,
1462                setting.new_value,
1463            )
1464
1465        # HEADER_TABLE_SIZE changes by the remote part affect our encoder: cf.
1466        # RFC 7540 Section 6.5.2.
1467        if SettingCodes.HEADER_TABLE_SIZE in changes:
1468            setting = changes[SettingCodes.HEADER_TABLE_SIZE]
1469            self.encoder.header_table_size = setting.new_value
1470
1471        if SettingCodes.MAX_FRAME_SIZE in changes:
1472            setting = changes[SettingCodes.MAX_FRAME_SIZE]
1473            self.max_outbound_frame_size = setting.new_value
1474            for stream in self.streams.values():
1475                stream.max_outbound_frame_size = setting.new_value
1476
1477        f = SettingsFrame(0)
1478        f.flags.add('ACK')
1479        return [f]
1480
1481    def _flow_control_change_from_settings(self, old_value, new_value):
1482        """
1483        Update flow control windows in response to a change in the value of
1484        SETTINGS_INITIAL_WINDOW_SIZE.
1485
1486        When this setting is changed, it automatically updates all flow control
1487        windows by the delta in the settings values. Note that it does not
1488        increment the *connection* flow control window, per section 6.9.2 of
1489        RFC 7540.
1490        """
1491        delta = new_value - old_value
1492
1493        for stream in self.streams.values():
1494            stream.outbound_flow_control_window = guard_increment_window(
1495                stream.outbound_flow_control_window,
1496                delta
1497            )
1498
1499    def _inbound_flow_control_change_from_settings(self, old_value, new_value):
1500        """
1501        Update remote flow control windows in response to a change in the value
1502        of SETTINGS_INITIAL_WINDOW_SIZE.
1503
1504        When this setting is changed, it automatically updates all remote flow
1505        control windows by the delta in the settings values.
1506        """
1507        delta = new_value - old_value
1508
1509        for stream in self.streams.values():
1510            stream._inbound_flow_control_change_from_settings(delta)
1511
1512    def receive_data(self, data):
1513        """
1514        Pass some received HTTP/2 data to the connection for handling.
1515
1516        :param data: The data received from the remote peer on the network.
1517        :type data: ``bytes``
1518        :returns: A list of events that the remote peer triggered by sending
1519            this data.
1520        """
1521        self.config.logger.debug(
1522            "Process received data on connection. Received data: %r", data
1523        )
1524
1525        events = []
1526        self.incoming_buffer.add_data(data)
1527        self.incoming_buffer.max_frame_size = self.max_inbound_frame_size
1528
1529        try:
1530            for frame in self.incoming_buffer:
1531                events.extend(self._receive_frame(frame))
1532        except InvalidPaddingError:
1533            self._terminate_connection(ErrorCodes.PROTOCOL_ERROR)
1534            raise ProtocolError("Received frame with invalid padding.")
1535        except ProtocolError as e:
1536            # For whatever reason, receiving the frame caused a protocol error.
1537            # We should prepare to emit a GoAway frame before throwing the
1538            # exception up further. No need for an event: the exception will
1539            # do fine.
1540            self._terminate_connection(e.error_code)
1541            raise
1542
1543        return events
1544
1545    def _receive_frame(self, frame):
1546        """
1547        Handle a frame received on the connection.
1548
1549        .. versionchanged:: 2.0.0
1550           Removed from the public API.
1551        """
1552        try:
1553            # I don't love using __class__ here, maybe reconsider it.
1554            frames, events = self._frame_dispatch_table[frame.__class__](frame)
1555        except StreamClosedError as e:
1556            # If the stream was closed by RST_STREAM, we just send a RST_STREAM
1557            # to the remote peer. Otherwise, this is a connection error, and so
1558            # we will re-raise to trigger one.
1559            if self._stream_is_closed_by_reset(e.stream_id):
1560                f = RstStreamFrame(e.stream_id)
1561                f.error_code = e.error_code
1562                self._prepare_for_sending([f])
1563                events = e._events
1564            else:
1565                raise
1566        except StreamIDTooLowError as e:
1567            # The stream ID seems invalid. This may happen when the closed
1568            # stream has been cleaned up, or when the remote peer has opened a
1569            # new stream with a higher stream ID than this one, forcing it
1570            # closed implicitly.
1571            #
1572            # Check how the stream was closed: depending on the mechanism, it
1573            # is either a stream error or a connection error.
1574            if self._stream_is_closed_by_reset(e.stream_id):
1575                # Closed by RST_STREAM is a stream error.
1576                f = RstStreamFrame(e.stream_id)
1577                f.error_code = ErrorCodes.STREAM_CLOSED
1578                self._prepare_for_sending([f])
1579                events = []
1580            elif self._stream_is_closed_by_end(e.stream_id):
1581                # Closed by END_STREAM is a connection error.
1582                raise StreamClosedError(e.stream_id)
1583            else:
1584                # Closed implicitly, also a connection error, but of type
1585                # PROTOCOL_ERROR.
1586                raise
1587        else:
1588            self._prepare_for_sending(frames)
1589
1590        return events
1591
1592    def _terminate_connection(self, error_code):
1593        """
1594        Terminate the connection early. Used in error handling blocks to send
1595        GOAWAY frames.
1596        """
1597        f = GoAwayFrame(0)
1598        f.last_stream_id = self.highest_inbound_stream_id
1599        f.error_code = error_code
1600        self.state_machine.process_input(ConnectionInputs.SEND_GOAWAY)
1601        self._prepare_for_sending([f])
1602
1603    def _receive_headers_frame(self, frame):
1604        """
1605        Receive a headers frame on the connection.
1606        """
1607        # If necessary, check we can open the stream. Also validate that the
1608        # stream ID is valid.
1609        if frame.stream_id not in self.streams:
1610            max_open_streams = self.local_settings.max_concurrent_streams
1611            if (self.open_inbound_streams + 1) > max_open_streams:
1612                raise TooManyStreamsError(
1613                    "Max outbound streams is %d, %d open" %
1614                    (max_open_streams, self.open_outbound_streams)
1615                )
1616
1617        # Let's decode the headers. We handle headers as bytes internally up
1618        # until we hang them off the event, at which point we may optionally
1619        # convert them to unicode.
1620        headers = _decode_headers(self.decoder, frame.data)
1621
1622        events = self.state_machine.process_input(
1623            ConnectionInputs.RECV_HEADERS
1624        )
1625        stream = self._get_or_create_stream(
1626            frame.stream_id, AllowedStreamIDs(not self.config.client_side)
1627        )
1628        frames, stream_events = stream.receive_headers(
1629            headers,
1630            'END_STREAM' in frame.flags,
1631            self.config.header_encoding
1632        )
1633
1634        if 'PRIORITY' in frame.flags:
1635            p_frames, p_events = self._receive_priority_frame(frame)
1636            stream_events[0].priority_updated = p_events[0]
1637            stream_events.extend(p_events)
1638            assert not p_frames
1639
1640        return frames, events + stream_events
1641
1642    def _receive_push_promise_frame(self, frame):
1643        """
1644        Receive a push-promise frame on the connection.
1645        """
1646        if not self.local_settings.enable_push:
1647            raise ProtocolError("Received pushed stream")
1648
1649        pushed_headers = _decode_headers(self.decoder, frame.data)
1650
1651        events = self.state_machine.process_input(
1652            ConnectionInputs.RECV_PUSH_PROMISE
1653        )
1654
1655        try:
1656            stream = self._get_stream_by_id(frame.stream_id)
1657        except NoSuchStreamError:
1658            # We need to check if the parent stream was reset by us. If it was
1659            # then we presume that the PUSH_PROMISE was in flight when we reset
1660            # the parent stream. Rather than accept the new stream, just reset
1661            # it.
1662            #
1663            # If this was closed naturally, however, we should call this a
1664            # PROTOCOL_ERROR: pushing a stream on a naturally closed stream is
1665            # a real problem because it creates a brand new stream that the
1666            # remote peer now believes exists.
1667            if (self._stream_closed_by(frame.stream_id) ==
1668                    StreamClosedBy.SEND_RST_STREAM):
1669                f = RstStreamFrame(frame.promised_stream_id)
1670                f.error_code = ErrorCodes.REFUSED_STREAM
1671                return [f], events
1672
1673            raise ProtocolError("Attempted to push on closed stream.")
1674
1675        # We need to prevent peers pushing streams in response to streams that
1676        # they themselves have already pushed: see #163 and RFC 7540 § 6.6. The
1677        # easiest way to do that is to assert that the stream_id is not even:
1678        # this shortcut works because only servers can push and the state
1679        # machine will enforce this.
1680        if (frame.stream_id % 2) == 0:
1681            raise ProtocolError("Cannot recursively push streams.")
1682
1683        try:
1684            frames, stream_events = stream.receive_push_promise_in_band(
1685                frame.promised_stream_id,
1686                pushed_headers,
1687                self.config.header_encoding,
1688            )
1689        except StreamClosedError:
1690            # The parent stream was reset by us, so we presume that
1691            # PUSH_PROMISE was in flight when we reset the parent stream.
1692            # So we just reset the new stream.
1693            f = RstStreamFrame(frame.promised_stream_id)
1694            f.error_code = ErrorCodes.REFUSED_STREAM
1695            return [f], events
1696
1697        new_stream = self._begin_new_stream(
1698            frame.promised_stream_id, AllowedStreamIDs.EVEN
1699        )
1700        self.streams[frame.promised_stream_id] = new_stream
1701        new_stream.remotely_pushed(pushed_headers)
1702
1703        return frames, events + stream_events
1704
1705    def _receive_data_frame(self, frame):
1706        """
1707        Receive a data frame on the connection.
1708        """
1709        flow_controlled_length = frame.flow_controlled_length
1710
1711        events = self.state_machine.process_input(
1712            ConnectionInputs.RECV_DATA
1713        )
1714        self._inbound_flow_control_window_manager.window_consumed(
1715            flow_controlled_length
1716        )
1717        stream = self._get_stream_by_id(frame.stream_id)
1718        frames, stream_events = stream.receive_data(
1719            frame.data,
1720            'END_STREAM' in frame.flags,
1721            flow_controlled_length
1722        )
1723        return frames, events + stream_events
1724
1725    def _receive_settings_frame(self, frame):
1726        """
1727        Receive a SETTINGS frame on the connection.
1728        """
1729        events = self.state_machine.process_input(
1730            ConnectionInputs.RECV_SETTINGS
1731        )
1732
1733        # This is an ack of the local settings.
1734        if 'ACK' in frame.flags:
1735            changed_settings = self._local_settings_acked()
1736            ack_event = SettingsAcknowledged()
1737            ack_event.changed_settings = changed_settings
1738            events.append(ack_event)
1739            return [], events
1740
1741        # Add the new settings.
1742        self.remote_settings.update(frame.settings)
1743        events.append(
1744            RemoteSettingsChanged.from_settings(
1745                self.remote_settings, frame.settings
1746            )
1747        )
1748        frames = self._acknowledge_settings()
1749
1750        return frames, events
1751
1752    def _receive_window_update_frame(self, frame):
1753        """
1754        Receive a WINDOW_UPDATE frame on the connection.
1755        """
1756        # Validate the frame.
1757        if not (1 <= frame.window_increment <= self.MAX_WINDOW_INCREMENT):
1758            raise ProtocolError(
1759                "Flow control increment must be between 1 and %d, received %d"
1760                % (self.MAX_WINDOW_INCREMENT, frame.window_increment)
1761            )
1762
1763        events = self.state_machine.process_input(
1764            ConnectionInputs.RECV_WINDOW_UPDATE
1765        )
1766
1767        if frame.stream_id:
1768            stream = self._get_stream_by_id(frame.stream_id)
1769            frames, stream_events = stream.receive_window_update(
1770                frame.window_increment
1771            )
1772        else:
1773            # Increment our local flow control window.
1774            self.outbound_flow_control_window = guard_increment_window(
1775                self.outbound_flow_control_window,
1776                frame.window_increment
1777            )
1778
1779            # FIXME: Should we split this into one event per active stream?
1780            window_updated_event = WindowUpdated()
1781            window_updated_event.stream_id = 0
1782            window_updated_event.delta = frame.window_increment
1783            stream_events = [window_updated_event]
1784            frames = []
1785
1786        return frames, events + stream_events
1787
1788    def _receive_ping_frame(self, frame):
1789        """
1790        Receive a PING frame on the connection.
1791        """
1792        events = self.state_machine.process_input(
1793            ConnectionInputs.RECV_PING
1794        )
1795        flags = []
1796
1797        if 'ACK' in frame.flags:
1798            evt = PingAcknowledged()
1799            evt.ping_data = frame.opaque_data
1800            events.append(evt)
1801        else:
1802            f = PingFrame(0)
1803            f.flags = {'ACK'}
1804            f.opaque_data = frame.opaque_data
1805            flags.append(f)
1806
1807        return flags, events
1808
1809    def _receive_rst_stream_frame(self, frame):
1810        """
1811        Receive a RST_STREAM frame on the connection.
1812        """
1813        events = self.state_machine.process_input(
1814            ConnectionInputs.RECV_RST_STREAM
1815        )
1816        try:
1817            stream = self._get_stream_by_id(frame.stream_id)
1818        except NoSuchStreamError:
1819            # The stream is missing. That's ok, we just do nothing here.
1820            stream_frames = []
1821            stream_events = []
1822        else:
1823            stream_frames, stream_events = stream.stream_reset(frame)
1824
1825        return stream_frames, events + stream_events
1826
1827    def _receive_priority_frame(self, frame):
1828        """
1829        Receive a PRIORITY frame on the connection.
1830        """
1831        events = self.state_machine.process_input(
1832            ConnectionInputs.RECV_PRIORITY
1833        )
1834
1835        event = PriorityUpdated()
1836        event.stream_id = frame.stream_id
1837        event.depends_on = frame.depends_on
1838        event.exclusive = frame.exclusive
1839
1840        # Weight is an integer between 1 and 256, but the byte only allows
1841        # 0 to 255: add one.
1842        event.weight = frame.stream_weight + 1
1843
1844        # A stream may not depend on itself.
1845        if event.depends_on == frame.stream_id:
1846            raise ProtocolError(
1847                "Stream %d may not depend on itself" % frame.stream_id
1848            )
1849        events.append(event)
1850
1851        return [], events
1852
1853    def _receive_goaway_frame(self, frame):
1854        """
1855        Receive a GOAWAY frame on the connection.
1856        """
1857        events = self.state_machine.process_input(
1858            ConnectionInputs.RECV_GOAWAY
1859        )
1860
1861        # Clear the outbound data buffer: we cannot send further data now.
1862        self.clear_outbound_data_buffer()
1863
1864        # Fire an appropriate ConnectionTerminated event.
1865        new_event = ConnectionTerminated()
1866        new_event.error_code = _error_code_from_int(frame.error_code)
1867        new_event.last_stream_id = frame.last_stream_id
1868        new_event.additional_data = (frame.additional_data
1869                                     if frame.additional_data else None)
1870        events.append(new_event)
1871
1872        return [], events
1873
1874    def _receive_naked_continuation(self, frame):
1875        """
1876        A naked CONTINUATION frame has been received. This is always an error,
1877        but the type of error it is depends on the state of the stream and must
1878        transition the state of the stream, so we need to pass it to the
1879        appropriate stream.
1880        """
1881        stream = self._get_stream_by_id(frame.stream_id)
1882        stream.receive_continuation()
1883        assert False, "Should not be reachable"
1884
1885    def _receive_alt_svc_frame(self, frame):
1886        """
1887        An ALTSVC frame has been received. This frame, specified in RFC 7838,
1888        is used to advertise alternative places where the same service can be
1889        reached.
1890
1891        This frame can optionally be received either on a stream or on stream
1892        0, and its semantics are different in each case.
1893        """
1894        events = self.state_machine.process_input(
1895            ConnectionInputs.RECV_ALTERNATIVE_SERVICE
1896        )
1897        frames = []
1898
1899        if frame.stream_id:
1900            # Given that it makes no sense to receive ALTSVC on a stream
1901            # before that stream has been opened with a HEADERS frame, the
1902            # ALTSVC frame cannot create a stream. If the stream is not
1903            # present, we simply ignore the frame.
1904            try:
1905                stream = self._get_stream_by_id(frame.stream_id)
1906            except (NoSuchStreamError, StreamClosedError):
1907                pass
1908            else:
1909                stream_frames, stream_events = stream.receive_alt_svc(frame)
1910                frames.extend(stream_frames)
1911                events.extend(stream_events)
1912        else:
1913            # This frame is sent on stream 0. The origin field on the frame
1914            # must be present, though if it isn't it's not a ProtocolError
1915            # (annoyingly), we just need to ignore it.
1916            if not frame.origin:
1917                return frames, events
1918
1919            # If we're a server, we want to ignore this (RFC 7838 says so).
1920            if not self.config.client_side:
1921                return frames, events
1922
1923            event = AlternativeServiceAvailable()
1924            event.origin = frame.origin
1925            event.field_value = frame.field
1926            events.append(event)
1927
1928        return frames, events
1929
1930    def _receive_unknown_frame(self, frame):
1931        """
1932        We have received a frame that we do not understand. This is almost
1933        certainly an extension frame, though it's impossible to be entirely
1934        sure.
1935
1936        RFC 7540 § 5.5 says that we MUST ignore unknown frame types: so we
1937        do.
1938        """
1939        # All we do here is log.
1940        self.config.logger.debug(
1941            "Received unknown extension frame (ID %d)", frame.stream_id
1942        )
1943        return [], []
1944
1945    def _local_settings_acked(self):
1946        """
1947        Handle the local settings being ACKed, update internal state.
1948        """
1949        changes = self.local_settings.acknowledge()
1950
1951        if SettingCodes.INITIAL_WINDOW_SIZE in changes:
1952            setting = changes[SettingCodes.INITIAL_WINDOW_SIZE]
1953            self._inbound_flow_control_change_from_settings(
1954                setting.original_value,
1955                setting.new_value,
1956            )
1957
1958        if SettingCodes.MAX_HEADER_LIST_SIZE in changes:
1959            setting = changes[SettingCodes.MAX_HEADER_LIST_SIZE]
1960            self.decoder.max_header_list_size = setting.new_value
1961
1962        if SettingCodes.MAX_FRAME_SIZE in changes:
1963            setting = changes[SettingCodes.MAX_FRAME_SIZE]
1964            self.max_inbound_frame_size = setting.new_value
1965
1966        if SettingCodes.HEADER_TABLE_SIZE in changes:
1967            setting = changes[SettingCodes.HEADER_TABLE_SIZE]
1968            # This is safe across all hpack versions: some versions just won't
1969            # respect it.
1970            self.decoder.max_allowed_table_size = setting.new_value
1971
1972        return changes
1973
1974    def _stream_id_is_outbound(self, stream_id):
1975        """
1976        Returns ``True`` if the stream ID corresponds to an outbound stream
1977        (one initiated by this peer), returns ``False`` otherwise.
1978        """
1979        return (stream_id % 2 == int(self.config.client_side))
1980
1981    def _stream_closed_by(self, stream_id):
1982        """
1983        Returns how the stream was closed.
1984
1985        The return value will be either a member of
1986        ``h2.stream.StreamClosedBy`` or ``None``. If ``None``, the stream was
1987        closed implicitly by the peer opening a stream with a higher stream ID
1988        before opening this one.
1989        """
1990        if stream_id in self.streams:
1991            return self.streams[stream_id].closed_by
1992        if stream_id in self._closed_streams:
1993            return self._closed_streams[stream_id]
1994        return None
1995
1996    def _stream_is_closed_by_reset(self, stream_id):
1997        """
1998        Returns ``True`` if the stream was closed by sending or receiving a
1999        RST_STREAM frame. Returns ``False`` otherwise.
2000        """
2001        return self._stream_closed_by(stream_id) in (
2002            StreamClosedBy.RECV_RST_STREAM, StreamClosedBy.SEND_RST_STREAM
2003        )
2004
2005    def _stream_is_closed_by_end(self, stream_id):
2006        """
2007        Returns ``True`` if the stream was closed by sending or receiving an
2008        END_STREAM flag in a HEADERS or DATA frame. Returns ``False``
2009        otherwise.
2010        """
2011        return self._stream_closed_by(stream_id) in (
2012            StreamClosedBy.RECV_END_STREAM, StreamClosedBy.SEND_END_STREAM
2013        )
2014
2015
def _add_frame_priority(frame, weight=None, depends_on=None, exclusive=None):
    """
    Attach priority data to a frame, validating the values supplied and
    filling in defaults for any that are missing.

    No flags on the frame are changed: a caller adding priority information
    to a HEADERS frame must set the relevant flag itself.

    :param frame: The frame to update in place.
    :param weight: (optional) The stream weight, from 1 to 256 inclusive.
        When omitted, the value 15 is stored on the frame.
    :param depends_on: (optional) The ID of the stream this one depends on.
        Defaults to 0.
    :param exclusive: (optional) Whether the dependency is exclusive.
        Defaults to ``False``.
    :returns: The same frame object that was passed in.
    :raises ProtocolError: If the stream would depend on itself, or if the
        weight is out of range.
    """
    # A stream may not depend on itself.
    if depends_on == frame.stream_id:
        raise ProtocolError(
            "Stream %d may not depend on itself" % frame.stream_id
        )

    if weight is None:
        # Nothing supplied: fall back to the default stored value.
        wire_weight = 15
    elif weight > 256 or weight < 1:
        # Weight must be between 1 and 256.
        raise ProtocolError(
            "Weight must be between 1 and 256, not %d" % weight
        )
    else:
        # Weight is an integer between 1 and 256, but the byte only allows
        # 0 to 255: subtract one.
        wire_weight = weight - 1

    frame.stream_weight = wire_weight
    frame.depends_on = 0 if depends_on is None else depends_on
    frame.exclusive = False if exclusive is None else exclusive

    return frame
2053
2054
def _decode_headers(decoder, encoded_header_block):
    """
    Decode a HPACK-encoded header block, turning HPACK failures into
    hyper-h2 errors.

    The decoder is always asked for raw output, so the headers returned are
    bytestrings: hyper-h2 may emit them as unicode later, but internally it
    processes them as bytestrings only.

    :param decoder: The HPACK decoder to use.
    :param encoded_header_block: The encoded header block to decode.
    :returns: Whatever ``decoder.decode`` returns for the block.
    :raises DenialOfServiceError: If the decoded header list is oversized.
    :raises ProtocolError: If the block cannot be decoded.
    """
    try:
        decoded = decoder.decode(encoded_header_block, raw=True)
    except OversizedHeaderListError as exc:
        # This is a symptom of a HPACK bomb attack: the user has
        # disregarded our requirements on how large a header block we'll
        # accept.
        raise DenialOfServiceError("Oversized header block: %s" % exc)
    except (HPACKError, IndexError, TypeError, UnicodeDecodeError) as exc:
        # We should only need HPACKError here, but versions of HPACK older
        # than 2.1.0 throw all three others as well. For maximum
        # compatibility, catch all of them.
        raise ProtocolError("Error decoding header block: %s" % exc)

    return decoded
2075