1# -*- coding: utf-8 -*-
2"""
3h2/stream
4~~~~~~~~~
5
6An implementation of a HTTP/2 stream.
7"""
8from enum import Enum, IntEnum
9from hpack import HeaderTuple
10from hyperframe.frame import (
11    HeadersFrame, ContinuationFrame, DataFrame, WindowUpdateFrame,
12    RstStreamFrame, PushPromiseFrame, AltSvcFrame
13)
14
15from .errors import ErrorCodes, _error_code_from_int
16from .events import (
17    RequestReceived, ResponseReceived, DataReceived, WindowUpdated,
18    StreamEnded, PushedStreamReceived, StreamReset, TrailersReceived,
19    InformationalResponseReceived, AlternativeServiceAvailable,
20    _ResponseSent, _RequestSent, _TrailersSent, _PushedRequestSent
21)
22from .exceptions import (
23    ProtocolError, StreamClosedError, InvalidBodyLengthError, FlowControlError
24)
25from .utilities import (
26    guard_increment_window, is_informational_response, authority_from_headers,
27    validate_headers, validate_outbound_headers, normalize_outbound_headers,
28    HeaderValidationFlags, extract_method_header, normalize_inbound_headers
29)
30from .windows import WindowManager
31
32
33class StreamState(IntEnum):
34    IDLE = 0
35    RESERVED_REMOTE = 1
36    RESERVED_LOCAL = 2
37    OPEN = 3
38    HALF_CLOSED_REMOTE = 4
39    HALF_CLOSED_LOCAL = 5
40    CLOSED = 6
41
42
43class StreamInputs(Enum):
44    SEND_HEADERS = 0
45    SEND_PUSH_PROMISE = 1
46    SEND_RST_STREAM = 2
47    SEND_DATA = 3
48    SEND_WINDOW_UPDATE = 4
49    SEND_END_STREAM = 5
50    RECV_HEADERS = 6
51    RECV_PUSH_PROMISE = 7
52    RECV_RST_STREAM = 8
53    RECV_DATA = 9
54    RECV_WINDOW_UPDATE = 10
55    RECV_END_STREAM = 11
56    RECV_CONTINUATION = 12  # Added in 2.0.0
57    SEND_INFORMATIONAL_HEADERS = 13  # Added in 2.2.0
58    RECV_INFORMATIONAL_HEADERS = 14  # Added in 2.2.0
59    SEND_ALTERNATIVE_SERVICE = 15  # Added in 2.3.0
60    RECV_ALTERNATIVE_SERVICE = 16  # Added in 2.3.0
61    UPGRADE_CLIENT = 17  # Added 2.3.0
62    UPGRADE_SERVER = 18  # Added 2.3.0
63
64
65class StreamClosedBy(Enum):
66    SEND_END_STREAM = 0
67    RECV_END_STREAM = 1
68    SEND_RST_STREAM = 2
69    RECV_RST_STREAM = 3
70
71
72# This array is initialized once, and is indexed by the stream states above.
73# It indicates whether a stream in the given state is open. The reason we do
74# this is that we potentially check whether a stream in a given state is open
75# quite frequently: given that we check so often, we should do so in the
76# fastest and most performant way possible.
77STREAM_OPEN = [False for _ in range(0, len(StreamState))]
78STREAM_OPEN[StreamState.OPEN] = True
79STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True
80STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True
81
82
83class H2StreamStateMachine(object):
84    """
85    A single HTTP/2 stream state machine.
86
87    This stream object implements basically the state machine described in
88    RFC 7540 section 5.1.
89
90    :param stream_id: The stream ID of this stream. This is stored primarily
91        for logging purposes.
92    """
93    def __init__(self, stream_id):
94        self.state = StreamState.IDLE
95        self.stream_id = stream_id
96
97        #: Whether this peer is the client side of this stream.
98        self.client = None
99
100        # Whether trailers have been sent/received on this stream or not.
101        self.headers_sent = None
102        self.trailers_sent = None
103        self.headers_received = None
104        self.trailers_received = None
105
106        # How the stream was closed. One of StreamClosedBy.
107        self.stream_closed_by = None
108
109    def process_input(self, input_):
110        """
111        Process a specific input in the state machine.
112        """
113        if not isinstance(input_, StreamInputs):
114            raise ValueError("Input must be an instance of StreamInputs")
115
116        try:
117            func, target_state = _transitions[(self.state, input_)]
118        except KeyError:
119            old_state = self.state
120            self.state = StreamState.CLOSED
121            raise ProtocolError(
122                "Invalid input %s in state %s" % (input_, old_state)
123            )
124        else:
125            previous_state = self.state
126            self.state = target_state
127            if func is not None:
128                try:
129                    return func(self, previous_state)
130                except ProtocolError:
131                    self.state = StreamState.CLOSED
132                    raise
133                except AssertionError as e:  # pragma: no cover
134                    self.state = StreamState.CLOSED
135                    raise ProtocolError(e)
136
137            return []
138
139    def request_sent(self, previous_state):
140        """
141        Fires when a request is sent.
142        """
143        self.client = True
144        self.headers_sent = True
145        event = _RequestSent()
146
147        return [event]
148
149    def response_sent(self, previous_state):
150        """
151        Fires when something that should be a response is sent. This 'response'
152        may actually be trailers.
153        """
154        if not self.headers_sent:
155            if self.client is True or self.client is None:
156                raise ProtocolError("Client cannot send responses.")
157            self.headers_sent = True
158            event = _ResponseSent()
159        else:
160            assert not self.trailers_sent
161            self.trailers_sent = True
162            event = _TrailersSent()
163
164        return [event]
165
166    def request_received(self, previous_state):
167        """
168        Fires when a request is received.
169        """
170        assert not self.headers_received
171        assert not self.trailers_received
172
173        self.client = False
174        self.headers_received = True
175        event = RequestReceived()
176
177        event.stream_id = self.stream_id
178        return [event]
179
180    def response_received(self, previous_state):
181        """
182        Fires when a response is received. Also disambiguates between responses
183        and trailers.
184        """
185        if not self.headers_received:
186            assert self.client is True
187            self.headers_received = True
188            event = ResponseReceived()
189        else:
190            assert not self.trailers_received
191            self.trailers_received = True
192            event = TrailersReceived()
193
194        event.stream_id = self.stream_id
195        return [event]
196
197    def data_received(self, previous_state):
198        """
199        Fires when data is received.
200        """
201        event = DataReceived()
202        event.stream_id = self.stream_id
203        return [event]
204
205    def window_updated(self, previous_state):
206        """
207        Fires when a window update frame is received.
208        """
209        event = WindowUpdated()
210        event.stream_id = self.stream_id
211        return [event]
212
213    def stream_half_closed(self, previous_state):
214        """
215        Fires when an END_STREAM flag is received in the OPEN state,
216        transitioning this stream to a HALF_CLOSED_REMOTE state.
217        """
218        event = StreamEnded()
219        event.stream_id = self.stream_id
220        return [event]
221
222    def stream_ended(self, previous_state):
223        """
224        Fires when a stream is cleanly ended.
225        """
226        self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
227        event = StreamEnded()
228        event.stream_id = self.stream_id
229        return [event]
230
231    def stream_reset(self, previous_state):
232        """
233        Fired when a stream is forcefully reset.
234        """
235        self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
236        event = StreamReset()
237        event.stream_id = self.stream_id
238        return [event]
239
240    def send_new_pushed_stream(self, previous_state):
241        """
242        Fires on the newly pushed stream, when pushed by the local peer.
243
244        No event here, but definitionally this peer must be a server.
245        """
246        assert self.client is None
247        self.client = False
248        self.headers_received = True
249        return []
250
251    def recv_new_pushed_stream(self, previous_state):
252        """
253        Fires on the newly pushed stream, when pushed by the remote peer.
254
255        No event here, but definitionally this peer must be a client.
256        """
257        assert self.client is None
258        self.client = True
259        self.headers_sent = True
260        return []
261
262    def send_push_promise(self, previous_state):
263        """
264        Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
265        We may only send PUSH_PROMISE frames if we're a server.
266        """
267        if self.client is True:
268            raise ProtocolError("Cannot push streams from client peers.")
269
270        event = _PushedRequestSent()
271        return [event]
272
273    def recv_push_promise(self, previous_state):
274        """
275        Fires on the already-existing stream when a PUSH_PROMISE frame is
276        received. We may only receive PUSH_PROMISE frames if we're a client.
277
278        Fires a PushedStreamReceived event.
279        """
280        if not self.client:
281            if self.client is None:  # pragma: no cover
282                msg = "Idle streams cannot receive pushes"
283            else:  # pragma: no cover
284                msg = "Cannot receive pushed streams as a server"
285            raise ProtocolError(msg)
286
287        event = PushedStreamReceived()
288        event.parent_stream_id = self.stream_id
289        return [event]
290
291    def send_end_stream(self, previous_state):
292        """
293        Called when an attempt is made to send END_STREAM in the
294        HALF_CLOSED_REMOTE state.
295        """
296        self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
297
298    def send_reset_stream(self, previous_state):
299        """
300        Called when an attempt is made to send RST_STREAM in a non-closed
301        stream state.
302        """
303        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
304
305    def reset_stream_on_error(self, previous_state):
306        """
307        Called when we need to forcefully emit another RST_STREAM frame on
308        behalf of the state machine.
309
310        If this is the first time we've done this, we should also hang an event
311        off the StreamClosedError so that the user can be informed. We know
312        it's the first time we've done this if the stream is currently in a
313        state other than CLOSED.
314        """
315        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
316
317        error = StreamClosedError(self.stream_id)
318
319        event = StreamReset()
320        event.stream_id = self.stream_id
321        event.error_code = ErrorCodes.STREAM_CLOSED
322        event.remote_reset = False
323        error._events = [event]
324        raise error
325
326    def recv_on_closed_stream(self, previous_state):
327        """
328        Called when an unexpected frame is received on an already-closed
329        stream.
330
331        An endpoint that receives an unexpected frame should treat it as
332        a stream error or connection error with type STREAM_CLOSED, depending
333        on the specific frame. The error handling is done at a higher level:
334        this just raises the appropriate error.
335        """
336        raise StreamClosedError(self.stream_id)
337
338    def send_on_closed_stream(self, previous_state):
339        """
340        Called when an attempt is made to send data on an already-closed
341        stream.
342
343        This essentially overrides the standard logic by throwing a
344        more-specific error: StreamClosedError. This is a ProtocolError, so it
345        matches the standard API of the state machine, but provides more detail
346        to the user.
347        """
348        raise StreamClosedError(self.stream_id)
349
350    def recv_push_on_closed_stream(self, previous_state):
351        """
352        Called when a PUSH_PROMISE frame is received on a full stop
353        stream.
354
355        If the stream was closed by us sending a RST_STREAM frame, then we
356        presume that the PUSH_PROMISE was in flight when we reset the parent
357        stream. Rathen than accept the new stream, we just reset it.
358        Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
359        naturally closed stream is a real problem because it creates a brand
360        new stream that the remote peer now believes exists.
361        """
362        assert self.stream_closed_by is not None
363
364        if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM:
365            raise StreamClosedError(self.stream_id)
366        else:
367            raise ProtocolError("Attempted to push on closed stream.")
368
369    def send_push_on_closed_stream(self, previous_state):
370        """
371        Called when an attempt is made to push on an already-closed stream.
372
373        This essentially overrides the standard logic by providing a more
374        useful error message. It's necessary because simply indicating that the
375        stream is closed is not enough: there is now a new stream that is not
376        allowed to be there. The only recourse is to tear the whole connection
377        down.
378        """
379        raise ProtocolError("Attempted to push on closed stream.")
380
381    def send_informational_response(self, previous_state):
382        """
383        Called when an informational header block is sent (that is, a block
384        where the :status header has a 1XX value).
385
386        Only enforces that these are sent *before* final headers are sent.
387        """
388        if self.headers_sent:
389            raise ProtocolError("Information response after final response")
390
391        event = _ResponseSent()
392        return [event]
393
394    def recv_informational_response(self, previous_state):
395        """
396        Called when an informational header block is received (that is, a block
397        where the :status header has a 1XX value).
398        """
399        if self.headers_received:
400            raise ProtocolError("Informational response after final response")
401
402        event = InformationalResponseReceived()
403        event.stream_id = self.stream_id
404        return [event]
405
406    def recv_alt_svc(self, previous_state):
407        """
408        Called when receiving an ALTSVC frame.
409
410        RFC 7838 allows us to receive ALTSVC frames at any stream state, which
411        is really absurdly overzealous. For that reason, we want to limit the
412        states in which we can actually receive it. It's really only sensible
413        to receive it after we've sent our own headers and before the server
414        has sent its header block: the server can't guarantee that we have any
415        state around after it completes its header block, and the server
416        doesn't know what origin we're talking about before we've sent ours.
417
418        For that reason, this function applies a few extra checks on both state
419        and some of the little state variables we keep around. If those suggest
420        an unreasonable situation for the ALTSVC frame to have been sent in,
421        we quietly ignore it (as RFC 7838 suggests).
422
423        This function is also *not* always called by the state machine. In some
424        states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
425        because we know the frame cannot be valid in that state (IDLE because
426        the server cannot know what origin the stream applies to, CLOSED
427        because the server cannot assume we still have state around,
428        RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
429        state then *we* are the server).
430        """
431        # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
432        # them.
433        if self.client is False:
434            return []
435
436        # If we've received the response headers from the server they can't
437        # guarantee we still have any state around. Other implementations
438        # (like nghttp2) ignore ALTSVC in this state, so we will too.
439        if self.headers_received:
440            return []
441
442        # Otherwise, this is a sensible enough frame to have received. Return
443        # the event and let it get populated.
444        return [AlternativeServiceAvailable()]
445
446    def send_alt_svc(self, previous_state):
447        """
448        Called when sending an ALTSVC frame on this stream.
449
450        For consistency with the restrictions we apply on receiving ALTSVC
451        frames in ``recv_alt_svc``, we want to restrict when users can send
452        ALTSVC frames to the situations when we ourselves would accept them.
453
454        That means: when we are a server, when we have received the request
455        headers, and when we have not yet sent our own response headers.
456        """
457        # We should not send ALTSVC after we've sent response headers, as the
458        # client may have disposed of its state.
459        if self.headers_sent:
460            raise ProtocolError(
461                "Cannot send ALTSVC after sending response headers."
462            )
463
464        return
465
466
467# STATE MACHINE
468#
469# The stream state machine is defined here to avoid the need to allocate it
470# repeatedly for each stream. It cannot be defined in the stream class because
471# it needs to be able to reference the callbacks defined on the class, but
472# because Python's scoping rules are weird the class object is not actually in
473# scope during the body of the class object.
474#
475# For the sake of clarity, we reproduce the RFC 7540 state machine here:
476#
477#                          +--------+
478#                  send PP |        | recv PP
479#                 ,--------|  idle  |--------.
480#                /         |        |         \
481#               v          +--------+          v
482#        +----------+          |           +----------+
483#        |          |          | send H /  |          |
484# ,------| reserved |          | recv H    | reserved |------.
485# |      | (local)  |          |           | (remote) |      |
486# |      +----------+          v           +----------+      |
487# |          |             +--------+             |          |
488# |          |     recv ES |        | send ES     |          |
489# |   send H |     ,-------|  open  |-------.     | recv H   |
490# |          |    /        |        |        \    |          |
491# |          v   v         +--------+         v   v          |
492# |      +----------+          |           +----------+      |
493# |      |   half   |          |           |   half   |      |
494# |      |  closed  |          | send R /  |  closed  |      |
495# |      | (remote) |          | recv R    | (local)  |      |
496# |      +----------+          |           +----------+      |
497# |           |                |                 |           |
498# |           | send ES /      |       recv ES / |           |
499# |           | send R /       v        send R / |           |
500# |           | recv R     +--------+   recv R   |           |
501# | send R /  `----------->|        |<-----------'  send R / |
502# | recv R                 | closed |               recv R   |
503# `----------------------->|        |<----------------------'
504#                          +--------+
505#
506#    send:   endpoint sends this frame
507#    recv:   endpoint receives this frame
508#
509#    H:  HEADERS frame (with implied CONTINUATIONs)
510#    PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
511#    ES: END_STREAM flag
512#    R:  RST_STREAM frame
513#
514# For the purposes of this state machine we treat HEADERS and their
515# associated CONTINUATION frames as a single jumbo frame. The protocol
516# allows/requires this by preventing other frames from being interleved in
517# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
518# received without a prior HEADERS frame, it *will* be passed to this state
519# machine. The state machine should always reject that frame, either as an
520# invalid transition or because the stream is closed.
521#
522# There is a confusing relationship around PUSH_PROMISE frames. The state
523# machine above considers them to be frames belonging to the new stream,
524# which is *somewhat* true. However, they are sent with the stream ID of
525# their related stream, and are only sendable in some cases.
526# For this reason, our state machine implementation below allows for
527# PUSH_PROMISE frames both in the IDLE state (as in the diagram), but also
528# in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
529# Essentially, for hyper-h2, PUSH_PROMISE frames are effectively sent on
530# two streams.
531#
532# The _transitions dictionary contains a mapping of tuples of
533# (state, input) to tuples of (side_effect_function, end_state). This
534# map contains all allowed transitions: anything not in this map is
535# invalid and immediately causes a transition to ``closed``.
536_transitions = {
537    # State: idle
538    (StreamState.IDLE, StreamInputs.SEND_HEADERS):
539        (H2StreamStateMachine.request_sent, StreamState.OPEN),
540    (StreamState.IDLE, StreamInputs.RECV_HEADERS):
541        (H2StreamStateMachine.request_received, StreamState.OPEN),
542    (StreamState.IDLE, StreamInputs.RECV_DATA):
543        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
544    (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE):
545        (H2StreamStateMachine.send_new_pushed_stream,
546            StreamState.RESERVED_LOCAL),
547    (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE):
548        (H2StreamStateMachine.recv_new_pushed_stream,
549            StreamState.RESERVED_REMOTE),
550    (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
551        (None, StreamState.IDLE),
552    (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT):
553        (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL),
554    (StreamState.IDLE, StreamInputs.UPGRADE_SERVER):
555        (H2StreamStateMachine.request_received,
556            StreamState.HALF_CLOSED_REMOTE),
557
558    # State: reserved local
559    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS):
560        (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
561    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA):
562        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
563    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
564        (None, StreamState.RESERVED_LOCAL),
565    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
566        (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL),
567    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM):
568        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
569    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM):
570        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
571    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
572        (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL),
573    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
574        (None, StreamState.RESERVED_LOCAL),
575
576    # State: reserved remote
577    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS):
578        (H2StreamStateMachine.response_received,
579            StreamState.HALF_CLOSED_LOCAL),
580    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA):
581        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
582    (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
583        (None, StreamState.RESERVED_REMOTE),
584    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
585        (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE),
586    (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM):
587        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
588    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM):
589        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
590    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
591        (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE),
592
593    # State: open
594    (StreamState.OPEN, StreamInputs.SEND_HEADERS):
595        (H2StreamStateMachine.response_sent, StreamState.OPEN),
596    (StreamState.OPEN, StreamInputs.RECV_HEADERS):
597        (H2StreamStateMachine.response_received, StreamState.OPEN),
598    (StreamState.OPEN, StreamInputs.SEND_DATA):
599        (None, StreamState.OPEN),
600    (StreamState.OPEN, StreamInputs.RECV_DATA):
601        (H2StreamStateMachine.data_received, StreamState.OPEN),
602    (StreamState.OPEN, StreamInputs.SEND_END_STREAM):
603        (None, StreamState.HALF_CLOSED_LOCAL),
604    (StreamState.OPEN, StreamInputs.RECV_END_STREAM):
605        (H2StreamStateMachine.stream_half_closed,
606         StreamState.HALF_CLOSED_REMOTE),
607    (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE):
608        (None, StreamState.OPEN),
609    (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE):
610        (H2StreamStateMachine.window_updated, StreamState.OPEN),
611    (StreamState.OPEN, StreamInputs.SEND_RST_STREAM):
612        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
613    (StreamState.OPEN, StreamInputs.RECV_RST_STREAM):
614        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
615    (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE):
616        (H2StreamStateMachine.send_push_promise, StreamState.OPEN),
617    (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE):
618        (H2StreamStateMachine.recv_push_promise, StreamState.OPEN),
619    (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS):
620        (H2StreamStateMachine.send_informational_response, StreamState.OPEN),
621    (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS):
622        (H2StreamStateMachine.recv_informational_response, StreamState.OPEN),
623    (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE):
624        (H2StreamStateMachine.send_alt_svc, StreamState.OPEN),
625    (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE):
626        (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN),
627
628    # State: half-closed remote
629    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS):
630        (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
631    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS):
632        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
633    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA):
634        (None, StreamState.HALF_CLOSED_REMOTE),
635    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA):
636        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
637    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM):
638        (H2StreamStateMachine.send_end_stream, StreamState.CLOSED),
639    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
640        (None, StreamState.HALF_CLOSED_REMOTE),
641    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
642        (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE),
643    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM):
644        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
645    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM):
646        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
647    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE):
648        (H2StreamStateMachine.send_push_promise,
649            StreamState.HALF_CLOSED_REMOTE),
650    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE):
651        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
652    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS):
653        (H2StreamStateMachine.send_informational_response,
654            StreamState.HALF_CLOSED_REMOTE),
655    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE):
656        (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE),
657    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
658        (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE),
659
660    # State: half-closed local
661    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS):
662        (H2StreamStateMachine.response_received,
663            StreamState.HALF_CLOSED_LOCAL),
664    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA):
665        (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL),
666    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM):
667        (H2StreamStateMachine.stream_ended, StreamState.CLOSED),
668    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
669        (None, StreamState.HALF_CLOSED_LOCAL),
670    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
671        (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL),
672    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM):
673        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
674    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM):
675        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
676    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE):
677        (H2StreamStateMachine.recv_push_promise,
678            StreamState.HALF_CLOSED_LOCAL),
679    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS):
680        (H2StreamStateMachine.recv_informational_response,
681            StreamState.HALF_CLOSED_LOCAL),
682    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
683        (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL),
684    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
685        (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL),
686
687    # State: closed
688    (StreamState.CLOSED, StreamInputs.RECV_END_STREAM):
689        (None, StreamState.CLOSED),
690    (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE):
691        (None, StreamState.CLOSED),
692
693    # RFC 7540 Section 5.1 defines how the end point should react when
694    # receiving a frame on a closed stream with the following statements:
695    #
696    # > An endpoint that receives any frame other than PRIORITY after receiving
697    # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED.
698    # > An endpoint that receives any frames after receiving a frame with the
699    # > END_STREAM flag set MUST treat that as a connection error of type
700    # > STREAM_CLOSED.
701    (StreamState.CLOSED, StreamInputs.RECV_HEADERS):
702        (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
703    (StreamState.CLOSED, StreamInputs.RECV_DATA):
704        (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
705
706    # > WINDOW_UPDATE or RST_STREAM frames can be received in this state
707    # > for a short period after a DATA or HEADERS frame containing a
708    # > END_STREAM flag is sent, as instructed in RFC 7540 Section 5.1. But we
709    # > don't have access to a clock so we just always allow it.
710    (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE):
711        (None, StreamState.CLOSED),
712    (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM):
713        (None, StreamState.CLOSED),
714
715    # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is
716    # > neither "open" nor "half-closed (local)" as a connection error of type
717    # > PROTOCOL_ERROR.
718    (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE):
719        (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED),
720
721    # Also, users should be forbidden from sending on closed streams.
722    (StreamState.CLOSED, StreamInputs.SEND_HEADERS):
723        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
724    (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE):
725        (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED),
726    (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM):
727        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
728    (StreamState.CLOSED, StreamInputs.SEND_DATA):
729        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
730    (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE):
731        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
732    (StreamState.CLOSED, StreamInputs.SEND_END_STREAM):
733        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
734}
735
736
737class H2Stream(object):
738    """
739    A low-level HTTP/2 stream object. This handles building and receiving
740    frames and maintains per-stream state.
741
742    This wraps a HTTP/2 Stream state machine implementation, ensuring that
743    frames can only be sent/received when the stream is in a valid state.
744    Attempts to create frames that cannot be sent will raise a
745    ``ProtocolError``.
746    """
747    def __init__(self,
748                 stream_id,
749                 config,
750                 inbound_window_size,
751                 outbound_window_size):
752        self.state_machine = H2StreamStateMachine(stream_id)
753        self.stream_id = stream_id
754        self.max_outbound_frame_size = None
755        self.request_method = None
756
757        # The current value of the outbound stream flow control window
758        self.outbound_flow_control_window = outbound_window_size
759
760        # The flow control manager.
761        self._inbound_window_manager = WindowManager(inbound_window_size)
762
763        # The expected content length, if any.
764        self._expected_content_length = None
765
766        # The actual received content length. Always tracked.
767        self._actual_content_length = 0
768
769        # The authority we believe this stream belongs to.
770        self._authority = None
771
772        # The configuration for this stream.
773        self.config = config
774
775    def __repr__(self):
776        return "<%s id:%d state:%r>" % (
777            type(self).__name__,
778            self.stream_id,
779            self.state_machine.state
780        )
781
782    @property
783    def inbound_flow_control_window(self):
784        """
785        The size of the inbound flow control window for the stream. This is
786        rarely publicly useful: instead, use :meth:`remote_flow_control_window
787        <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is
788        largely present to provide a shortcut to this data.
789        """
790        return self._inbound_window_manager.current_window_size
791
792    @property
793    def open(self):
794        """
795        Whether the stream is 'open' in any sense: that is, whether it counts
796        against the number of concurrent streams.
797        """
798        # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either
799        # the OPEN state or either of the HALF_CLOSED states. Perplexingly,
800        # this excludes the reserved states.
801        # For more detail on why we're doing this in this slightly weird way,
802        # see the comment on ``STREAM_OPEN`` at the top of the file.
803        return STREAM_OPEN[self.state_machine.state]
804
805    @property
806    def closed(self):
807        """
808        Whether the stream is closed.
809        """
810        return self.state_machine.state == StreamState.CLOSED
811
812    @property
813    def closed_by(self):
814        """
815        Returns how the stream was closed, as one of StreamClosedBy.
816        """
817        return self.state_machine.stream_closed_by
818
819    def upgrade(self, client_side):
820        """
821        Called by the connection to indicate that this stream is the initial
822        request/response of an upgraded connection. Places the stream into an
823        appropriate state.
824        """
825        self.config.logger.debug("Upgrading %r", self)
826
827        assert self.stream_id == 1
828        input_ = (
829            StreamInputs.UPGRADE_CLIENT if client_side
830            else StreamInputs.UPGRADE_SERVER
831        )
832
833        # This may return events, we deliberately don't want them.
834        self.state_machine.process_input(input_)
835        return
836
837    def send_headers(self, headers, encoder, end_stream=False):
838        """
839        Returns a list of HEADERS/CONTINUATION frames to emit as either headers
840        or trailers.
841        """
842        self.config.logger.debug("Send headers %s on %r", headers, self)
843
844        # Because encoding headers makes an irreversible change to the header
845        # compression context, we make the state transition before we encode
846        # them.
847
848        # First, check if we're a client. If we are, no problem: if we aren't,
849        # we need to scan the header block to see if this is an informational
850        # response.
851        input_ = StreamInputs.SEND_HEADERS
852        if ((not self.state_machine.client) and
853                is_informational_response(headers)):
854            if end_stream:
855                raise ProtocolError(
856                    "Cannot set END_STREAM on informational responses."
857                )
858
859            input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS
860
861        events = self.state_machine.process_input(input_)
862
863        hf = HeadersFrame(self.stream_id)
864        hdr_validation_flags = self._build_hdr_validation_flags(events)
865        frames = self._build_headers_frames(
866            headers, encoder, hf, hdr_validation_flags
867        )
868
869        if end_stream:
870            # Not a bug: the END_STREAM flag is valid on the initial HEADERS
871            # frame, not the CONTINUATION frames that follow.
872            self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
873            frames[0].flags.add('END_STREAM')
874
875        if self.state_machine.trailers_sent and not end_stream:
876            raise ProtocolError("Trailers must have END_STREAM set.")
877
878        if self.state_machine.client and self._authority is None:
879            self._authority = authority_from_headers(headers)
880
881        # store request method for _initialize_content_length
882        self.request_method = extract_method_header(headers)
883
884        return frames
885
886    def push_stream_in_band(self, related_stream_id, headers, encoder):
887        """
888        Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed
889        stream header. Called on the stream that has the PUSH_PROMISE frame
890        sent on it.
891        """
892        self.config.logger.debug("Push stream %r", self)
893
894        # Because encoding headers makes an irreversible change to the header
895        # compression context, we make the state transition *first*.
896
897        events = self.state_machine.process_input(
898            StreamInputs.SEND_PUSH_PROMISE
899        )
900
901        ppf = PushPromiseFrame(self.stream_id)
902        ppf.promised_stream_id = related_stream_id
903        hdr_validation_flags = self._build_hdr_validation_flags(events)
904        frames = self._build_headers_frames(
905            headers, encoder, ppf, hdr_validation_flags
906        )
907
908        return frames
909
910    def locally_pushed(self):
911        """
912        Mark this stream as one that was pushed by this peer. Must be called
913        immediately after initialization. Sends no frames, simply updates the
914        state machine.
915        """
916        # This does not trigger any events.
917        events = self.state_machine.process_input(
918            StreamInputs.SEND_PUSH_PROMISE
919        )
920        assert not events
921        return []
922
923    def send_data(self, data, end_stream=False, pad_length=None):
924        """
925        Prepare some data frames. Optionally end the stream.
926
927        .. warning:: Does not perform flow control checks.
928        """
929        self.config.logger.debug(
930            "Send data on %r with end stream set to %s", self, end_stream
931        )
932
933        self.state_machine.process_input(StreamInputs.SEND_DATA)
934
935        df = DataFrame(self.stream_id)
936        df.data = data
937        if end_stream:
938            self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
939            df.flags.add('END_STREAM')
940        if pad_length is not None:
941            df.flags.add('PADDED')
942            df.pad_length = pad_length
943
944        # Subtract flow_controlled_length to account for possible padding
945        self.outbound_flow_control_window -= df.flow_controlled_length
946        assert self.outbound_flow_control_window >= 0
947
948        return [df]
949
950    def end_stream(self):
951        """
952        End a stream without sending data.
953        """
954        self.config.logger.debug("End stream %r", self)
955
956        self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
957        df = DataFrame(self.stream_id)
958        df.flags.add('END_STREAM')
959        return [df]
960
961    def advertise_alternative_service(self, field_value):
962        """
963        Advertise an RFC 7838 alternative service. The semantics of this are
964        better documented in the ``H2Connection`` class.
965        """
966        self.config.logger.debug(
967            "Advertise alternative service of %r for %r", field_value, self
968        )
969        self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE)
970        asf = AltSvcFrame(self.stream_id)
971        asf.field = field_value
972        return [asf]
973
974    def increase_flow_control_window(self, increment):
975        """
976        Increase the size of the flow control window for the remote side.
977        """
978        self.config.logger.debug(
979            "Increase flow control window for %r by %d",
980            self, increment
981        )
982        self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE)
983        self._inbound_window_manager.window_opened(increment)
984
985        wuf = WindowUpdateFrame(self.stream_id)
986        wuf.window_increment = increment
987        return [wuf]
988
989    def receive_push_promise_in_band(self,
990                                     promised_stream_id,
991                                     headers,
992                                     header_encoding):
993        """
994        Receives a push promise frame sent on this stream, pushing a remote
995        stream. This is called on the stream that has the PUSH_PROMISE sent
996        on it.
997        """
998        self.config.logger.debug(
999            "Receive Push Promise on %r for remote stream %d",
1000            self, promised_stream_id
1001        )
1002        events = self.state_machine.process_input(
1003            StreamInputs.RECV_PUSH_PROMISE
1004        )
1005        events[0].pushed_stream_id = promised_stream_id
1006
1007        hdr_validation_flags = self._build_hdr_validation_flags(events)
1008        events[0].headers = self._process_received_headers(
1009            headers, hdr_validation_flags, header_encoding
1010        )
1011        return [], events
1012
1013    def remotely_pushed(self, pushed_headers):
1014        """
1015        Mark this stream as one that was pushed by the remote peer. Must be
1016        called immediately after initialization. Sends no frames, simply
1017        updates the state machine.
1018        """
1019        self.config.logger.debug("%r pushed by remote peer", self)
1020        events = self.state_machine.process_input(
1021            StreamInputs.RECV_PUSH_PROMISE
1022        )
1023        self._authority = authority_from_headers(pushed_headers)
1024        return [], events
1025
1026    def receive_headers(self, headers, end_stream, header_encoding):
1027        """
1028        Receive a set of headers (or trailers).
1029        """
1030        if is_informational_response(headers):
1031            if end_stream:
1032                raise ProtocolError(
1033                    "Cannot set END_STREAM on informational responses"
1034                )
1035            input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS
1036        else:
1037            input_ = StreamInputs.RECV_HEADERS
1038
1039        events = self.state_machine.process_input(input_)
1040
1041        if end_stream:
1042            es_events = self.state_machine.process_input(
1043                StreamInputs.RECV_END_STREAM
1044            )
1045            events[0].stream_ended = es_events[0]
1046            events += es_events
1047
1048        self._initialize_content_length(headers)
1049
1050        if isinstance(events[0], TrailersReceived):
1051            if not end_stream:
1052                raise ProtocolError("Trailers must have END_STREAM set")
1053
1054        hdr_validation_flags = self._build_hdr_validation_flags(events)
1055        events[0].headers = self._process_received_headers(
1056            headers, hdr_validation_flags, header_encoding
1057        )
1058        return [], events
1059
1060    def receive_data(self, data, end_stream, flow_control_len):
1061        """
1062        Receive some data.
1063        """
1064        self.config.logger.debug(
1065            "Receive data on %r with end stream %s and flow control length "
1066            "set to %d", self, end_stream, flow_control_len
1067        )
1068        events = self.state_machine.process_input(StreamInputs.RECV_DATA)
1069        self._inbound_window_manager.window_consumed(flow_control_len)
1070        self._track_content_length(len(data), end_stream)
1071
1072        if end_stream:
1073            es_events = self.state_machine.process_input(
1074                StreamInputs.RECV_END_STREAM
1075            )
1076            events[0].stream_ended = es_events[0]
1077            events.extend(es_events)
1078
1079        events[0].data = data
1080        events[0].flow_controlled_length = flow_control_len
1081        return [], events
1082
1083    def receive_window_update(self, increment):
1084        """
1085        Handle a WINDOW_UPDATE increment.
1086        """
1087        self.config.logger.debug(
1088            "Receive Window Update on %r for increment of %d",
1089            self, increment
1090        )
1091        events = self.state_machine.process_input(
1092            StreamInputs.RECV_WINDOW_UPDATE
1093        )
1094        frames = []
1095
1096        # If we encounter a problem with incrementing the flow control window,
1097        # this should be treated as a *stream* error, not a *connection* error.
1098        # That means we need to catch the error and forcibly close the stream.
1099        if events:
1100            events[0].delta = increment
1101            try:
1102                self.outbound_flow_control_window = guard_increment_window(
1103                    self.outbound_flow_control_window,
1104                    increment
1105                )
1106            except FlowControlError:
1107                # Ok, this is bad. We're going to need to perform a local
1108                # reset.
1109                event = StreamReset()
1110                event.stream_id = self.stream_id
1111                event.error_code = ErrorCodes.FLOW_CONTROL_ERROR
1112                event.remote_reset = False
1113
1114                events = [event]
1115                frames = self.reset_stream(event.error_code)
1116
1117        return frames, events
1118
1119    def receive_continuation(self):
1120        """
1121        A naked CONTINUATION frame has been received. This is always an error,
1122        but the type of error it is depends on the state of the stream and must
1123        transition the state of the stream, so we need to handle it.
1124        """
1125        self.config.logger.debug("Receive Continuation frame on %r", self)
1126        self.state_machine.process_input(
1127            StreamInputs.RECV_CONTINUATION
1128        )
1129        assert False, "Should not be reachable"
1130
1131    def receive_alt_svc(self, frame):
1132        """
1133        An Alternative Service frame was received on the stream. This frame
1134        inherits the origin associated with this stream.
1135        """
1136        self.config.logger.debug(
1137            "Receive Alternative Service frame on stream %r", self
1138        )
1139
1140        # If the origin is present, RFC 7838 says we have to ignore it.
1141        if frame.origin:
1142            return [], []
1143
1144        events = self.state_machine.process_input(
1145            StreamInputs.RECV_ALTERNATIVE_SERVICE
1146        )
1147
1148        # There are lots of situations where we want to ignore the ALTSVC
1149        # frame. If we need to pay attention, we'll have an event and should
1150        # fill it out.
1151        if events:
1152            assert isinstance(events[0], AlternativeServiceAvailable)
1153            events[0].origin = self._authority
1154            events[0].field_value = frame.field
1155
1156        return [], events
1157
1158    def reset_stream(self, error_code=0):
1159        """
1160        Close the stream locally. Reset the stream with an error code.
1161        """
1162        self.config.logger.debug(
1163            "Local reset %r with error code: %d", self, error_code
1164        )
1165        self.state_machine.process_input(StreamInputs.SEND_RST_STREAM)
1166
1167        rsf = RstStreamFrame(self.stream_id)
1168        rsf.error_code = error_code
1169        return [rsf]
1170
1171    def stream_reset(self, frame):
1172        """
1173        Handle a stream being reset remotely.
1174        """
1175        self.config.logger.debug(
1176            "Remote reset %r with error code: %d", self, frame.error_code
1177        )
1178        events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM)
1179
1180        if events:
1181            # We don't fire an event if this stream is already closed.
1182            events[0].error_code = _error_code_from_int(frame.error_code)
1183
1184        return [], events
1185
1186    def acknowledge_received_data(self, acknowledged_size):
1187        """
1188        The user has informed us that they've processed some amount of data
1189        that was received on this stream. Pass that to the window manager and
1190        potentially return some WindowUpdate frames.
1191        """
1192        self.config.logger.debug(
1193            "Acknowledge received data with size %d on %r",
1194            acknowledged_size, self
1195        )
1196        increment = self._inbound_window_manager.process_bytes(
1197            acknowledged_size
1198        )
1199        if increment:
1200            f = WindowUpdateFrame(self.stream_id)
1201            f.window_increment = increment
1202            return [f]
1203
1204        return []
1205
1206    def _build_hdr_validation_flags(self, events):
1207        """
1208        Constructs a set of header validation flags for use when normalizing
1209        and validating header blocks.
1210        """
1211        is_trailer = isinstance(
1212            events[0], (_TrailersSent, TrailersReceived)
1213        )
1214        is_response_header = isinstance(
1215            events[0],
1216            (
1217                _ResponseSent,
1218                ResponseReceived,
1219                InformationalResponseReceived
1220            )
1221        )
1222        is_push_promise = isinstance(
1223            events[0], (PushedStreamReceived, _PushedRequestSent)
1224        )
1225
1226        return HeaderValidationFlags(
1227            is_client=self.state_machine.client,
1228            is_trailer=is_trailer,
1229            is_response_header=is_response_header,
1230            is_push_promise=is_push_promise,
1231        )
1232
1233    def _build_headers_frames(self,
1234                              headers,
1235                              encoder,
1236                              first_frame,
1237                              hdr_validation_flags):
1238        """
1239        Helper method to build headers or push promise frames.
1240        """
1241        # We need to lowercase the header names, and to ensure that secure
1242        # header fields are kept out of compression contexts.
1243        if self.config.normalize_outbound_headers:
1244            headers = normalize_outbound_headers(
1245                headers, hdr_validation_flags
1246            )
1247        if self.config.validate_outbound_headers:
1248            headers = validate_outbound_headers(
1249                headers, hdr_validation_flags
1250            )
1251
1252        encoded_headers = encoder.encode(headers)
1253
1254        # Slice into blocks of max_outbound_frame_size. Be careful with this:
1255        # it only works right because we never send padded frames or priority
1256        # information on the frames. Revisit this if we do.
1257        header_blocks = [
1258            encoded_headers[i:i+self.max_outbound_frame_size]
1259            for i in range(
1260                0, len(encoded_headers), self.max_outbound_frame_size
1261            )
1262        ]
1263
1264        frames = []
1265        first_frame.data = header_blocks[0]
1266        frames.append(first_frame)
1267
1268        for block in header_blocks[1:]:
1269            cf = ContinuationFrame(self.stream_id)
1270            cf.data = block
1271            frames.append(cf)
1272
1273        frames[-1].flags.add('END_HEADERS')
1274        return frames
1275
1276    def _process_received_headers(self,
1277                                  headers,
1278                                  header_validation_flags,
1279                                  header_encoding):
1280        """
1281        When headers have been received from the remote peer, run a processing
1282        pipeline on them to transform them into the appropriate form for
1283        attaching to an event.
1284        """
1285        if self.config.normalize_inbound_headers:
1286            headers = normalize_inbound_headers(
1287                headers, header_validation_flags
1288            )
1289
1290        if self.config.validate_inbound_headers:
1291            headers = validate_headers(headers, header_validation_flags)
1292
1293        if header_encoding:
1294            headers = _decode_headers(headers, header_encoding)
1295
1296        # The above steps are all generators, so we need to concretize the
1297        # headers now.
1298        return list(headers)
1299
1300    def _initialize_content_length(self, headers):
1301        """
1302        Checks the headers for a content-length header and initializes the
1303        _expected_content_length field from it. It's not an error for no
1304        Content-Length header to be present.
1305        """
1306        if self.request_method == b'HEAD':
1307            self._expected_content_length = 0
1308            return
1309
1310        for n, v in headers:
1311            if n == b'content-length':
1312                try:
1313                    self._expected_content_length = int(v, 10)
1314                except ValueError:
1315                    raise ProtocolError(
1316                        "Invalid content-length header: %s" % v
1317                    )
1318
1319                return
1320
1321    def _track_content_length(self, length, end_stream):
1322        """
1323        Update the expected content length in response to data being received.
1324        Validates that the appropriate amount of data is sent. Always updates
1325        the received data, but only validates the length against the
1326        content-length header if one was sent.
1327
1328        :param length: The length of the body chunk received.
1329        :param end_stream: If this is the last body chunk received.
1330        """
1331        self._actual_content_length += length
1332        actual = self._actual_content_length
1333        expected = self._expected_content_length
1334
1335        if expected is not None:
1336            if expected < actual:
1337                raise InvalidBodyLengthError(expected, actual)
1338
1339            if end_stream and expected != actual:
1340                raise InvalidBodyLengthError(expected, actual)
1341
1342    def _inbound_flow_control_change_from_settings(self, delta):
1343        """
1344        We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to
1345        update the target window size for flow control. For our flow control
1346        strategy, this means we need to do two things: we need to adjust the
1347        current window size, but we also need to set the target maximum window
1348        size to the new value.
1349        """
1350        new_max_size = self._inbound_window_manager.max_window_size + delta
1351        self._inbound_window_manager.window_opened(delta)
1352        self._inbound_window_manager.max_window_size = new_max_size
1353
1354
1355def _decode_headers(headers, encoding):
1356    """
1357    Given an iterable of header two-tuples and an encoding, decodes those
1358    headers using that encoding while preserving the type of the header tuple.
1359    This ensures that the use of ``HeaderTuple`` is preserved.
1360    """
1361    for header in headers:
1362        # This function expects to work on decoded headers, which are always
1363        # HeaderTuple objects.
1364        assert isinstance(header, HeaderTuple)
1365
1366        name, value = header
1367        name = name.decode(encoding)
1368        value = value.decode(encoding)
1369        yield header.__class__(name, value)
1370