1# -*- coding: utf-8 -*-
2"""
3h2/stream
4~~~~~~~~~
5
6An implementation of a HTTP/2 stream.
7"""
8import warnings
9
10from enum import Enum, IntEnum
11from hpack import HeaderTuple
12from hyperframe.frame import (
13    HeadersFrame, ContinuationFrame, DataFrame, WindowUpdateFrame,
14    RstStreamFrame, PushPromiseFrame, AltSvcFrame
15)
16
17from .errors import ErrorCodes, _error_code_from_int
18from .events import (
19    RequestReceived, ResponseReceived, DataReceived, WindowUpdated,
20    StreamEnded, PushedStreamReceived, StreamReset, TrailersReceived,
21    InformationalResponseReceived, AlternativeServiceAvailable,
22    _ResponseSent, _RequestSent, _TrailersSent, _PushedRequestSent
23)
24from .exceptions import (
25    ProtocolError, StreamClosedError, InvalidBodyLengthError, FlowControlError
26)
27from .utilities import (
28    guard_increment_window, is_informational_response, authority_from_headers,
29    validate_headers, validate_outbound_headers, normalize_outbound_headers,
30    HeaderValidationFlags, extract_method_header
31)
32from .windows import WindowManager
33
34
class StreamState(IntEnum):
    """
    The states a single HTTP/2 stream can be in.

    The members correspond to the states of the RFC 7540 Section 5.1 stream
    state machine. The values are consecutive integers starting at zero, and
    this module uses the members directly as list indices.
    """
    IDLE = 0
    RESERVED_REMOTE = 1
    RESERVED_LOCAL = 2
    OPEN = 3
    HALF_CLOSED_REMOTE = 4
    HALF_CLOSED_LOCAL = 5
    CLOSED = 6
43
44
class StreamInputs(Enum):
    """
    The inputs accepted by ``H2StreamStateMachine.process_input``.

    ``SEND_*`` members describe something the local endpoint emits, while
    ``RECV_*`` members describe something received from the remote peer.
    """
    SEND_HEADERS = 0
    SEND_PUSH_PROMISE = 1
    SEND_RST_STREAM = 2
    SEND_DATA = 3
    SEND_WINDOW_UPDATE = 4
    SEND_END_STREAM = 5
    RECV_HEADERS = 6
    RECV_PUSH_PROMISE = 7
    RECV_RST_STREAM = 8
    RECV_DATA = 9
    RECV_WINDOW_UPDATE = 10
    RECV_END_STREAM = 11
    RECV_CONTINUATION = 12  # Added in 2.0.0
    SEND_INFORMATIONAL_HEADERS = 13  # Added in 2.2.0
    RECV_INFORMATIONAL_HEADERS = 14  # Added in 2.2.0
    SEND_ALTERNATIVE_SERVICE = 15  # Added in 2.3.0
    RECV_ALTERNATIVE_SERVICE = 16  # Added in 2.3.0
    UPGRADE_CLIENT = 17  # Added 2.3.0
    UPGRADE_SERVER = 18  # Added 2.3.0
65
66
class StreamClosedBy(Enum):
    """
    How a stream came to be closed.

    The state machine records one of these members in its
    ``stream_closed_by`` attribute and consults it when deciding how to
    react to frames that arrive on an already-closed stream.
    """
    SEND_END_STREAM = 0
    RECV_END_STREAM = 1
    SEND_RST_STREAM = 2
    RECV_RST_STREAM = 3
72
73
# Lookup table, built once at import time and indexed by ``StreamState``
# value, that answers "is a stream in this state open?". Whether a stream is
# open is potentially checked very frequently, so a precomputed list lookup
# keeps that check as cheap as possible.
#
# Iterating ``StreamState`` yields the members in definition order, which is
# also ascending value order starting from zero, so position ``i`` of the
# list describes the state whose value is ``i``.
STREAM_OPEN = [
    state in (
        StreamState.OPEN,
        StreamState.HALF_CLOSED_LOCAL,
        StreamState.HALF_CLOSED_REMOTE,
    )
    for state in StreamState
]
83
84
class H2StreamStateMachine(object):
    """
    A single HTTP/2 stream state machine.

    This stream object implements basically the state machine described in
    RFC 7540 section 5.1.

    The methods other than ``process_input`` are the side-effect functions
    referenced by the module-level transition table. Each of them receives
    the state the stream was in *before* the transition, and either returns
    a (possibly empty) list of events or raises ``ProtocolError`` (or a more
    specific error such as ``StreamClosedError``).

    :param stream_id: The stream ID of this stream. This is stored primarily
        for logging purposes.
    """
    def __init__(self, stream_id):
        self.state = StreamState.IDLE
        self.stream_id = stream_id

        #: Whether this peer is the client side of this stream. This stays
        #: ``None`` until an input that reveals the role has been processed.
        self.client = None

        # Whether the initial header block and the trailers have been
        # sent/received on this stream or not. ``None`` means "not yet".
        self.headers_sent = None
        self.trailers_sent = None
        self.headers_received = None
        self.trailers_received = None

        # How the stream was closed. One of StreamClosedBy, or ``None`` if
        # no closing handler has recorded a reason.
        self.stream_closed_by = None

    def process_input(self, input_):
        """
        Process a specific input in the state machine.

        :param input_: The ``StreamInputs`` member to feed to the machine.
        :returns: A (possibly empty) list of the events produced by the
            transition.
        :raises ValueError: If ``input_`` is not a ``StreamInputs`` member.
        :raises ProtocolError: If the input is not allowed in the current
            state, or if the side-effect function of the transition rejects
            it. In both cases the stream is left in the ``CLOSED`` state.
        """
        if not isinstance(input_, StreamInputs):
            raise ValueError("Input must be an instance of StreamInputs")

        try:
            func, target_state = _transitions[(self.state, input_)]
        except KeyError:
            # Anything missing from the transition table is invalid and
            # closes the stream.
            old_state = self.state
            self.state = StreamState.CLOSED
            raise ProtocolError(
                "Invalid input %s in state %s" % (input_, old_state)
            )
        else:
            # The state is updated *before* the side-effect function runs;
            # the function is handed the state we came from instead.
            previous_state = self.state
            self.state = target_state
            if func is not None:
                try:
                    return func(self, previous_state)
                except ProtocolError:
                    self.state = StreamState.CLOSED
                    raise
                # The side-effect functions use ``assert`` for their internal
                # consistency checks; a failure is reported as a protocol
                # error. NOTE: those checks are not executed at all when
                # Python runs with -O.
                except AssertionError as e:  # pragma: no cover
                    self.state = StreamState.CLOSED
                    raise ProtocolError(e)

            return []

    def request_sent(self, previous_state):
        """
        Fires when a request is sent.

        Marks this peer as the client and records that headers were sent.
        """
        self.client = True
        self.headers_sent = True
        event = _RequestSent()

        return [event]

    def response_sent(self, previous_state):
        """
        Fires when something that should be a response is sent. This 'response'
        may actually be trailers.

        The first header block sent is treated as the response; it is
        rejected with ``ProtocolError`` unless this peer is known to be a
        server. A later header block is treated as trailers.
        """
        if not self.headers_sent:
            if self.client is True or self.client is None:
                raise ProtocolError("Client cannot send responses.")
            self.headers_sent = True
            event = _ResponseSent()
        else:
            assert not self.trailers_sent
            self.trailers_sent = True
            event = _TrailersSent()

        return [event]

    def request_received(self, previous_state):
        """
        Fires when a request is received.

        Marks this peer as the server and records that headers were
        received.
        """
        assert not self.headers_received
        assert not self.trailers_received

        self.client = False
        self.headers_received = True
        event = RequestReceived()

        event.stream_id = self.stream_id
        return [event]

    def response_received(self, previous_state):
        """
        Fires when a response is received. Also disambiguates between responses
        and trailers.

        The first header block received is the response; a later one is
        treated as trailers.
        """
        if not self.headers_received:
            assert self.client is True
            self.headers_received = True
            event = ResponseReceived()
        else:
            assert not self.trailers_received
            self.trailers_received = True
            event = TrailersReceived()

        event.stream_id = self.stream_id
        return [event]

    def data_received(self, previous_state):
        """
        Fires when data is received.
        """
        event = DataReceived()
        event.stream_id = self.stream_id
        return [event]

    def window_updated(self, previous_state):
        """
        Fires when a window update frame is received.
        """
        event = WindowUpdated()
        event.stream_id = self.stream_id
        return [event]

    def stream_half_closed(self, previous_state):
        """
        Fires when an END_STREAM flag is received in the OPEN state,
        transitioning this stream to a HALF_CLOSED_REMOTE state.
        """
        event = StreamEnded()
        event.stream_id = self.stream_id
        return [event]

    def stream_ended(self, previous_state):
        """
        Fires when a stream is cleanly ended.

        Records that the stream was closed by a received END_STREAM.
        """
        self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
        event = StreamEnded()
        event.stream_id = self.stream_id
        return [event]

    def stream_reset(self, previous_state):
        """
        Fired when a stream is forcefully reset.

        Records that the stream was closed by a received RST_STREAM.
        """
        self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
        event = StreamReset()
        event.stream_id = self.stream_id
        return [event]

    def send_new_pushed_stream(self, previous_state):
        """
        Fires on the newly pushed stream, when pushed by the local peer.

        No event here, but definitionally this peer must be a server.
        """
        assert self.client is None
        self.client = False
        # The promised request plays the role of the received request
        # headers on this stream.
        self.headers_received = True
        return []

    def recv_new_pushed_stream(self, previous_state):
        """
        Fires on the newly pushed stream, when pushed by the remote peer.

        No event here, but definitionally this peer must be a client.
        """
        assert self.client is None
        self.client = True
        # The promised request plays the role of the sent request headers
        # on this stream.
        self.headers_sent = True
        return []

    def send_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
        We may only send PUSH_PROMISE frames if we're a server.
        """
        if self.client is True:
            raise ProtocolError("Cannot push streams from client peers.")

        event = _PushedRequestSent()
        return [event]

    def recv_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is
        received. We may only receive PUSH_PROMISE frames if we're a client.

        Fires a PushedStreamReceived event.
        """
        if not self.client:
            if self.client is None:  # pragma: no cover
                msg = "Idle streams cannot receive pushes"
            else:  # pragma: no cover
                msg = "Cannot receive pushed streams as a server"
            raise ProtocolError(msg)

        event = PushedStreamReceived()
        event.parent_stream_id = self.stream_id
        return [event]

    def send_end_stream(self, previous_state):
        """
        Called when an attempt is made to send END_STREAM in the
        HALF_CLOSED_REMOTE state.

        Records that the stream was closed by a sent END_STREAM. Produces no
        events, so an empty list is returned.
        """
        self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
        return []

    def send_reset_stream(self, previous_state):
        """
        Called when an attempt is made to send RST_STREAM in a non-closed
        stream state.

        Records that the stream was closed by a sent RST_STREAM. Produces no
        events, so an empty list is returned.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
        return []

    def reset_stream_on_error(self, previous_state):
        """
        Called when we need to forcefully emit another RST_STREAM frame on
        behalf of the state machine.

        This never returns: it always raises ``StreamClosedError``. A
        ``StreamReset`` event is hung off the error (as ``_events``) so that
        the user can be informed that the stream was reset locally.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM

        error = StreamClosedError(self.stream_id)

        event = StreamReset()
        event.stream_id = self.stream_id
        event.error_code = ErrorCodes.STREAM_CLOSED
        event.remote_reset = False
        error._events = [event]
        raise error

    def recv_on_closed_stream(self, previous_state):
        """
        Called when an unexpected frame is received on an already-closed
        stream.

        An endpoint that receives an unexpected frame should treat it as
        a stream error or connection error with type STREAM_CLOSED, depending
        on the specific frame. The error handling is done at a higher level:
        this just raises the appropriate error.
        """
        raise StreamClosedError(self.stream_id)

    def send_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to send data on an already-closed
        stream.

        This essentially overrides the standard logic by throwing a
        more-specific error: StreamClosedError. This is a ProtocolError, so it
        matches the standard API of the state machine, but provides more detail
        to the user.
        """
        raise StreamClosedError(self.stream_id)

    def recv_push_on_closed_stream(self, previous_state):
        """
        Called when a PUSH_PROMISE frame is received on an already-closed
        stream.

        If the stream was closed by us sending a RST_STREAM frame, then we
        presume that the PUSH_PROMISE was in flight when we reset the parent
        stream. Rather than accept the new stream, we just reset it.
        Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
        naturally closed stream is a real problem because it creates a brand
        new stream that the remote peer now believes exists.
        """
        assert self.stream_closed_by is not None

        if self.stream_closed_by is StreamClosedBy.SEND_RST_STREAM:
            raise StreamClosedError(self.stream_id)
        else:
            raise ProtocolError("Attempted to push on closed stream.")

    def send_push_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to push on an already-closed stream.

        This essentially overrides the standard logic by providing a more
        useful error message. It's necessary because simply indicating that the
        stream is closed is not enough: there is now a new stream that is not
        allowed to be there. The only recourse is to tear the whole connection
        down.
        """
        raise ProtocolError("Attempted to push on closed stream.")

    def window_on_closed_stream(self, previous_state):
        """
        Called when a WINDOW_UPDATE frame is received on an already-closed
        stream.

        If we sent an END_STREAM frame, we just ignore the frame, as instructed
        in RFC 7540 Section 5.1. Technically we should eventually consider
        WINDOW_UPDATE in this state an error, but we don't have access to a
        clock so we just always allow it. If we closed the stream for any other
        reason, we behave as we do for receiving any other frame on a closed
        stream.
        """
        assert self.stream_closed_by is not None

        if self.stream_closed_by is StreamClosedBy.SEND_END_STREAM:
            return []
        return self.recv_on_closed_stream(previous_state)

    def reset_on_closed_stream(self, previous_state):
        """
        Called when a RST_STREAM frame is received on an already-closed stream.

        If we sent an END_STREAM frame, we just ignore the frame, as instructed
        in RFC 7540 Section 5.1. Technically we should eventually consider
        RST_STREAM in this state an error, but we don't have access to a clock
        so we just always allow it. If we closed the stream for any other
        reason, we behave as we do for receiving any other frame on a closed
        stream.
        """
        assert self.stream_closed_by is not None

        if self.stream_closed_by is StreamClosedBy.SEND_END_STREAM:
            return []
        return self.recv_on_closed_stream(previous_state)

    def send_informational_response(self, previous_state):
        """
        Called when an informational header block is sent (that is, a block
        where the :status header has a 1XX value).

        Only enforces that these are sent *before* final headers are sent.
        """
        if self.headers_sent:
            raise ProtocolError("Informational response after final response")

        event = _ResponseSent()
        return [event]

    def recv_informational_response(self, previous_state):
        """
        Called when an informational header block is received (that is, a block
        where the :status header has a 1XX value).

        Only enforces that these are received *before* final headers are
        received.
        """
        if self.headers_received:
            raise ProtocolError("Informational response after final response")

        event = InformationalResponseReceived()
        event.stream_id = self.stream_id
        return [event]

    def recv_alt_svc(self, previous_state):
        """
        Called when receiving an ALTSVC frame.

        RFC 7838 allows us to receive ALTSVC frames at any stream state, which
        is really absurdly overzealous. For that reason, we want to limit the
        states in which we can actually receive it. It's really only sensible
        to receive it after we've sent our own headers and before the server
        has sent its header block: the server can't guarantee that we have any
        state around after it completes its header block, and the server
        doesn't know what origin we're talking about before we've sent ours.

        For that reason, this function applies a few extra checks on both state
        and some of the little state variables we keep around. If those suggest
        an unreasonable situation for the ALTSVC frame to have been sent in,
        we quietly ignore it (as RFC 7838 suggests).

        This function is also *not* always called by the state machine. In some
        states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
        because we know the frame cannot be valid in that state (IDLE because
        the server cannot know what origin the stream applies to, CLOSED
        because the server cannot assume we still have state around,
        RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
        state then *we* are the server).

        :returns: A list holding a single, not yet populated,
            ``AlternativeServiceAvailable`` event, or an empty list when the
            frame is ignored.
        """
        # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
        # them.
        if self.client is False:
            return []

        # If we've received the response headers from the server they can't
        # guarantee we still have any state around. Other implementations
        # (like nghttp2) ignore ALTSVC in this state, so we will too.
        if self.headers_received:
            return []

        # Otherwise, this is a sensible enough frame to have received. Return
        # the event and let it get populated.
        return [AlternativeServiceAvailable()]

    def send_alt_svc(self, previous_state):
        """
        Called when sending an ALTSVC frame on this stream.

        For consistency with the restrictions we apply on receiving ALTSVC
        frames in ``recv_alt_svc``, we want to restrict when users can send
        ALTSVC frames to the situations when we ourselves would accept them.

        That means: when we are a server, when we have received the request
        headers, and when we have not yet sent our own response headers. Of
        those conditions, only the last one is checked by this method.

        Produces no events, so an empty list is returned.
        """
        # We should not send ALTSVC after we've sent response headers, as the
        # client may have disposed of its state.
        if self.headers_sent:
            raise ProtocolError(
                "Cannot send ALTSVC after sending response headers."
            )

        return []
502
503
504# STATE MACHINE
505#
506# The stream state machine is defined here to avoid the need to allocate it
507# repeatedly for each stream. It cannot be defined in the stream class because
508# it needs to be able to reference the callbacks defined on the class, but
509# because Python's scoping rules are weird the class object is not actually in
510# scope during the body of the class object.
511#
512# For the sake of clarity, we reproduce the RFC 7540 state machine here:
513#
514#                          +--------+
515#                  send PP |        | recv PP
516#                 ,--------|  idle  |--------.
517#                /         |        |         \
518#               v          +--------+          v
519#        +----------+          |           +----------+
520#        |          |          | send H /  |          |
521# ,------| reserved |          | recv H    | reserved |------.
522# |      | (local)  |          |           | (remote) |      |
523# |      +----------+          v           +----------+      |
524# |          |             +--------+             |          |
525# |          |     recv ES |        | send ES     |          |
526# |   send H |     ,-------|  open  |-------.     | recv H   |
527# |          |    /        |        |        \    |          |
528# |          v   v         +--------+         v   v          |
529# |      +----------+          |           +----------+      |
530# |      |   half   |          |           |   half   |      |
531# |      |  closed  |          | send R /  |  closed  |      |
532# |      | (remote) |          | recv R    | (local)  |      |
533# |      +----------+          |           +----------+      |
534# |           |                |                 |           |
535# |           | send ES /      |       recv ES / |           |
536# |           | send R /       v        send R / |           |
537# |           | recv R     +--------+   recv R   |           |
538# | send R /  `----------->|        |<-----------'  send R / |
539# | recv R                 | closed |               recv R   |
540# `----------------------->|        |<----------------------'
541#                          +--------+
542#
543#    send:   endpoint sends this frame
544#    recv:   endpoint receives this frame
545#
546#    H:  HEADERS frame (with implied CONTINUATIONs)
547#    PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
548#    ES: END_STREAM flag
549#    R:  RST_STREAM frame
550#
551# For the purposes of this state machine we treat HEADERS and their
552# associated CONTINUATION frames as a single jumbo frame. The protocol
# allows/requires this by preventing other frames from being interleaved in
554# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
555# received without a prior HEADERS frame, it *will* be passed to this state
556# machine. The state machine should always reject that frame, either as an
557# invalid transition or because the stream is closed.
558#
559# There is a confusing relationship around PUSH_PROMISE frames. The state
560# machine above considers them to be frames belonging to the new stream,
561# which is *somewhat* true. However, they are sent with the stream ID of
562# their related stream, and are only sendable in some cases.
# For this reason, our state machine implementation below allows for
# PUSH_PROMISE frames not only in the IDLE state (as in the diagram), but
# also in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
566# Essentially, for hyper-h2, PUSH_PROMISE frames are effectively sent on
567# two streams.
568#
569# The _transitions dictionary contains a mapping of tuples of
570# (state, input) to tuples of (side_effect_function, end_state). This
571# map contains all allowed transitions: anything not in this map is
572# invalid and immediately causes a transition to ``closed``.
573_transitions = {
574    # State: idle
575    (StreamState.IDLE, StreamInputs.SEND_HEADERS):
576        (H2StreamStateMachine.request_sent, StreamState.OPEN),
577    (StreamState.IDLE, StreamInputs.RECV_HEADERS):
578        (H2StreamStateMachine.request_received, StreamState.OPEN),
579    (StreamState.IDLE, StreamInputs.RECV_DATA):
580        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
581    (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE):
582        (H2StreamStateMachine.send_new_pushed_stream,
583            StreamState.RESERVED_LOCAL),
584    (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE):
585        (H2StreamStateMachine.recv_new_pushed_stream,
586            StreamState.RESERVED_REMOTE),
587    (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
588        (None, StreamState.IDLE),
589    (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT):
590        (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL),
591    (StreamState.IDLE, StreamInputs.UPGRADE_SERVER):
592        (H2StreamStateMachine.request_received,
593            StreamState.HALF_CLOSED_REMOTE),
594
595    # State: reserved local
596    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS):
597        (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
598    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA):
599        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
600    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
601        (None, StreamState.RESERVED_LOCAL),
602    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
603        (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL),
604    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM):
605        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
606    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM):
607        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
608    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
609        (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL),
610    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
611        (None, StreamState.RESERVED_LOCAL),
612
613    # State: reserved remote
614    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS):
615        (H2StreamStateMachine.response_received,
616            StreamState.HALF_CLOSED_LOCAL),
617    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA):
618        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
619    (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
620        (None, StreamState.RESERVED_REMOTE),
621    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
622        (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE),
623    (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM):
624        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
625    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM):
626        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
627    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
628        (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE),
629
630    # State: open
631    (StreamState.OPEN, StreamInputs.SEND_HEADERS):
632        (H2StreamStateMachine.response_sent, StreamState.OPEN),
633    (StreamState.OPEN, StreamInputs.RECV_HEADERS):
634        (H2StreamStateMachine.response_received, StreamState.OPEN),
635    (StreamState.OPEN, StreamInputs.SEND_DATA):
636        (None, StreamState.OPEN),
637    (StreamState.OPEN, StreamInputs.RECV_DATA):
638        (H2StreamStateMachine.data_received, StreamState.OPEN),
639    (StreamState.OPEN, StreamInputs.SEND_END_STREAM):
640        (None, StreamState.HALF_CLOSED_LOCAL),
641    (StreamState.OPEN, StreamInputs.RECV_END_STREAM):
642        (H2StreamStateMachine.stream_half_closed,
643         StreamState.HALF_CLOSED_REMOTE),
644    (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE):
645        (None, StreamState.OPEN),
646    (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE):
647        (H2StreamStateMachine.window_updated, StreamState.OPEN),
648    (StreamState.OPEN, StreamInputs.SEND_RST_STREAM):
649        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
650    (StreamState.OPEN, StreamInputs.RECV_RST_STREAM):
651        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
652    (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE):
653        (H2StreamStateMachine.send_push_promise, StreamState.OPEN),
654    (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE):
655        (H2StreamStateMachine.recv_push_promise, StreamState.OPEN),
656    (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS):
657        (H2StreamStateMachine.send_informational_response, StreamState.OPEN),
658    (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS):
659        (H2StreamStateMachine.recv_informational_response, StreamState.OPEN),
660    (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE):
661        (H2StreamStateMachine.send_alt_svc, StreamState.OPEN),
662    (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE):
663        (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN),
664
665    # State: half-closed remote
666    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS):
667        (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
668    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS):
669        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
670    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA):
671        (None, StreamState.HALF_CLOSED_REMOTE),
672    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA):
673        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
674    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM):
675        (H2StreamStateMachine.send_end_stream, StreamState.CLOSED),
676    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
677        (None, StreamState.HALF_CLOSED_REMOTE),
678    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
679        (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE),
680    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM):
681        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
682    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM):
683        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
684    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE):
685        (H2StreamStateMachine.send_push_promise,
686            StreamState.HALF_CLOSED_REMOTE),
687    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE):
688        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
689    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS):
690        (H2StreamStateMachine.send_informational_response,
691            StreamState.HALF_CLOSED_REMOTE),
692    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE):
693        (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE),
694    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
695        (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE),
696
697    # State: half-closed local
698    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS):
699        (H2StreamStateMachine.response_received,
700            StreamState.HALF_CLOSED_LOCAL),
701    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA):
702        (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL),
703    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM):
704        (H2StreamStateMachine.stream_ended, StreamState.CLOSED),
705    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
706        (None, StreamState.HALF_CLOSED_LOCAL),
707    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
708        (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL),
709    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM):
710        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
711    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM):
712        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
713    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE):
714        (H2StreamStateMachine.recv_push_promise,
715            StreamState.HALF_CLOSED_LOCAL),
716    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS):
717        (H2StreamStateMachine.recv_informational_response,
718            StreamState.HALF_CLOSED_LOCAL),
719    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
720        (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL),
721    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
722        (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL),
723
724    # State: closed
725    (StreamState.CLOSED, StreamInputs.RECV_END_STREAM):
726        (None, StreamState.CLOSED),
727    (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE):
728        (None, StreamState.CLOSED),
729
730    # RFC 7540 Section 5.1 defines how the end point should react when
731    # receiving a frame on a closed stream with the following statements:
732    #
733    # > An endpoint that receives any frame other than PRIORITY after receiving
734    # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED.
735    # > An endpoint that receives any frames after receiving a frame with the
736    # > END_STREAM flag set MUST treat that as a connection error of type
737    # > STREAM_CLOSED.
738    (StreamState.CLOSED, StreamInputs.RECV_HEADERS):
739        (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
740    (StreamState.CLOSED, StreamInputs.RECV_DATA):
741        (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
742
743    # > WINDOW_UPDATE or RST_STREAM frames can be received in this state
744    # > for a short period after a DATA or HEADERS frame containing a
745    # > END_STREAM flag is sent.
746    (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE):
747        (H2StreamStateMachine.window_on_closed_stream, StreamState.CLOSED),
748    (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM):
749        (H2StreamStateMachine.reset_on_closed_stream, StreamState.CLOSED),
750
751    # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is
752    # > neither "open" nor "half-closed (local)" as a connection error of type
753    # > PROTOCOL_ERROR.
754    (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE):
755        (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED),
756
757    # Also, users should be forbidden from sending on closed streams.
758    (StreamState.CLOSED, StreamInputs.SEND_HEADERS):
759        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
760    (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE):
761        (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED),
762    (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM):
763        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
764    (StreamState.CLOSED, StreamInputs.SEND_DATA):
765        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
766    (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE):
767        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
768    (StreamState.CLOSED, StreamInputs.SEND_END_STREAM):
769        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
770}
771
772
773class H2Stream(object):
774    """
775    A low-level HTTP/2 stream object. This handles building and receiving
776    frames and maintains per-stream state.
777
778    This wraps a HTTP/2 Stream state machine implementation, ensuring that
779    frames can only be sent/received when the stream is in a valid state.
780    Attempts to create frames that cannot be sent will raise a
781    ``ProtocolError``.
782    """
783    def __init__(self,
784                 stream_id,
785                 config,
786                 inbound_window_size,
787                 outbound_window_size):
788        self.state_machine = H2StreamStateMachine(stream_id)
789        self.stream_id = stream_id
790        self.max_outbound_frame_size = None
791        self.request_method = None
792
793        # The curent value of the outbound stream flow control window
794        self.outbound_flow_control_window = outbound_window_size
795
796        # The flow control manager.
797        self._inbound_window_manager = WindowManager(inbound_window_size)
798
799        # The expected content length, if any.
800        self._expected_content_length = None
801
802        # The actual received content length. Always tracked.
803        self._actual_content_length = 0
804
805        # The authority we believe this stream belongs to.
806        self._authority = None
807
808        # The configuration for this stream.
809        self.config = config
810
811    def __repr__(self):
812        return "<%s id:%d state:%r>" % (
813            type(self).__name__,
814            self.stream_id,
815            self.state_machine.state
816        )
817
818    @property
819    def inbound_flow_control_window(self):
820        """
821        The size of the inbound flow control window for the stream. This is
822        rarely publicly useful: instead, use :meth:`remote_flow_control_window
823        <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is
824        largely present to provide a shortcut to this data.
825        """
826        return self._inbound_window_manager.current_window_size
827
828    @property
829    def open(self):
830        """
831        Whether the stream is 'open' in any sense: that is, whether it counts
832        against the number of concurrent streams.
833        """
834        # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either
835        # the OPEN state or either of the HALF_CLOSED states. Perplexingly,
836        # this excludes the reserved states.
837        # For more detail on why we're doing this in this slightly weird way,
838        # see the comment on ``STREAM_OPEN`` at the top of the file.
839        return STREAM_OPEN[self.state_machine.state]
840
841    @property
842    def closed(self):
843        """
844        Whether the stream is closed.
845        """
846        return self.state_machine.state == StreamState.CLOSED
847
848    @property
849    def closed_by(self):
850        """
851        Returns how the stream was closed, as one of StreamClosedBy.
852        """
853        return self.state_machine.stream_closed_by
854
855    def upgrade(self, client_side):
856        """
857        Called by the connection to indicate that this stream is the initial
858        request/response of an upgraded connection. Places the stream into an
859        appropriate state.
860        """
861        self.config.logger.debug("Upgrading %r", self)
862
863        assert self.stream_id == 1
864        input_ = (
865            StreamInputs.UPGRADE_CLIENT if client_side
866            else StreamInputs.UPGRADE_SERVER
867        )
868
869        # This may return events, we deliberately don't want them.
870        self.state_machine.process_input(input_)
871        return
872
873    def send_headers(self, headers, encoder, end_stream=False):
874        """
875        Returns a list of HEADERS/CONTINUATION frames to emit as either headers
876        or trailers.
877        """
878        self.config.logger.debug("Send headers %s on %r", headers, self)
879        # Convert headers to two-tuples.
880        # FIXME: The fallback for dictionary headers is to be removed in 3.0.
881        try:
882            headers = headers.items()
883            warnings.warn(
884                "Implicit conversion of dictionaries to two-tuples for "
885                "headers is deprecated and will be removed in 3.0.",
886                DeprecationWarning
887            )
888        except AttributeError:
889            headers = headers
890
891        # Because encoding headers makes an irreversible change to the header
892        # compression context, we make the state transition before we encode
893        # them.
894
895        # First, check if we're a client. If we are, no problem: if we aren't,
896        # we need to scan the header block to see if this is an informational
897        # response.
898        input_ = StreamInputs.SEND_HEADERS
899        if ((not self.state_machine.client) and
900                is_informational_response(headers)):
901            if end_stream:
902                raise ProtocolError(
903                    "Cannot set END_STREAM on informational responses."
904                )
905
906            input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS
907
908        events = self.state_machine.process_input(input_)
909
910        hf = HeadersFrame(self.stream_id)
911        hdr_validation_flags = self._build_hdr_validation_flags(events)
912        frames = self._build_headers_frames(
913            headers, encoder, hf, hdr_validation_flags
914        )
915
916        if end_stream:
917            # Not a bug: the END_STREAM flag is valid on the initial HEADERS
918            # frame, not the CONTINUATION frames that follow.
919            self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
920            frames[0].flags.add('END_STREAM')
921
922        if self.state_machine.trailers_sent and not end_stream:
923            raise ProtocolError("Trailers must have END_STREAM set.")
924
925        if self.state_machine.client and self._authority is None:
926            self._authority = authority_from_headers(headers)
927
928        # store request method for _initialize_content_length
929        self.request_method = extract_method_header(headers)
930
931        return frames
932
933    def push_stream_in_band(self, related_stream_id, headers, encoder):
934        """
935        Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed
936        stream header. Called on the stream that has the PUSH_PROMISE frame
937        sent on it.
938        """
939        self.config.logger.debug("Push stream %r", self)
940
941        # Because encoding headers makes an irreversible change to the header
942        # compression context, we make the state transition *first*.
943
944        events = self.state_machine.process_input(
945            StreamInputs.SEND_PUSH_PROMISE
946        )
947
948        ppf = PushPromiseFrame(self.stream_id)
949        ppf.promised_stream_id = related_stream_id
950        hdr_validation_flags = self._build_hdr_validation_flags(events)
951        frames = self._build_headers_frames(
952            headers, encoder, ppf, hdr_validation_flags
953        )
954
955        return frames
956
957    def locally_pushed(self):
958        """
959        Mark this stream as one that was pushed by this peer. Must be called
960        immediately after initialization. Sends no frames, simply updates the
961        state machine.
962        """
963        # This does not trigger any events.
964        events = self.state_machine.process_input(
965            StreamInputs.SEND_PUSH_PROMISE
966        )
967        assert not events
968        return []
969
970    def send_data(self, data, end_stream=False, pad_length=None):
971        """
972        Prepare some data frames. Optionally end the stream.
973
974        .. warning:: Does not perform flow control checks.
975        """
976        self.config.logger.debug(
977            "Send data on %r with end stream set to %s", self, end_stream
978        )
979
980        self.state_machine.process_input(StreamInputs.SEND_DATA)
981
982        df = DataFrame(self.stream_id)
983        df.data = data
984        if end_stream:
985            self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
986            df.flags.add('END_STREAM')
987        if pad_length is not None:
988            df.flags.add('PADDED')
989            df.pad_length = pad_length
990
991        # Subtract flow_controlled_length to account for possible padding
992        self.outbound_flow_control_window -= df.flow_controlled_length
993        assert self.outbound_flow_control_window >= 0
994
995        return [df]
996
997    def end_stream(self):
998        """
999        End a stream without sending data.
1000        """
1001        self.config.logger.debug("End stream %r", self)
1002
1003        self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
1004        df = DataFrame(self.stream_id)
1005        df.flags.add('END_STREAM')
1006        return [df]
1007
1008    def advertise_alternative_service(self, field_value):
1009        """
1010        Advertise an RFC 7838 alternative service. The semantics of this are
1011        better documented in the ``H2Connection`` class.
1012        """
1013        self.config.logger.debug(
1014            "Advertise alternative service of %r for %r", field_value, self
1015        )
1016        self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE)
1017        asf = AltSvcFrame(self.stream_id)
1018        asf.field = field_value
1019        return [asf]
1020
1021    def increase_flow_control_window(self, increment):
1022        """
1023        Increase the size of the flow control window for the remote side.
1024        """
1025        self.config.logger.debug(
1026            "Increase flow control window for %r by %d",
1027            self, increment
1028        )
1029        self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE)
1030        self._inbound_window_manager.window_opened(increment)
1031
1032        wuf = WindowUpdateFrame(self.stream_id)
1033        wuf.window_increment = increment
1034        return [wuf]
1035
1036    def receive_push_promise_in_band(self,
1037                                     promised_stream_id,
1038                                     headers,
1039                                     header_encoding):
1040        """
1041        Receives a push promise frame sent on this stream, pushing a remote
1042        stream. This is called on the stream that has the PUSH_PROMISE sent
1043        on it.
1044        """
1045        self.config.logger.debug(
1046            "Receive Push Promise on %r for remote stream %d",
1047            self, promised_stream_id
1048        )
1049        events = self.state_machine.process_input(
1050            StreamInputs.RECV_PUSH_PROMISE
1051        )
1052        events[0].pushed_stream_id = promised_stream_id
1053
1054        if self.config.validate_inbound_headers:
1055            hdr_validation_flags = self._build_hdr_validation_flags(events)
1056            headers = validate_headers(headers, hdr_validation_flags)
1057
1058        if header_encoding:
1059            headers = list(_decode_headers(headers, header_encoding))
1060        events[0].headers = headers
1061        return [], events
1062
1063    def remotely_pushed(self, pushed_headers):
1064        """
1065        Mark this stream as one that was pushed by the remote peer. Must be
1066        called immediately after initialization. Sends no frames, simply
1067        updates the state machine.
1068        """
1069        self.config.logger.debug("%r pushed by remote peer", self)
1070        events = self.state_machine.process_input(
1071            StreamInputs.RECV_PUSH_PROMISE
1072        )
1073        self._authority = authority_from_headers(pushed_headers)
1074        return [], events
1075
1076    def receive_headers(self, headers, end_stream, header_encoding):
1077        """
1078        Receive a set of headers (or trailers).
1079        """
1080        if is_informational_response(headers):
1081            if end_stream:
1082                raise ProtocolError(
1083                    "Cannot set END_STREAM on informational responses"
1084                )
1085            input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS
1086        else:
1087            input_ = StreamInputs.RECV_HEADERS
1088
1089        events = self.state_machine.process_input(input_)
1090
1091        if end_stream:
1092            es_events = self.state_machine.process_input(
1093                StreamInputs.RECV_END_STREAM
1094            )
1095            events[0].stream_ended = es_events[0]
1096            events += es_events
1097
1098        self._initialize_content_length(headers)
1099
1100        if isinstance(events[0], TrailersReceived):
1101            if not end_stream:
1102                raise ProtocolError("Trailers must have END_STREAM set")
1103
1104        if self.config.validate_inbound_headers:
1105            hdr_validation_flags = self._build_hdr_validation_flags(events)
1106            headers = validate_headers(headers, hdr_validation_flags)
1107
1108        if header_encoding:
1109            headers = list(_decode_headers(headers, header_encoding))
1110
1111        events[0].headers = headers
1112        return [], events
1113
1114    def receive_data(self, data, end_stream, flow_control_len):
1115        """
1116        Receive some data.
1117        """
1118        self.config.logger.debug(
1119            "Receive data on %r with end stream %s and flow control length "
1120            "set to %d", self, end_stream, flow_control_len
1121        )
1122        events = self.state_machine.process_input(StreamInputs.RECV_DATA)
1123        self._inbound_window_manager.window_consumed(flow_control_len)
1124        self._track_content_length(len(data), end_stream)
1125
1126        if end_stream:
1127            es_events = self.state_machine.process_input(
1128                StreamInputs.RECV_END_STREAM
1129            )
1130            events[0].stream_ended = es_events[0]
1131            events.extend(es_events)
1132
1133        events[0].data = data
1134        events[0].flow_controlled_length = flow_control_len
1135        return [], events
1136
1137    def receive_window_update(self, increment):
1138        """
1139        Handle a WINDOW_UPDATE increment.
1140        """
1141        self.config.logger.debug(
1142            "Receive Window Update on %r for increment of %d",
1143            self, increment
1144        )
1145        events = self.state_machine.process_input(
1146            StreamInputs.RECV_WINDOW_UPDATE
1147        )
1148        frames = []
1149
1150        # If we encounter a problem with incrementing the flow control window,
1151        # this should be treated as a *stream* error, not a *connection* error.
1152        # That means we need to catch the error and forcibly close the stream.
1153        if events:
1154            events[0].delta = increment
1155            try:
1156                self.outbound_flow_control_window = guard_increment_window(
1157                    self.outbound_flow_control_window,
1158                    increment
1159                )
1160            except FlowControlError:
1161                # Ok, this is bad. We're going to need to perform a local
1162                # reset.
1163                event = StreamReset()
1164                event.stream_id = self.stream_id
1165                event.error_code = ErrorCodes.FLOW_CONTROL_ERROR
1166                event.remote_reset = False
1167
1168                events = [event]
1169                frames = self.reset_stream(event.error_code)
1170
1171        return frames, events
1172
1173    def receive_continuation(self):
1174        """
1175        A naked CONTINUATION frame has been received. This is always an error,
1176        but the type of error it is depends on the state of the stream and must
1177        transition the state of the stream, so we need to handle it.
1178        """
1179        self.config.logger.debug("Receive Continuation frame on %r", self)
1180        self.state_machine.process_input(
1181            StreamInputs.RECV_CONTINUATION
1182        )
1183        assert False, "Should not be reachable"
1184
1185    def receive_alt_svc(self, frame):
1186        """
1187        An Alternative Service frame was received on the stream. This frame
1188        inherits the origin associated with this stream.
1189        """
1190        self.config.logger.debug(
1191            "Receive Alternative Service frame on stream %r", self
1192        )
1193
1194        # If the origin is present, RFC 7838 says we have to ignore it.
1195        if frame.origin:
1196            return [], []
1197
1198        events = self.state_machine.process_input(
1199            StreamInputs.RECV_ALTERNATIVE_SERVICE
1200        )
1201
1202        # There are lots of situations where we want to ignore the ALTSVC
1203        # frame. If we need to pay attention, we'll have an event and should
1204        # fill it out.
1205        if events:
1206            assert isinstance(events[0], AlternativeServiceAvailable)
1207            events[0].origin = self._authority
1208            events[0].field_value = frame.field
1209
1210        return [], events
1211
1212    def reset_stream(self, error_code=0):
1213        """
1214        Close the stream locally. Reset the stream with an error code.
1215        """
1216        self.config.logger.debug(
1217            "Local reset %r with error code: %d", self, error_code
1218        )
1219        self.state_machine.process_input(StreamInputs.SEND_RST_STREAM)
1220
1221        rsf = RstStreamFrame(self.stream_id)
1222        rsf.error_code = error_code
1223        return [rsf]
1224
1225    def stream_reset(self, frame):
1226        """
1227        Handle a stream being reset remotely.
1228        """
1229        self.config.logger.debug(
1230            "Remote reset %r with error code: %d", self, frame.error_code
1231        )
1232        events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM)
1233
1234        if events:
1235            # We don't fire an event if this stream is already closed.
1236            events[0].error_code = _error_code_from_int(frame.error_code)
1237
1238        return [], events
1239
1240    def acknowledge_received_data(self, acknowledged_size):
1241        """
1242        The user has informed us that they've processed some amount of data
1243        that was received on this stream. Pass that to the window manager and
1244        potentially return some WindowUpdate frames.
1245        """
1246        self.config.logger.debug(
1247            "Acknowledge received data with size %d on %r",
1248            acknowledged_size, self
1249        )
1250        increment = self._inbound_window_manager.process_bytes(
1251            acknowledged_size
1252        )
1253        if increment:
1254            f = WindowUpdateFrame(self.stream_id)
1255            f.window_increment = increment
1256            return [f]
1257
1258        return []
1259
1260    def _build_hdr_validation_flags(self, events):
1261        """
1262        Constructs a set of header validation flags for use when normalizing
1263        and validating header blocks.
1264        """
1265        is_trailer = isinstance(
1266            events[0], (_TrailersSent, TrailersReceived)
1267        )
1268        is_response_header = isinstance(
1269            events[0],
1270            (
1271                _ResponseSent,
1272                ResponseReceived,
1273                InformationalResponseReceived
1274            )
1275        )
1276        is_push_promise = isinstance(
1277            events[0], (PushedStreamReceived, _PushedRequestSent)
1278        )
1279
1280        return HeaderValidationFlags(
1281            is_client=self.state_machine.client,
1282            is_trailer=is_trailer,
1283            is_response_header=is_response_header,
1284            is_push_promise=is_push_promise,
1285        )
1286
1287    def _build_headers_frames(self,
1288                              headers,
1289                              encoder,
1290                              first_frame,
1291                              hdr_validation_flags):
1292        """
1293        Helper method to build headers or push promise frames.
1294        """
1295        # We need to lowercase the header names, and to ensure that secure
1296        # header fields are kept out of compression contexts.
1297        if self.config.normalize_outbound_headers:
1298            headers = normalize_outbound_headers(
1299                headers, hdr_validation_flags
1300            )
1301        if self.config.validate_outbound_headers:
1302            headers = validate_outbound_headers(
1303                headers, hdr_validation_flags
1304            )
1305
1306        encoded_headers = encoder.encode(headers)
1307
1308        # Slice into blocks of max_outbound_frame_size. Be careful with this:
1309        # it only works right because we never send padded frames or priority
1310        # information on the frames. Revisit this if we do.
1311        header_blocks = [
1312            encoded_headers[i:i+self.max_outbound_frame_size]
1313            for i in range(
1314                0, len(encoded_headers), self.max_outbound_frame_size
1315            )
1316        ]
1317
1318        frames = []
1319        first_frame.data = header_blocks[0]
1320        frames.append(first_frame)
1321
1322        for block in header_blocks[1:]:
1323            cf = ContinuationFrame(self.stream_id)
1324            cf.data = block
1325            frames.append(cf)
1326
1327        frames[-1].flags.add('END_HEADERS')
1328        return frames
1329
1330    def _initialize_content_length(self, headers):
1331        """
1332        Checks the headers for a content-length header and initializes the
1333        _expected_content_length field from it. It's not an error for no
1334        Content-Length header to be present.
1335        """
1336        if self.request_method == b'HEAD':
1337            self._expected_content_length = 0
1338            return
1339
1340        for n, v in headers:
1341            if n == b'content-length':
1342                try:
1343                    self._expected_content_length = int(v, 10)
1344                except ValueError:
1345                    raise ProtocolError(
1346                        "Invalid content-length header: %s" % v
1347                    )
1348
1349                return
1350
1351    def _track_content_length(self, length, end_stream):
1352        """
1353        Update the expected content length in response to data being received.
1354        Validates that the appropriate amount of data is sent. Always updates
1355        the received data, but only validates the length against the
1356        content-length header if one was sent.
1357
1358        :param length: The length of the body chunk received.
1359        :param end_stream: If this is the last body chunk received.
1360        """
1361        self._actual_content_length += length
1362        actual = self._actual_content_length
1363        expected = self._expected_content_length
1364
1365        if expected is not None:
1366            if expected < actual:
1367                raise InvalidBodyLengthError(expected, actual)
1368
1369            if end_stream and expected != actual:
1370                raise InvalidBodyLengthError(expected, actual)
1371
1372    def _inbound_flow_control_change_from_settings(self, delta):
1373        """
1374        We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to
1375        update the target window size for flow control. For our flow control
1376        strategy, this means we need to do two things: we need to adjust the
1377        current window size, but we also need to set the target maximum window
1378        size to the new value.
1379        """
1380        new_max_size = self._inbound_window_manager.max_window_size + delta
1381        self._inbound_window_manager.window_opened(delta)
1382        self._inbound_window_manager.max_window_size = new_max_size
1383
1384
def _decode_headers(headers, encoding):
    """
    Lazily decode an iterable of header two-tuples with ``encoding``. Each
    decoded header is rebuilt with the class of the header it came from, so
    the use of ``HeaderTuple`` (and of its subclasses) is preserved.
    """
    for raw_header in headers:
        # Only decoded header blocks are expected here, and those always
        # consist of HeaderTuple objects.
        assert isinstance(raw_header, HeaderTuple)

        raw_name, raw_value = raw_header
        yield raw_header.__class__(
            raw_name.decode(encoding), raw_value.decode(encoding)
        )
1400