1# -*- coding: utf-8 -*-
2"""
3h2/stream
4~~~~~~~~~
5
6An implementation of a HTTP/2 stream.
7"""
8from enum import Enum, IntEnum
9from hpack import HeaderTuple
10from hyperframe.frame import (
11    HeadersFrame, ContinuationFrame, DataFrame, WindowUpdateFrame,
12    RstStreamFrame, PushPromiseFrame, AltSvcFrame
13)
14
15from .errors import ErrorCodes, _error_code_from_int
16from .events import (
17    RequestReceived, ResponseReceived, DataReceived, WindowUpdated,
18    StreamEnded, PushedStreamReceived, StreamReset, TrailersReceived,
19    InformationalResponseReceived, AlternativeServiceAvailable,
20    _ResponseSent, _RequestSent, _TrailersSent, _PushedRequestSent
21)
22from .exceptions import (
23    ProtocolError, StreamClosedError, InvalidBodyLengthError, FlowControlError
24)
25from .utilities import (
26    guard_increment_window, is_informational_response, authority_from_headers,
27    validate_headers, validate_outbound_headers, normalize_outbound_headers,
28    HeaderValidationFlags, extract_method_header, normalize_inbound_headers
29)
30from .windows import WindowManager
31
32
class StreamState(IntEnum):
    """
    The states a single stream can occupy, mirroring the RFC 7540 section 5.1
    state machine that is reproduced in the comment block further down in
    this module.

    This is an ``IntEnum`` so that members can be used directly as list
    indexes (see ``STREAM_OPEN``).
    """
    IDLE = 0
    RESERVED_REMOTE = 1
    RESERVED_LOCAL = 2
    OPEN = 3
    HALF_CLOSED_REMOTE = 4
    HALF_CLOSED_LOCAL = 5
    CLOSED = 6
41
42
class StreamInputs(Enum):
    """
    The inputs accepted by ``H2StreamStateMachine.process_input``.

    ``SEND_*`` members describe actions taken by the local peer, ``RECV_*``
    members describe frames or flags arriving from the remote peer. Together
    with a ``StreamState`` they form the keys of the ``_transitions`` table.
    """
    # Frames and flags emitted by the local peer.
    SEND_HEADERS = 0
    SEND_PUSH_PROMISE = 1
    SEND_RST_STREAM = 2
    SEND_DATA = 3
    SEND_WINDOW_UPDATE = 4
    SEND_END_STREAM = 5
    # Frames and flags received from the remote peer.
    RECV_HEADERS = 6
    RECV_PUSH_PROMISE = 7
    RECV_RST_STREAM = 8
    RECV_DATA = 9
    RECV_WINDOW_UPDATE = 10
    RECV_END_STREAM = 11
    RECV_CONTINUATION = 12  # Added in 2.0.0
    # Header blocks carrying a 1XX :status value.
    SEND_INFORMATIONAL_HEADERS = 13  # Added in 2.2.0
    RECV_INFORMATIONAL_HEADERS = 14  # Added in 2.2.0
    # ALTSVC frames.
    SEND_ALTERNATIVE_SERVICE = 15  # Added in 2.3.0
    RECV_ALTERNATIVE_SERVICE = 16  # Added in 2.3.0
    # Upgrade inputs: the transition table moves an IDLE stream straight to a
    # half-closed state for these (local for the client, remote for the
    # server).
    UPGRADE_CLIENT = 17  # Added 2.3.0
    UPGRADE_SERVER = 18  # Added 2.3.0
63
64
class StreamClosedBy(Enum):
    """
    The reason a stream ended up closed.

    The state machine stores one of these members in its
    ``stream_closed_by`` attribute, and the closed-stream callbacks consult
    it to decide how to treat frames that arrive afterwards.
    """
    SEND_END_STREAM = 0
    RECV_END_STREAM = 1
    SEND_RST_STREAM = 2
    RECV_RST_STREAM = 3
70
71
# Lookup list answering "is a stream in this state open?". It is built once at
# import time and indexed by the integer value of a StreamState member. The
# check is made frequently, so a precomputed list index is used to keep it as
# cheap as possible.
STREAM_OPEN = [False] * len(StreamState)
STREAM_OPEN[StreamState.OPEN] = True
STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True
STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True
81
82
83class H2StreamStateMachine(object):
84    """
85    A single HTTP/2 stream state machine.
86
87    This stream object implements basically the state machine described in
88    RFC 7540 section 5.1.
89
90    :param stream_id: The stream ID of this stream. This is stored primarily
91        for logging purposes.
92    """
93    def __init__(self, stream_id):
94        self.state = StreamState.IDLE
95        self.stream_id = stream_id
96
97        #: Whether this peer is the client side of this stream.
98        self.client = None
99
100        # Whether trailers have been sent/received on this stream or not.
101        self.headers_sent = None
102        self.trailers_sent = None
103        self.headers_received = None
104        self.trailers_received = None
105
106        # How the stream was closed. One of StreamClosedBy.
107        self.stream_closed_by = None
108
109    def process_input(self, input_):
110        """
111        Process a specific input in the state machine.
112        """
113        if not isinstance(input_, StreamInputs):
114            raise ValueError("Input must be an instance of StreamInputs")
115
116        try:
117            func, target_state = _transitions[(self.state, input_)]
118        except KeyError:
119            old_state = self.state
120            self.state = StreamState.CLOSED
121            raise ProtocolError(
122                "Invalid input %s in state %s" % (input_, old_state)
123            )
124        else:
125            previous_state = self.state
126            self.state = target_state
127            if func is not None:
128                try:
129                    return func(self, previous_state)
130                except ProtocolError:
131                    self.state = StreamState.CLOSED
132                    raise
133                except AssertionError as e:  # pragma: no cover
134                    self.state = StreamState.CLOSED
135                    raise ProtocolError(e)
136
137            return []
138
139    def request_sent(self, previous_state):
140        """
141        Fires when a request is sent.
142        """
143        self.client = True
144        self.headers_sent = True
145        event = _RequestSent()
146
147        return [event]
148
149    def response_sent(self, previous_state):
150        """
151        Fires when something that should be a response is sent. This 'response'
152        may actually be trailers.
153        """
154        if not self.headers_sent:
155            if self.client is True or self.client is None:
156                raise ProtocolError("Client cannot send responses.")
157            self.headers_sent = True
158            event = _ResponseSent()
159        else:
160            assert not self.trailers_sent
161            self.trailers_sent = True
162            event = _TrailersSent()
163
164        return [event]
165
166    def request_received(self, previous_state):
167        """
168        Fires when a request is received.
169        """
170        assert not self.headers_received
171        assert not self.trailers_received
172
173        self.client = False
174        self.headers_received = True
175        event = RequestReceived()
176
177        event.stream_id = self.stream_id
178        return [event]
179
180    def response_received(self, previous_state):
181        """
182        Fires when a response is received. Also disambiguates between responses
183        and trailers.
184        """
185        if not self.headers_received:
186            assert self.client is True
187            self.headers_received = True
188            event = ResponseReceived()
189        else:
190            assert not self.trailers_received
191            self.trailers_received = True
192            event = TrailersReceived()
193
194        event.stream_id = self.stream_id
195        return [event]
196
197    def data_received(self, previous_state):
198        """
199        Fires when data is received.
200        """
201        event = DataReceived()
202        event.stream_id = self.stream_id
203        return [event]
204
205    def window_updated(self, previous_state):
206        """
207        Fires when a window update frame is received.
208        """
209        event = WindowUpdated()
210        event.stream_id = self.stream_id
211        return [event]
212
213    def stream_half_closed(self, previous_state):
214        """
215        Fires when an END_STREAM flag is received in the OPEN state,
216        transitioning this stream to a HALF_CLOSED_REMOTE state.
217        """
218        event = StreamEnded()
219        event.stream_id = self.stream_id
220        return [event]
221
222    def stream_ended(self, previous_state):
223        """
224        Fires when a stream is cleanly ended.
225        """
226        self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
227        event = StreamEnded()
228        event.stream_id = self.stream_id
229        return [event]
230
231    def stream_reset(self, previous_state):
232        """
233        Fired when a stream is forcefully reset.
234        """
235        self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
236        event = StreamReset()
237        event.stream_id = self.stream_id
238        return [event]
239
240    def send_new_pushed_stream(self, previous_state):
241        """
242        Fires on the newly pushed stream, when pushed by the local peer.
243
244        No event here, but definitionally this peer must be a server.
245        """
246        assert self.client is None
247        self.client = False
248        self.headers_received = True
249        return []
250
251    def recv_new_pushed_stream(self, previous_state):
252        """
253        Fires on the newly pushed stream, when pushed by the remote peer.
254
255        No event here, but definitionally this peer must be a client.
256        """
257        assert self.client is None
258        self.client = True
259        self.headers_sent = True
260        return []
261
262    def send_push_promise(self, previous_state):
263        """
264        Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
265        We may only send PUSH_PROMISE frames if we're a server.
266        """
267        if self.client is True:
268            raise ProtocolError("Cannot push streams from client peers.")
269
270        event = _PushedRequestSent()
271        return [event]
272
273    def recv_push_promise(self, previous_state):
274        """
275        Fires on the already-existing stream when a PUSH_PROMISE frame is
276        received. We may only receive PUSH_PROMISE frames if we're a client.
277
278        Fires a PushedStreamReceived event.
279        """
280        if not self.client:
281            if self.client is None:  # pragma: no cover
282                msg = "Idle streams cannot receive pushes"
283            else:  # pragma: no cover
284                msg = "Cannot receive pushed streams as a server"
285            raise ProtocolError(msg)
286
287        event = PushedStreamReceived()
288        event.parent_stream_id = self.stream_id
289        return [event]
290
291    def send_end_stream(self, previous_state):
292        """
293        Called when an attempt is made to send END_STREAM in the
294        HALF_CLOSED_REMOTE state.
295        """
296        self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
297
298    def send_reset_stream(self, previous_state):
299        """
300        Called when an attempt is made to send RST_STREAM in a non-closed
301        stream state.
302        """
303        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
304
305    def reset_stream_on_error(self, previous_state):
306        """
307        Called when we need to forcefully emit another RST_STREAM frame on
308        behalf of the state machine.
309
310        If this is the first time we've done this, we should also hang an event
311        off the StreamClosedError so that the user can be informed. We know
312        it's the first time we've done this if the stream is currently in a
313        state other than CLOSED.
314        """
315        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
316
317        error = StreamClosedError(self.stream_id)
318
319        event = StreamReset()
320        event.stream_id = self.stream_id
321        event.error_code = ErrorCodes.STREAM_CLOSED
322        event.remote_reset = False
323        error._events = [event]
324        raise error
325
326    def recv_on_closed_stream(self, previous_state):
327        """
328        Called when an unexpected frame is received on an already-closed
329        stream.
330
331        An endpoint that receives an unexpected frame should treat it as
332        a stream error or connection error with type STREAM_CLOSED, depending
333        on the specific frame. The error handling is done at a higher level:
334        this just raises the appropriate error.
335        """
336        raise StreamClosedError(self.stream_id)
337
338    def send_on_closed_stream(self, previous_state):
339        """
340        Called when an attempt is made to send data on an already-closed
341        stream.
342
343        This essentially overrides the standard logic by throwing a
344        more-specific error: StreamClosedError. This is a ProtocolError, so it
345        matches the standard API of the state machine, but provides more detail
346        to the user.
347        """
348        raise StreamClosedError(self.stream_id)
349
350    def recv_push_on_closed_stream(self, previous_state):
351        """
352        Called when a PUSH_PROMISE frame is received on a full stop
353        stream.
354
355        If the stream was closed by us sending a RST_STREAM frame, then we
356        presume that the PUSH_PROMISE was in flight when we reset the parent
357        stream. Rathen than accept the new stream, we just reset it.
358        Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
359        naturally closed stream is a real problem because it creates a brand
360        new stream that the remote peer now believes exists.
361        """
362        assert self.stream_closed_by is not None
363
364        if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM:
365            raise StreamClosedError(self.stream_id)
366        else:
367            raise ProtocolError("Attempted to push on closed stream.")
368
369    def send_push_on_closed_stream(self, previous_state):
370        """
371        Called when an attempt is made to push on an already-closed stream.
372
373        This essentially overrides the standard logic by providing a more
374        useful error message. It's necessary because simply indicating that the
375        stream is closed is not enough: there is now a new stream that is not
376        allowed to be there. The only recourse is to tear the whole connection
377        down.
378        """
379        raise ProtocolError("Attempted to push on closed stream.")
380
381    def window_on_closed_stream(self, previous_state):
382        """
383        Called when a WINDOW_UPDATE frame is received on an already-closed
384        stream.
385
386        If we sent an END_STREAM frame, we just ignore the frame, as instructed
387        in RFC 7540 Section 5.1. Technically we should eventually consider
388        WINDOW_UPDATE in this state an error, but we don't have access to a
389        clock so we just always allow it. If we closed the stream for any other
390        reason, we behave as we do for receiving any other frame on a closed
391        stream.
392        """
393        assert self.stream_closed_by is not None
394
395        if self.stream_closed_by == StreamClosedBy.SEND_END_STREAM:
396            return []
397        return self.recv_on_closed_stream(previous_state)
398
399    def reset_on_closed_stream(self, previous_state):
400        """
401        Called when a RST_STREAM frame is received on an already-closed stream.
402
403        If we sent an END_STREAM frame, we just ignore the frame, as instructed
404        in RFC 7540 Section 5.1. Technically we should eventually consider
405        RST_STREAM in this state an error, but we don't have access to a clock
406        so we just always allow it. If we closed the stream for any other
407        reason, we behave as we do for receiving any other frame on a closed
408        stream.
409        """
410        assert self.stream_closed_by is not None
411
412        if self.stream_closed_by is StreamClosedBy.SEND_END_STREAM:
413            return []
414        return self.recv_on_closed_stream(previous_state)
415
416    def send_informational_response(self, previous_state):
417        """
418        Called when an informational header block is sent (that is, a block
419        where the :status header has a 1XX value).
420
421        Only enforces that these are sent *before* final headers are sent.
422        """
423        if self.headers_sent:
424            raise ProtocolError("Information response after final response")
425
426        event = _ResponseSent()
427        return [event]
428
429    def recv_informational_response(self, previous_state):
430        """
431        Called when an informational header block is received (that is, a block
432        where the :status header has a 1XX value).
433        """
434        if self.headers_received:
435            raise ProtocolError("Informational response after final response")
436
437        event = InformationalResponseReceived()
438        event.stream_id = self.stream_id
439        return [event]
440
441    def recv_alt_svc(self, previous_state):
442        """
443        Called when receiving an ALTSVC frame.
444
445        RFC 7838 allows us to receive ALTSVC frames at any stream state, which
446        is really absurdly overzealous. For that reason, we want to limit the
447        states in which we can actually receive it. It's really only sensible
448        to receive it after we've sent our own headers and before the server
449        has sent its header block: the server can't guarantee that we have any
450        state around after it completes its header block, and the server
451        doesn't know what origin we're talking about before we've sent ours.
452
453        For that reason, this function applies a few extra checks on both state
454        and some of the little state variables we keep around. If those suggest
455        an unreasonable situation for the ALTSVC frame to have been sent in,
456        we quietly ignore it (as RFC 7838 suggests).
457
458        This function is also *not* always called by the state machine. In some
459        states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
460        because we know the frame cannot be valid in that state (IDLE because
461        the server cannot know what origin the stream applies to, CLOSED
462        because the server cannot assume we still have state around,
463        RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
464        state then *we* are the server).
465        """
466        # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
467        # them.
468        if self.client is False:
469            return []
470
471        # If we've received the response headers from the server they can't
472        # guarantee we still have any state around. Other implementations
473        # (like nghttp2) ignore ALTSVC in this state, so we will too.
474        if self.headers_received:
475            return []
476
477        # Otherwise, this is a sensible enough frame to have received. Return
478        # the event and let it get populated.
479        return [AlternativeServiceAvailable()]
480
481    def send_alt_svc(self, previous_state):
482        """
483        Called when sending an ALTSVC frame on this stream.
484
485        For consistency with the restrictions we apply on receiving ALTSVC
486        frames in ``recv_alt_svc``, we want to restrict when users can send
487        ALTSVC frames to the situations when we ourselves would accept them.
488
489        That means: when we are a server, when we have received the request
490        headers, and when we have not yet sent our own response headers.
491        """
492        # We should not send ALTSVC after we've sent response headers, as the
493        # client may have disposed of its state.
494        if self.headers_sent:
495            raise ProtocolError(
496                "Cannot send ALTSVC after sending response headers."
497            )
498
499        return
500
501
502# STATE MACHINE
503#
504# The stream state machine is defined here to avoid the need to allocate it
505# repeatedly for each stream. It cannot be defined in the stream class because
506# it needs to be able to reference the callbacks defined on the class, but
507# because Python's scoping rules are weird the class object is not actually in
508# scope during the body of the class object.
509#
510# For the sake of clarity, we reproduce the RFC 7540 state machine here:
511#
512#                          +--------+
513#                  send PP |        | recv PP
514#                 ,--------|  idle  |--------.
515#                /         |        |         \
516#               v          +--------+          v
517#        +----------+          |           +----------+
518#        |          |          | send H /  |          |
519# ,------| reserved |          | recv H    | reserved |------.
520# |      | (local)  |          |           | (remote) |      |
521# |      +----------+          v           +----------+      |
522# |          |             +--------+             |          |
523# |          |     recv ES |        | send ES     |          |
524# |   send H |     ,-------|  open  |-------.     | recv H   |
525# |          |    /        |        |        \    |          |
526# |          v   v         +--------+         v   v          |
527# |      +----------+          |           +----------+      |
528# |      |   half   |          |           |   half   |      |
529# |      |  closed  |          | send R /  |  closed  |      |
530# |      | (remote) |          | recv R    | (local)  |      |
531# |      +----------+          |           +----------+      |
532# |           |                |                 |           |
533# |           | send ES /      |       recv ES / |           |
534# |           | send R /       v        send R / |           |
535# |           | recv R     +--------+   recv R   |           |
536# | send R /  `----------->|        |<-----------'  send R / |
537# | recv R                 | closed |               recv R   |
538# `----------------------->|        |<----------------------'
539#                          +--------+
540#
541#    send:   endpoint sends this frame
542#    recv:   endpoint receives this frame
543#
544#    H:  HEADERS frame (with implied CONTINUATIONs)
545#    PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
546#    ES: END_STREAM flag
547#    R:  RST_STREAM frame
548#
549# For the purposes of this state machine we treat HEADERS and their
550# associated CONTINUATION frames as a single jumbo frame. The protocol
# allows/requires this by preventing other frames from being interleaved in
552# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
553# received without a prior HEADERS frame, it *will* be passed to this state
554# machine. The state machine should always reject that frame, either as an
555# invalid transition or because the stream is closed.
556#
557# There is a confusing relationship around PUSH_PROMISE frames. The state
558# machine above considers them to be frames belonging to the new stream,
559# which is *somewhat* true. However, they are sent with the stream ID of
560# their related stream, and are only sendable in some cases.
# For this reason, our state machine implementation below allows for
# PUSH_PROMISE frames not only in the IDLE state (as in the diagram), but also
# in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
564# Essentially, for hyper-h2, PUSH_PROMISE frames are effectively sent on
565# two streams.
566#
567# The _transitions dictionary contains a mapping of tuples of
568# (state, input) to tuples of (side_effect_function, end_state). This
569# map contains all allowed transitions: anything not in this map is
570# invalid and immediately causes a transition to ``closed``.
571_transitions = {
572    # State: idle
573    (StreamState.IDLE, StreamInputs.SEND_HEADERS):
574        (H2StreamStateMachine.request_sent, StreamState.OPEN),
575    (StreamState.IDLE, StreamInputs.RECV_HEADERS):
576        (H2StreamStateMachine.request_received, StreamState.OPEN),
577    (StreamState.IDLE, StreamInputs.RECV_DATA):
578        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
579    (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE):
580        (H2StreamStateMachine.send_new_pushed_stream,
581            StreamState.RESERVED_LOCAL),
582    (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE):
583        (H2StreamStateMachine.recv_new_pushed_stream,
584            StreamState.RESERVED_REMOTE),
585    (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
586        (None, StreamState.IDLE),
587    (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT):
588        (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL),
589    (StreamState.IDLE, StreamInputs.UPGRADE_SERVER):
590        (H2StreamStateMachine.request_received,
591            StreamState.HALF_CLOSED_REMOTE),
592
593    # State: reserved local
594    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS):
595        (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
596    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA):
597        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
598    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
599        (None, StreamState.RESERVED_LOCAL),
600    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
601        (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL),
602    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM):
603        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
604    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM):
605        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
606    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
607        (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL),
608    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
609        (None, StreamState.RESERVED_LOCAL),
610
611    # State: reserved remote
612    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS):
613        (H2StreamStateMachine.response_received,
614            StreamState.HALF_CLOSED_LOCAL),
615    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA):
616        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
617    (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
618        (None, StreamState.RESERVED_REMOTE),
619    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
620        (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE),
621    (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM):
622        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
623    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM):
624        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
625    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
626        (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE),
627
628    # State: open
629    (StreamState.OPEN, StreamInputs.SEND_HEADERS):
630        (H2StreamStateMachine.response_sent, StreamState.OPEN),
631    (StreamState.OPEN, StreamInputs.RECV_HEADERS):
632        (H2StreamStateMachine.response_received, StreamState.OPEN),
633    (StreamState.OPEN, StreamInputs.SEND_DATA):
634        (None, StreamState.OPEN),
635    (StreamState.OPEN, StreamInputs.RECV_DATA):
636        (H2StreamStateMachine.data_received, StreamState.OPEN),
637    (StreamState.OPEN, StreamInputs.SEND_END_STREAM):
638        (None, StreamState.HALF_CLOSED_LOCAL),
639    (StreamState.OPEN, StreamInputs.RECV_END_STREAM):
640        (H2StreamStateMachine.stream_half_closed,
641         StreamState.HALF_CLOSED_REMOTE),
642    (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE):
643        (None, StreamState.OPEN),
644    (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE):
645        (H2StreamStateMachine.window_updated, StreamState.OPEN),
646    (StreamState.OPEN, StreamInputs.SEND_RST_STREAM):
647        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
648    (StreamState.OPEN, StreamInputs.RECV_RST_STREAM):
649        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
650    (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE):
651        (H2StreamStateMachine.send_push_promise, StreamState.OPEN),
652    (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE):
653        (H2StreamStateMachine.recv_push_promise, StreamState.OPEN),
654    (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS):
655        (H2StreamStateMachine.send_informational_response, StreamState.OPEN),
656    (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS):
657        (H2StreamStateMachine.recv_informational_response, StreamState.OPEN),
658    (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE):
659        (H2StreamStateMachine.send_alt_svc, StreamState.OPEN),
660    (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE):
661        (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN),
662
663    # State: half-closed remote
664    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS):
665        (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
666    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS):
667        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
668    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA):
669        (None, StreamState.HALF_CLOSED_REMOTE),
670    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA):
671        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
672    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM):
673        (H2StreamStateMachine.send_end_stream, StreamState.CLOSED),
674    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
675        (None, StreamState.HALF_CLOSED_REMOTE),
676    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
677        (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE),
678    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM):
679        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
680    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM):
681        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
682    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE):
683        (H2StreamStateMachine.send_push_promise,
684            StreamState.HALF_CLOSED_REMOTE),
685    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE):
686        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
687    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS):
688        (H2StreamStateMachine.send_informational_response,
689            StreamState.HALF_CLOSED_REMOTE),
690    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE):
691        (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE),
692    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
693        (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE),
694
695    # State: half-closed local
696    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS):
697        (H2StreamStateMachine.response_received,
698            StreamState.HALF_CLOSED_LOCAL),
699    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA):
700        (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL),
701    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM):
702        (H2StreamStateMachine.stream_ended, StreamState.CLOSED),
703    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
704        (None, StreamState.HALF_CLOSED_LOCAL),
705    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
706        (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL),
707    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM):
708        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
709    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM):
710        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
711    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE):
712        (H2StreamStateMachine.recv_push_promise,
713            StreamState.HALF_CLOSED_LOCAL),
714    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS):
715        (H2StreamStateMachine.recv_informational_response,
716            StreamState.HALF_CLOSED_LOCAL),
717    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
718        (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL),
719    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
720        (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL),
721
722    # State: closed
723    (StreamState.CLOSED, StreamInputs.RECV_END_STREAM):
724        (None, StreamState.CLOSED),
725    (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE):
726        (None, StreamState.CLOSED),
727
728    # RFC 7540 Section 5.1 defines how the end point should react when
729    # receiving a frame on a closed stream with the following statements:
730    #
731    # > An endpoint that receives any frame other than PRIORITY after receiving
732    # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED.
733    # > An endpoint that receives any frames after receiving a frame with the
734    # > END_STREAM flag set MUST treat that as a connection error of type
735    # > STREAM_CLOSED.
736    (StreamState.CLOSED, StreamInputs.RECV_HEADERS):
737        (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
738    (StreamState.CLOSED, StreamInputs.RECV_DATA):
739        (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
740
741    # > WINDOW_UPDATE or RST_STREAM frames can be received in this state
742    # > for a short period after a DATA or HEADERS frame containing a
743    # > END_STREAM flag is sent.
744    (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE):
745        (H2StreamStateMachine.window_on_closed_stream, StreamState.CLOSED),
746    (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM):
747        (H2StreamStateMachine.reset_on_closed_stream, StreamState.CLOSED),
748
749    # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is
750    # > neither "open" nor "half-closed (local)" as a connection error of type
751    # > PROTOCOL_ERROR.
752    (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE):
753        (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED),
754
755    # Also, users should be forbidden from sending on closed streams.
756    (StreamState.CLOSED, StreamInputs.SEND_HEADERS):
757        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
758    (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE):
759        (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED),
760    (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM):
761        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
762    (StreamState.CLOSED, StreamInputs.SEND_DATA):
763        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
764    (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE):
765        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
766    (StreamState.CLOSED, StreamInputs.SEND_END_STREAM):
767        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
768}
769
770
771class H2Stream(object):
772    """
773    A low-level HTTP/2 stream object. This handles building and receiving
774    frames and maintains per-stream state.
775
776    This wraps a HTTP/2 Stream state machine implementation, ensuring that
777    frames can only be sent/received when the stream is in a valid state.
778    Attempts to create frames that cannot be sent will raise a
779    ``ProtocolError``.
780    """
781    def __init__(self,
782                 stream_id,
783                 config,
784                 inbound_window_size,
785                 outbound_window_size):
786        self.state_machine = H2StreamStateMachine(stream_id)
787        self.stream_id = stream_id
788        self.max_outbound_frame_size = None
789        self.request_method = None
790
791        # The curent value of the outbound stream flow control window
792        self.outbound_flow_control_window = outbound_window_size
793
794        # The flow control manager.
795        self._inbound_window_manager = WindowManager(inbound_window_size)
796
797        # The expected content length, if any.
798        self._expected_content_length = None
799
800        # The actual received content length. Always tracked.
801        self._actual_content_length = 0
802
803        # The authority we believe this stream belongs to.
804        self._authority = None
805
806        # The configuration for this stream.
807        self.config = config
808
809    def __repr__(self):
810        return "<%s id:%d state:%r>" % (
811            type(self).__name__,
812            self.stream_id,
813            self.state_machine.state
814        )
815
816    @property
817    def inbound_flow_control_window(self):
818        """
819        The size of the inbound flow control window for the stream. This is
820        rarely publicly useful: instead, use :meth:`remote_flow_control_window
821        <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is
822        largely present to provide a shortcut to this data.
823        """
824        return self._inbound_window_manager.current_window_size
825
826    @property
827    def open(self):
828        """
829        Whether the stream is 'open' in any sense: that is, whether it counts
830        against the number of concurrent streams.
831        """
832        # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either
833        # the OPEN state or either of the HALF_CLOSED states. Perplexingly,
834        # this excludes the reserved states.
835        # For more detail on why we're doing this in this slightly weird way,
836        # see the comment on ``STREAM_OPEN`` at the top of the file.
837        return STREAM_OPEN[self.state_machine.state]
838
839    @property
840    def closed(self):
841        """
842        Whether the stream is closed.
843        """
844        return self.state_machine.state == StreamState.CLOSED
845
846    @property
847    def closed_by(self):
848        """
849        Returns how the stream was closed, as one of StreamClosedBy.
850        """
851        return self.state_machine.stream_closed_by
852
853    def upgrade(self, client_side):
854        """
855        Called by the connection to indicate that this stream is the initial
856        request/response of an upgraded connection. Places the stream into an
857        appropriate state.
858        """
859        self.config.logger.debug("Upgrading %r", self)
860
861        assert self.stream_id == 1
862        input_ = (
863            StreamInputs.UPGRADE_CLIENT if client_side
864            else StreamInputs.UPGRADE_SERVER
865        )
866
867        # This may return events, we deliberately don't want them.
868        self.state_machine.process_input(input_)
869        return
870
871    def send_headers(self, headers, encoder, end_stream=False):
872        """
873        Returns a list of HEADERS/CONTINUATION frames to emit as either headers
874        or trailers.
875        """
876        self.config.logger.debug("Send headers %s on %r", headers, self)
877
878        # Because encoding headers makes an irreversible change to the header
879        # compression context, we make the state transition before we encode
880        # them.
881
882        # First, check if we're a client. If we are, no problem: if we aren't,
883        # we need to scan the header block to see if this is an informational
884        # response.
885        input_ = StreamInputs.SEND_HEADERS
886        if ((not self.state_machine.client) and
887                is_informational_response(headers)):
888            if end_stream:
889                raise ProtocolError(
890                    "Cannot set END_STREAM on informational responses."
891                )
892
893            input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS
894
895        events = self.state_machine.process_input(input_)
896
897        hf = HeadersFrame(self.stream_id)
898        hdr_validation_flags = self._build_hdr_validation_flags(events)
899        frames = self._build_headers_frames(
900            headers, encoder, hf, hdr_validation_flags
901        )
902
903        if end_stream:
904            # Not a bug: the END_STREAM flag is valid on the initial HEADERS
905            # frame, not the CONTINUATION frames that follow.
906            self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
907            frames[0].flags.add('END_STREAM')
908
909        if self.state_machine.trailers_sent and not end_stream:
910            raise ProtocolError("Trailers must have END_STREAM set.")
911
912        if self.state_machine.client and self._authority is None:
913            self._authority = authority_from_headers(headers)
914
915        # store request method for _initialize_content_length
916        self.request_method = extract_method_header(headers)
917
918        return frames
919
920    def push_stream_in_band(self, related_stream_id, headers, encoder):
921        """
922        Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed
923        stream header. Called on the stream that has the PUSH_PROMISE frame
924        sent on it.
925        """
926        self.config.logger.debug("Push stream %r", self)
927
928        # Because encoding headers makes an irreversible change to the header
929        # compression context, we make the state transition *first*.
930
931        events = self.state_machine.process_input(
932            StreamInputs.SEND_PUSH_PROMISE
933        )
934
935        ppf = PushPromiseFrame(self.stream_id)
936        ppf.promised_stream_id = related_stream_id
937        hdr_validation_flags = self._build_hdr_validation_flags(events)
938        frames = self._build_headers_frames(
939            headers, encoder, ppf, hdr_validation_flags
940        )
941
942        return frames
943
944    def locally_pushed(self):
945        """
946        Mark this stream as one that was pushed by this peer. Must be called
947        immediately after initialization. Sends no frames, simply updates the
948        state machine.
949        """
950        # This does not trigger any events.
951        events = self.state_machine.process_input(
952            StreamInputs.SEND_PUSH_PROMISE
953        )
954        assert not events
955        return []
956
957    def send_data(self, data, end_stream=False, pad_length=None):
958        """
959        Prepare some data frames. Optionally end the stream.
960
961        .. warning:: Does not perform flow control checks.
962        """
963        self.config.logger.debug(
964            "Send data on %r with end stream set to %s", self, end_stream
965        )
966
967        self.state_machine.process_input(StreamInputs.SEND_DATA)
968
969        df = DataFrame(self.stream_id)
970        df.data = data
971        if end_stream:
972            self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
973            df.flags.add('END_STREAM')
974        if pad_length is not None:
975            df.flags.add('PADDED')
976            df.pad_length = pad_length
977
978        # Subtract flow_controlled_length to account for possible padding
979        self.outbound_flow_control_window -= df.flow_controlled_length
980        assert self.outbound_flow_control_window >= 0
981
982        return [df]
983
984    def end_stream(self):
985        """
986        End a stream without sending data.
987        """
988        self.config.logger.debug("End stream %r", self)
989
990        self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
991        df = DataFrame(self.stream_id)
992        df.flags.add('END_STREAM')
993        return [df]
994
995    def advertise_alternative_service(self, field_value):
996        """
997        Advertise an RFC 7838 alternative service. The semantics of this are
998        better documented in the ``H2Connection`` class.
999        """
1000        self.config.logger.debug(
1001            "Advertise alternative service of %r for %r", field_value, self
1002        )
1003        self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE)
1004        asf = AltSvcFrame(self.stream_id)
1005        asf.field = field_value
1006        return [asf]
1007
1008    def increase_flow_control_window(self, increment):
1009        """
1010        Increase the size of the flow control window for the remote side.
1011        """
1012        self.config.logger.debug(
1013            "Increase flow control window for %r by %d",
1014            self, increment
1015        )
1016        self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE)
1017        self._inbound_window_manager.window_opened(increment)
1018
1019        wuf = WindowUpdateFrame(self.stream_id)
1020        wuf.window_increment = increment
1021        return [wuf]
1022
1023    def receive_push_promise_in_band(self,
1024                                     promised_stream_id,
1025                                     headers,
1026                                     header_encoding):
1027        """
1028        Receives a push promise frame sent on this stream, pushing a remote
1029        stream. This is called on the stream that has the PUSH_PROMISE sent
1030        on it.
1031        """
1032        self.config.logger.debug(
1033            "Receive Push Promise on %r for remote stream %d",
1034            self, promised_stream_id
1035        )
1036        events = self.state_machine.process_input(
1037            StreamInputs.RECV_PUSH_PROMISE
1038        )
1039        events[0].pushed_stream_id = promised_stream_id
1040
1041        hdr_validation_flags = self._build_hdr_validation_flags(events)
1042        events[0].headers = self._process_received_headers(
1043            headers, hdr_validation_flags, header_encoding
1044        )
1045        return [], events
1046
1047    def remotely_pushed(self, pushed_headers):
1048        """
1049        Mark this stream as one that was pushed by the remote peer. Must be
1050        called immediately after initialization. Sends no frames, simply
1051        updates the state machine.
1052        """
1053        self.config.logger.debug("%r pushed by remote peer", self)
1054        events = self.state_machine.process_input(
1055            StreamInputs.RECV_PUSH_PROMISE
1056        )
1057        self._authority = authority_from_headers(pushed_headers)
1058        return [], events
1059
1060    def receive_headers(self, headers, end_stream, header_encoding):
1061        """
1062        Receive a set of headers (or trailers).
1063        """
1064        if is_informational_response(headers):
1065            if end_stream:
1066                raise ProtocolError(
1067                    "Cannot set END_STREAM on informational responses"
1068                )
1069            input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS
1070        else:
1071            input_ = StreamInputs.RECV_HEADERS
1072
1073        events = self.state_machine.process_input(input_)
1074
1075        if end_stream:
1076            es_events = self.state_machine.process_input(
1077                StreamInputs.RECV_END_STREAM
1078            )
1079            events[0].stream_ended = es_events[0]
1080            events += es_events
1081
1082        self._initialize_content_length(headers)
1083
1084        if isinstance(events[0], TrailersReceived):
1085            if not end_stream:
1086                raise ProtocolError("Trailers must have END_STREAM set")
1087
1088        hdr_validation_flags = self._build_hdr_validation_flags(events)
1089        events[0].headers = self._process_received_headers(
1090            headers, hdr_validation_flags, header_encoding
1091        )
1092        return [], events
1093
1094    def receive_data(self, data, end_stream, flow_control_len):
1095        """
1096        Receive some data.
1097        """
1098        self.config.logger.debug(
1099            "Receive data on %r with end stream %s and flow control length "
1100            "set to %d", self, end_stream, flow_control_len
1101        )
1102        events = self.state_machine.process_input(StreamInputs.RECV_DATA)
1103        self._inbound_window_manager.window_consumed(flow_control_len)
1104        self._track_content_length(len(data), end_stream)
1105
1106        if end_stream:
1107            es_events = self.state_machine.process_input(
1108                StreamInputs.RECV_END_STREAM
1109            )
1110            events[0].stream_ended = es_events[0]
1111            events.extend(es_events)
1112
1113        events[0].data = data
1114        events[0].flow_controlled_length = flow_control_len
1115        return [], events
1116
1117    def receive_window_update(self, increment):
1118        """
1119        Handle a WINDOW_UPDATE increment.
1120        """
1121        self.config.logger.debug(
1122            "Receive Window Update on %r for increment of %d",
1123            self, increment
1124        )
1125        events = self.state_machine.process_input(
1126            StreamInputs.RECV_WINDOW_UPDATE
1127        )
1128        frames = []
1129
1130        # If we encounter a problem with incrementing the flow control window,
1131        # this should be treated as a *stream* error, not a *connection* error.
1132        # That means we need to catch the error and forcibly close the stream.
1133        if events:
1134            events[0].delta = increment
1135            try:
1136                self.outbound_flow_control_window = guard_increment_window(
1137                    self.outbound_flow_control_window,
1138                    increment
1139                )
1140            except FlowControlError:
1141                # Ok, this is bad. We're going to need to perform a local
1142                # reset.
1143                event = StreamReset()
1144                event.stream_id = self.stream_id
1145                event.error_code = ErrorCodes.FLOW_CONTROL_ERROR
1146                event.remote_reset = False
1147
1148                events = [event]
1149                frames = self.reset_stream(event.error_code)
1150
1151        return frames, events
1152
1153    def receive_continuation(self):
1154        """
1155        A naked CONTINUATION frame has been received. This is always an error,
1156        but the type of error it is depends on the state of the stream and must
1157        transition the state of the stream, so we need to handle it.
1158        """
1159        self.config.logger.debug("Receive Continuation frame on %r", self)
1160        self.state_machine.process_input(
1161            StreamInputs.RECV_CONTINUATION
1162        )
1163        assert False, "Should not be reachable"
1164
1165    def receive_alt_svc(self, frame):
1166        """
1167        An Alternative Service frame was received on the stream. This frame
1168        inherits the origin associated with this stream.
1169        """
1170        self.config.logger.debug(
1171            "Receive Alternative Service frame on stream %r", self
1172        )
1173
1174        # If the origin is present, RFC 7838 says we have to ignore it.
1175        if frame.origin:
1176            return [], []
1177
1178        events = self.state_machine.process_input(
1179            StreamInputs.RECV_ALTERNATIVE_SERVICE
1180        )
1181
1182        # There are lots of situations where we want to ignore the ALTSVC
1183        # frame. If we need to pay attention, we'll have an event and should
1184        # fill it out.
1185        if events:
1186            assert isinstance(events[0], AlternativeServiceAvailable)
1187            events[0].origin = self._authority
1188            events[0].field_value = frame.field
1189
1190        return [], events
1191
1192    def reset_stream(self, error_code=0):
1193        """
1194        Close the stream locally. Reset the stream with an error code.
1195        """
1196        self.config.logger.debug(
1197            "Local reset %r with error code: %d", self, error_code
1198        )
1199        self.state_machine.process_input(StreamInputs.SEND_RST_STREAM)
1200
1201        rsf = RstStreamFrame(self.stream_id)
1202        rsf.error_code = error_code
1203        return [rsf]
1204
1205    def stream_reset(self, frame):
1206        """
1207        Handle a stream being reset remotely.
1208        """
1209        self.config.logger.debug(
1210            "Remote reset %r with error code: %d", self, frame.error_code
1211        )
1212        events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM)
1213
1214        if events:
1215            # We don't fire an event if this stream is already closed.
1216            events[0].error_code = _error_code_from_int(frame.error_code)
1217
1218        return [], events
1219
1220    def acknowledge_received_data(self, acknowledged_size):
1221        """
1222        The user has informed us that they've processed some amount of data
1223        that was received on this stream. Pass that to the window manager and
1224        potentially return some WindowUpdate frames.
1225        """
1226        self.config.logger.debug(
1227            "Acknowledge received data with size %d on %r",
1228            acknowledged_size, self
1229        )
1230        increment = self._inbound_window_manager.process_bytes(
1231            acknowledged_size
1232        )
1233        if increment:
1234            f = WindowUpdateFrame(self.stream_id)
1235            f.window_increment = increment
1236            return [f]
1237
1238        return []
1239
1240    def _build_hdr_validation_flags(self, events):
1241        """
1242        Constructs a set of header validation flags for use when normalizing
1243        and validating header blocks.
1244        """
1245        is_trailer = isinstance(
1246            events[0], (_TrailersSent, TrailersReceived)
1247        )
1248        is_response_header = isinstance(
1249            events[0],
1250            (
1251                _ResponseSent,
1252                ResponseReceived,
1253                InformationalResponseReceived
1254            )
1255        )
1256        is_push_promise = isinstance(
1257            events[0], (PushedStreamReceived, _PushedRequestSent)
1258        )
1259
1260        return HeaderValidationFlags(
1261            is_client=self.state_machine.client,
1262            is_trailer=is_trailer,
1263            is_response_header=is_response_header,
1264            is_push_promise=is_push_promise,
1265        )
1266
1267    def _build_headers_frames(self,
1268                              headers,
1269                              encoder,
1270                              first_frame,
1271                              hdr_validation_flags):
1272        """
1273        Helper method to build headers or push promise frames.
1274        """
1275        # We need to lowercase the header names, and to ensure that secure
1276        # header fields are kept out of compression contexts.
1277        if self.config.normalize_outbound_headers:
1278            headers = normalize_outbound_headers(
1279                headers, hdr_validation_flags
1280            )
1281        if self.config.validate_outbound_headers:
1282            headers = validate_outbound_headers(
1283                headers, hdr_validation_flags
1284            )
1285
1286        encoded_headers = encoder.encode(headers)
1287
1288        # Slice into blocks of max_outbound_frame_size. Be careful with this:
1289        # it only works right because we never send padded frames or priority
1290        # information on the frames. Revisit this if we do.
1291        header_blocks = [
1292            encoded_headers[i:i+self.max_outbound_frame_size]
1293            for i in range(
1294                0, len(encoded_headers), self.max_outbound_frame_size
1295            )
1296        ]
1297
1298        frames = []
1299        first_frame.data = header_blocks[0]
1300        frames.append(first_frame)
1301
1302        for block in header_blocks[1:]:
1303            cf = ContinuationFrame(self.stream_id)
1304            cf.data = block
1305            frames.append(cf)
1306
1307        frames[-1].flags.add('END_HEADERS')
1308        return frames
1309
1310    def _process_received_headers(self,
1311                                  headers,
1312                                  header_validation_flags,
1313                                  header_encoding):
1314        """
1315        When headers have been received from the remote peer, run a processing
1316        pipeline on them to transform them into the appropriate form for
1317        attaching to an event.
1318        """
1319        if self.config.normalize_inbound_headers:
1320            headers = normalize_inbound_headers(
1321                headers, header_validation_flags
1322            )
1323
1324        if self.config.validate_inbound_headers:
1325            headers = validate_headers(headers, header_validation_flags)
1326
1327        if header_encoding:
1328            headers = _decode_headers(headers, header_encoding)
1329
1330        # The above steps are all generators, so we need to concretize the
1331        # headers now.
1332        return list(headers)
1333
1334    def _initialize_content_length(self, headers):
1335        """
1336        Checks the headers for a content-length header and initializes the
1337        _expected_content_length field from it. It's not an error for no
1338        Content-Length header to be present.
1339        """
1340        if self.request_method == b'HEAD':
1341            self._expected_content_length = 0
1342            return
1343
1344        for n, v in headers:
1345            if n == b'content-length':
1346                try:
1347                    self._expected_content_length = int(v, 10)
1348                except ValueError:
1349                    raise ProtocolError(
1350                        "Invalid content-length header: %s" % v
1351                    )
1352
1353                return
1354
1355    def _track_content_length(self, length, end_stream):
1356        """
1357        Update the expected content length in response to data being received.
1358        Validates that the appropriate amount of data is sent. Always updates
1359        the received data, but only validates the length against the
1360        content-length header if one was sent.
1361
1362        :param length: The length of the body chunk received.
1363        :param end_stream: If this is the last body chunk received.
1364        """
1365        self._actual_content_length += length
1366        actual = self._actual_content_length
1367        expected = self._expected_content_length
1368
1369        if expected is not None:
1370            if expected < actual:
1371                raise InvalidBodyLengthError(expected, actual)
1372
1373            if end_stream and expected != actual:
1374                raise InvalidBodyLengthError(expected, actual)
1375
1376    def _inbound_flow_control_change_from_settings(self, delta):
1377        """
1378        We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to
1379        update the target window size for flow control. For our flow control
1380        strategy, this means we need to do two things: we need to adjust the
1381        current window size, but we also need to set the target maximum window
1382        size to the new value.
1383        """
1384        new_max_size = self._inbound_window_manager.max_window_size + delta
1385        self._inbound_window_manager.window_opened(delta)
1386        self._inbound_window_manager.max_window_size = new_max_size
1387
1388
def _decode_headers(headers, encoding):
    """
    Lazily decode an iterable of header two-tuples with the given encoding.
    Each decoded header is rebuilt with the class of the original one, so the
    use of ``HeaderTuple`` (and of any subclass) is preserved.

    :param headers: An iterable of ``HeaderTuple`` objects whose name and
        value both support ``decode``.
    :param encoding: The encoding passed to ``decode``.
    :returns: A generator yielding the decoded headers.
    """
    for header in headers:
        # Only decoded header blocks are expected here, and those are always
        # made of HeaderTuple objects.
        assert isinstance(header, HeaderTuple)

        raw_name, raw_value = header
        header_type = header.__class__
        yield header_type(
            raw_name.decode(encoding), raw_value.decode(encoding)
        )
1404