1# -*- coding: utf-8 -*-
2"""
3h2/stream
4~~~~~~~~~
5
6An implementation of a HTTP/2 stream.
7"""
8from enum import Enum, IntEnum
9from hpack import HeaderTuple
10from hyperframe.frame import (
11    HeadersFrame, ContinuationFrame, DataFrame, WindowUpdateFrame,
12    RstStreamFrame, PushPromiseFrame, AltSvcFrame
13)
14
15from .errors import ErrorCodes, _error_code_from_int
16from .events import (
17    RequestReceived, ResponseReceived, DataReceived, WindowUpdated,
18    StreamEnded, PushedStreamReceived, StreamReset, TrailersReceived,
19    InformationalResponseReceived, AlternativeServiceAvailable,
20    _ResponseSent, _RequestSent, _TrailersSent, _PushedRequestSent
21)
22from .exceptions import (
23    ProtocolError, StreamClosedError, InvalidBodyLengthError, FlowControlError
24)
25from .utilities import (
26    guard_increment_window, is_informational_response, authority_from_headers,
27    validate_headers, validate_outbound_headers, normalize_outbound_headers,
28    HeaderValidationFlags, extract_method_header, normalize_inbound_headers
29)
30from .windows import WindowManager
31
32
class StreamState(IntEnum):
    """
    The states a single HTTP/2 stream can be in.

    Members are integers numbered consecutively from zero, which allows a
    member to be used directly as an index into a per-state lookup list.
    """
    # Initial state of every stream state machine.
    IDLE = 0

    # Reserved by a PUSH_PROMISE frame received from the remote peer.
    RESERVED_REMOTE = 1

    # Reserved by a PUSH_PROMISE frame sent by the local peer.
    RESERVED_LOCAL = 2

    # Headers have been exchanged and neither side has ended the stream.
    OPEN = 3

    # The remote peer has ended its side of the stream.
    HALF_CLOSED_REMOTE = 4

    # The local peer has ended its side of the stream.
    HALF_CLOSED_LOCAL = 5

    # Terminal state.
    CLOSED = 6
41
42
class StreamInputs(Enum):
    """
    The inputs understood by the stream state machine.

    ``SEND_*`` members describe actions taken by the local peer and
    ``RECV_*`` members describe frames arriving from the remote peer.
    """
    # Frames and flags emitted by the local peer.
    SEND_HEADERS = 0
    SEND_PUSH_PROMISE = 1
    SEND_RST_STREAM = 2
    SEND_DATA = 3
    SEND_WINDOW_UPDATE = 4
    SEND_END_STREAM = 5

    # Frames and flags received from the remote peer.
    RECV_HEADERS = 6
    RECV_PUSH_PROMISE = 7
    RECV_RST_STREAM = 8
    RECV_DATA = 9
    RECV_WINDOW_UPDATE = 10
    RECV_END_STREAM = 11

    # Later additions, annotated with the release that introduced them.
    RECV_CONTINUATION = 12  # Added in 2.0.0
    SEND_INFORMATIONAL_HEADERS = 13  # Added in 2.2.0
    RECV_INFORMATIONAL_HEADERS = 14  # Added in 2.2.0
    SEND_ALTERNATIVE_SERVICE = 15  # Added in 2.3.0
    RECV_ALTERNATIVE_SERVICE = 16  # Added in 2.3.0
    UPGRADE_CLIENT = 17  # Added 2.3.0
    UPGRADE_SERVER = 18  # Added 2.3.0
63
64
class StreamClosedBy(Enum):
    """
    The action that caused a stream to be closed.

    The stream state machine stores one of these members in its
    ``stream_closed_by`` attribute when the stream is closed.
    """
    # Closed cleanly by an END_STREAM flag.
    SEND_END_STREAM = 0
    RECV_END_STREAM = 1

    # Closed forcefully by a RST_STREAM frame.
    SEND_RST_STREAM = 2
    RECV_RST_STREAM = 3
70
71
# Lookup table answering "is a stream in this state open?". It is built once
# at import time and is indexed by the integer value of a StreamState member.
# We potentially check whether a stream is open quite frequently, so the
# answer is precomputed rather than derived on every check: a list index is
# the cheapest lookup available.
STREAM_OPEN = [
    state_value in (
        StreamState.OPEN,
        StreamState.HALF_CLOSED_LOCAL,
        StreamState.HALF_CLOSED_REMOTE,
    )
    for state_value in range(len(StreamState))
]
81
82
class H2StreamStateMachine:
    """
    A single HTTP/2 stream state machine.

    This stream object implements basically the state machine described in
    RFC 7540 section 5.1.

    Inputs are fed in through :meth:`process_input`. That method looks up the
    transition for the current state in the module-level ``_transitions``
    table, moves the machine to the target state and then runs the
    side-effect method registered for the transition (if any). Every
    side-effect method receives the state the machine was in *before* the
    transition and returns a (possibly empty) list of events.

    :param stream_id: The stream ID of this stream. This is stored primarily
        for logging purposes.
    """
    def __init__(self, stream_id):
        self.state = StreamState.IDLE
        self.stream_id = stream_id

        #: Whether this peer is the client side of this stream. ``None``
        #: until an input that determines the role has been processed.
        self.client = None

        # Whether the initial header block and the trailers have been
        # sent/received on this stream or not.
        self.headers_sent = None
        self.trailers_sent = None
        self.headers_received = None
        self.trailers_received = None

        # How the stream was closed. One of StreamClosedBy.
        self.stream_closed_by = None

    def process_input(self, input_):
        """
        Process a specific input in the state machine.

        :param input_: The ``StreamInputs`` member to process.
        :returns: The list of events produced by the transition. The list is
            empty when the transition has no side-effect method.
        :raises ValueError: If ``input_`` is not a ``StreamInputs`` member.
        :raises ProtocolError: If the input is not allowed in the current
            state, or if the side-effect method rejects it. In both cases
            the stream is left in the ``CLOSED`` state.
        """
        if not isinstance(input_, StreamInputs):
            raise ValueError("Input must be an instance of StreamInputs")

        # A plain lookup (rather than catching KeyError) keeps an irrelevant
        # KeyError out of the context of the ProtocolError raised below.
        transition = _transitions.get((self.state, input_))
        if transition is None:
            # Anything missing from the table is an invalid transition and
            # immediately closes the stream.
            old_state = self.state
            self.state = StreamState.CLOSED
            raise ProtocolError(
                "Invalid input %s in state %s" % (input_, old_state)
            )

        func, target_state = transition
        previous_state = self.state
        self.state = target_state
        if func is None:
            return []

        try:
            return func(self, previous_state)
        except ProtocolError:
            self.state = StreamState.CLOSED
            raise
        except AssertionError as e:  # pragma: no cover
            # The side-effect methods use ``assert`` for their sanity
            # checks. Convert a failure into the one exception type that
            # callers of the state machine expect.
            self.state = StreamState.CLOSED
            raise ProtocolError(e) from e

    def request_sent(self, previous_state):
        """
        Fires when a request is sent.

        Sending a request makes this peer the client side of the stream.

        :returns: A list containing a single ``_RequestSent`` event.
        """
        self.client = True
        self.headers_sent = True
        event = _RequestSent()

        return [event]

    def response_sent(self, previous_state):
        """
        Fires when something that should be a response is sent. This 'response'
        may actually be trailers.

        The first header block sent is treated as the response; a header
        block sent after that is treated as trailers.

        :returns: A list containing a single ``_ResponseSent`` or
            ``_TrailersSent`` event.
        :raises ProtocolError: If no headers have been sent yet and this
            peer is the client or its role is still unknown.
        """
        if not self.headers_sent:
            if self.client is True or self.client is None:
                raise ProtocolError("Client cannot send responses.")
            self.headers_sent = True
            event = _ResponseSent()
        else:
            # Only one trailers block may be sent.
            assert not self.trailers_sent
            self.trailers_sent = True
            event = _TrailersSent()

        return [event]

    def request_received(self, previous_state):
        """
        Fires when a request is received.

        Receiving a request makes this peer the server side of the stream.

        :returns: A list containing a single ``RequestReceived`` event.
        """
        assert not self.headers_received
        assert not self.trailers_received

        self.client = False
        self.headers_received = True
        event = RequestReceived()

        event.stream_id = self.stream_id
        return [event]

    def response_received(self, previous_state):
        """
        Fires when a response is received. Also disambiguates between responses
        and trailers.

        The first header block received is treated as the response; a header
        block received after that is treated as trailers.

        :returns: A list containing a single ``ResponseReceived`` or
            ``TrailersReceived`` event.
        """
        if not self.headers_received:
            # Only the client side of a stream receives a response.
            assert self.client is True
            self.headers_received = True
            event = ResponseReceived()
        else:
            # Only one trailers block may be received.
            assert not self.trailers_received
            self.trailers_received = True
            event = TrailersReceived()

        event.stream_id = self.stream_id
        return [event]

    def data_received(self, previous_state):
        """
        Fires when data is received.

        :returns: A list containing a single ``DataReceived`` event.
        :raises ProtocolError: If no headers have been received on this
            stream yet.
        """
        if not self.headers_received:
            raise ProtocolError("cannot receive data before headers")
        event = DataReceived()
        event.stream_id = self.stream_id
        return [event]

    def window_updated(self, previous_state):
        """
        Fires when a window update frame is received.

        :returns: A list containing a single ``WindowUpdated`` event.
        """
        event = WindowUpdated()
        event.stream_id = self.stream_id
        return [event]

    def stream_half_closed(self, previous_state):
        """
        Fires when an END_STREAM flag is received in the OPEN state,
        transitioning this stream to a HALF_CLOSED_REMOTE state.

        :returns: A list containing a single ``StreamEnded`` event.
        """
        event = StreamEnded()
        event.stream_id = self.stream_id
        return [event]

    def stream_ended(self, previous_state):
        """
        Fires when a stream is cleanly ended.

        Records that the stream was closed by a received END_STREAM flag.

        :returns: A list containing a single ``StreamEnded`` event.
        """
        self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
        event = StreamEnded()
        event.stream_id = self.stream_id
        return [event]

    def stream_reset(self, previous_state):
        """
        Fired when a stream is forcefully reset.

        Records that the stream was closed by a received RST_STREAM frame.

        :returns: A list containing a single ``StreamReset`` event.
        """
        self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
        event = StreamReset()
        event.stream_id = self.stream_id
        return [event]

    def send_new_pushed_stream(self, previous_state):
        """
        Fires on the newly pushed stream, when pushed by the local peer.

        No event here, but definitionally this peer must be a server.

        :returns: An empty list.
        """
        assert self.client is None
        self.client = False
        # The promised request counts as the request headers of this stream.
        self.headers_received = True
        return []

    def recv_new_pushed_stream(self, previous_state):
        """
        Fires on the newly pushed stream, when pushed by the remote peer.

        No event here, but definitionally this peer must be a client.

        :returns: An empty list.
        """
        assert self.client is None
        self.client = True
        # The promised request counts as the request headers of this stream.
        self.headers_sent = True
        return []

    def send_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
        We may only send PUSH_PROMISE frames if we're a server.

        :returns: A list containing a single ``_PushedRequestSent`` event.
        :raises ProtocolError: If this peer is the client.
        """
        if self.client is True:
            raise ProtocolError("Cannot push streams from client peers.")

        event = _PushedRequestSent()
        return [event]

    def recv_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is
        received. We may only receive PUSH_PROMISE frames if we're a client.

        Fires a PushedStreamReceived event.

        :returns: A list containing a single ``PushedStreamReceived`` event
            whose ``parent_stream_id`` is this stream's ID.
        :raises ProtocolError: If this peer is a server or its role is
            still unknown.
        """
        if not self.client:
            if self.client is None:  # pragma: no cover
                msg = "Idle streams cannot receive pushes"
            else:  # pragma: no cover
                msg = "Cannot receive pushed streams as a server"
            raise ProtocolError(msg)

        event = PushedStreamReceived()
        event.parent_stream_id = self.stream_id
        return [event]

    def send_end_stream(self, previous_state):
        """
        Called when an attempt is made to send END_STREAM in the
        HALF_CLOSED_REMOTE state.

        Records that the stream was closed by a sent END_STREAM flag.

        :returns: An empty list: no event is produced.
        """
        self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
        return []

    def send_reset_stream(self, previous_state):
        """
        Called when an attempt is made to send RST_STREAM in a non-closed
        stream state.

        Records that the stream was closed by a sent RST_STREAM frame.

        :returns: An empty list: no event is produced.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
        return []

    def reset_stream_on_error(self, previous_state):
        """
        Called when we need to forcefully emit another RST_STREAM frame on
        behalf of the state machine.

        If this is the first time we've done this, we should also hang an event
        off the StreamClosedError so that the user can be informed. We know
        it's the first time we've done this if the stream is currently in a
        state other than CLOSED.

        This method never returns normally.

        :raises StreamClosedError: Always. A locally generated
            ``StreamReset`` event is attached to the error as ``_events``.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM

        error = StreamClosedError(self.stream_id)

        event = StreamReset()
        event.stream_id = self.stream_id
        event.error_code = ErrorCodes.STREAM_CLOSED
        event.remote_reset = False
        error._events = [event]
        raise error

    def recv_on_closed_stream(self, previous_state):
        """
        Called when an unexpected frame is received on an already-closed
        stream.

        An endpoint that receives an unexpected frame should treat it as
        a stream error or connection error with type STREAM_CLOSED, depending
        on the specific frame. The error handling is done at a higher level:
        this just raises the appropriate error.

        :raises StreamClosedError: Always.
        """
        raise StreamClosedError(self.stream_id)

    def send_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to send data on an already-closed
        stream.

        This essentially overrides the standard logic by throwing a
        more-specific error: StreamClosedError. This is a ProtocolError, so it
        matches the standard API of the state machine, but provides more detail
        to the user.

        :raises StreamClosedError: Always.
        """
        raise StreamClosedError(self.stream_id)

    def recv_push_on_closed_stream(self, previous_state):
        """
        Called when a PUSH_PROMISE frame is received on an already-closed
        stream.

        If the stream was closed by us sending a RST_STREAM frame, then we
        presume that the PUSH_PROMISE was in flight when we reset the parent
        stream. Rather than accept the new stream, we just reset it.
        Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
        naturally closed stream is a real problem because it creates a brand
        new stream that the remote peer now believes exists.

        :raises StreamClosedError: If the stream was closed by a RST_STREAM
            frame that we sent.
        :raises ProtocolError: In every other case.
        """
        assert self.stream_closed_by is not None

        if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM:
            raise StreamClosedError(self.stream_id)
        else:
            raise ProtocolError("Attempted to push on closed stream.")

    def send_push_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to push on an already-closed stream.

        This essentially overrides the standard logic by providing a more
        useful error message. It's necessary because simply indicating that the
        stream is closed is not enough: there is now a new stream that is not
        allowed to be there. The only recourse is to tear the whole connection
        down.

        :raises ProtocolError: Always.
        """
        raise ProtocolError("Attempted to push on closed stream.")

    def send_informational_response(self, previous_state):
        """
        Called when an informational header block is sent (that is, a block
        where the :status header has a 1XX value).

        Only enforces that these are sent *before* final headers are sent.

        :returns: A list containing a single ``_ResponseSent`` event.
        :raises ProtocolError: If the final headers have already been sent.
        """
        if self.headers_sent:
            raise ProtocolError("Information response after final response")

        event = _ResponseSent()
        return [event]

    def recv_informational_response(self, previous_state):
        """
        Called when an informational header block is received (that is, a block
        where the :status header has a 1XX value).

        :returns: A list containing a single
            ``InformationalResponseReceived`` event.
        :raises ProtocolError: If the final headers have already been
            received.
        """
        if self.headers_received:
            raise ProtocolError("Informational response after final response")

        event = InformationalResponseReceived()
        event.stream_id = self.stream_id
        return [event]

    def recv_alt_svc(self, previous_state):
        """
        Called when receiving an ALTSVC frame.

        RFC 7838 allows us to receive ALTSVC frames at any stream state, which
        is really absurdly overzealous. For that reason, we want to limit the
        states in which we can actually receive it. It's really only sensible
        to receive it after we've sent our own headers and before the server
        has sent its header block: the server can't guarantee that we have any
        state around after it completes its header block, and the server
        doesn't know what origin we're talking about before we've sent ours.

        For that reason, this function applies a few extra checks on both state
        and some of the little state variables we keep around. If those suggest
        an unreasonable situation for the ALTSVC frame to have been sent in,
        we quietly ignore it (as RFC 7838 suggests).

        This function is also *not* always called by the state machine. In some
        states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
        because we know the frame cannot be valid in that state (IDLE because
        the server cannot know what origin the stream applies to, CLOSED
        because the server cannot assume we still have state around,
        RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
        state then *we* are the server).

        :returns: A list containing a single, not yet populated
            ``AlternativeServiceAvailable`` event, or an empty list when the
            frame is ignored.
        """
        # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
        # them.
        if self.client is False:
            return []

        # If we've received the response headers from the server they can't
        # guarantee we still have any state around. Other implementations
        # (like nghttp2) ignore ALTSVC in this state, so we will too.
        if self.headers_received:
            return []

        # Otherwise, this is a sensible enough frame to have received. Return
        # the event and let it get populated.
        return [AlternativeServiceAvailable()]

    def send_alt_svc(self, previous_state):
        """
        Called when sending an ALTSVC frame on this stream.

        For consistency with the restrictions we apply on receiving ALTSVC
        frames in ``recv_alt_svc``, we want to restrict when users can send
        ALTSVC frames to the situations when we ourselves would accept them.

        That means: when we are a server, when we have received the request
        headers, and when we have not yet sent our own response headers. Only
        the last of these conditions is checked by this method itself.

        :returns: An empty list: no event is produced.
        :raises ProtocolError: If the response headers have already been
            sent.
        """
        # We should not send ALTSVC after we've sent response headers, as the
        # client may have disposed of its state.
        if self.headers_sent:
            raise ProtocolError(
                "Cannot send ALTSVC after sending response headers."
            )

        return []
468
469# STATE MACHINE
470#
471# The stream state machine is defined here to avoid the need to allocate it
472# repeatedly for each stream. It cannot be defined in the stream class because
473# it needs to be able to reference the callbacks defined on the class, but
474# because Python's scoping rules are weird the class object is not actually in
475# scope during the body of the class object.
476#
477# For the sake of clarity, we reproduce the RFC 7540 state machine here:
478#
479#                          +--------+
480#                  send PP |        | recv PP
481#                 ,--------|  idle  |--------.
482#                /         |        |         \
483#               v          +--------+          v
484#        +----------+          |           +----------+
485#        |          |          | send H /  |          |
486# ,------| reserved |          | recv H    | reserved |------.
487# |      | (local)  |          |           | (remote) |      |
488# |      +----------+          v           +----------+      |
489# |          |             +--------+             |          |
490# |          |     recv ES |        | send ES     |          |
491# |   send H |     ,-------|  open  |-------.     | recv H   |
492# |          |    /        |        |        \    |          |
493# |          v   v         +--------+         v   v          |
494# |      +----------+          |           +----------+      |
495# |      |   half   |          |           |   half   |      |
496# |      |  closed  |          | send R /  |  closed  |      |
497# |      | (remote) |          | recv R    | (local)  |      |
498# |      +----------+          |           +----------+      |
499# |           |                |                 |           |
500# |           | send ES /      |       recv ES / |           |
501# |           | send R /       v        send R / |           |
502# |           | recv R     +--------+   recv R   |           |
503# | send R /  `----------->|        |<-----------'  send R / |
504# | recv R                 | closed |               recv R   |
505# `----------------------->|        |<----------------------'
506#                          +--------+
507#
508#    send:   endpoint sends this frame
509#    recv:   endpoint receives this frame
510#
511#    H:  HEADERS frame (with implied CONTINUATIONs)
512#    PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
513#    ES: END_STREAM flag
514#    R:  RST_STREAM frame
515#
516# For the purposes of this state machine we treat HEADERS and their
517# associated CONTINUATION frames as a single jumbo frame. The protocol
# allows/requires this by preventing other frames from being interleaved in
519# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
520# received without a prior HEADERS frame, it *will* be passed to this state
521# machine. The state machine should always reject that frame, either as an
522# invalid transition or because the stream is closed.
523#
524# There is a confusing relationship around PUSH_PROMISE frames. The state
525# machine above considers them to be frames belonging to the new stream,
526# which is *somewhat* true. However, they are sent with the stream ID of
527# their related stream, and are only sendable in some cases.
528# For this reason, our state machine implementation below allows for
529# PUSH_PROMISE frames both in the IDLE state (as in the diagram), but also
530# in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
531# Essentially, for hyper-h2, PUSH_PROMISE frames are effectively sent on
532# two streams.
533#
534# The _transitions dictionary contains a mapping of tuples of
535# (state, input) to tuples of (side_effect_function, end_state). This
536# map contains all allowed transitions: anything not in this map is
537# invalid and immediately causes a transition to ``closed``.
# The table is written as one group of rows per starting state. Each row is
# (input, side_effect_function, end_state); the comprehension flattens the
# groups into the (state, input) -> (side_effect_function, end_state) mapping,
# preserving the order in which the rows are listed.
_transitions = {
    (state, input_): (side_effect, end_state)
    for state, rows in (
        # State: idle
        (StreamState.IDLE, (
            (StreamInputs.SEND_HEADERS,
             H2StreamStateMachine.request_sent,
             StreamState.OPEN),
            (StreamInputs.RECV_HEADERS,
             H2StreamStateMachine.request_received,
             StreamState.OPEN),
            (StreamInputs.RECV_DATA,
             H2StreamStateMachine.reset_stream_on_error,
             StreamState.CLOSED),
            (StreamInputs.SEND_PUSH_PROMISE,
             H2StreamStateMachine.send_new_pushed_stream,
             StreamState.RESERVED_LOCAL),
            (StreamInputs.RECV_PUSH_PROMISE,
             H2StreamStateMachine.recv_new_pushed_stream,
             StreamState.RESERVED_REMOTE),
            (StreamInputs.RECV_ALTERNATIVE_SERVICE,
             None,
             StreamState.IDLE),
            (StreamInputs.UPGRADE_CLIENT,
             H2StreamStateMachine.request_sent,
             StreamState.HALF_CLOSED_LOCAL),
            (StreamInputs.UPGRADE_SERVER,
             H2StreamStateMachine.request_received,
             StreamState.HALF_CLOSED_REMOTE),
        )),

        # State: reserved local
        (StreamState.RESERVED_LOCAL, (
            (StreamInputs.SEND_HEADERS,
             H2StreamStateMachine.response_sent,
             StreamState.HALF_CLOSED_REMOTE),
            (StreamInputs.RECV_DATA,
             H2StreamStateMachine.reset_stream_on_error,
             StreamState.CLOSED),
            (StreamInputs.SEND_WINDOW_UPDATE,
             None,
             StreamState.RESERVED_LOCAL),
            (StreamInputs.RECV_WINDOW_UPDATE,
             H2StreamStateMachine.window_updated,
             StreamState.RESERVED_LOCAL),
            (StreamInputs.SEND_RST_STREAM,
             H2StreamStateMachine.send_reset_stream,
             StreamState.CLOSED),
            (StreamInputs.RECV_RST_STREAM,
             H2StreamStateMachine.stream_reset,
             StreamState.CLOSED),
            (StreamInputs.SEND_ALTERNATIVE_SERVICE,
             H2StreamStateMachine.send_alt_svc,
             StreamState.RESERVED_LOCAL),
            (StreamInputs.RECV_ALTERNATIVE_SERVICE,
             None,
             StreamState.RESERVED_LOCAL),
        )),

        # State: reserved remote
        (StreamState.RESERVED_REMOTE, (
            (StreamInputs.RECV_HEADERS,
             H2StreamStateMachine.response_received,
             StreamState.HALF_CLOSED_LOCAL),
            (StreamInputs.RECV_DATA,
             H2StreamStateMachine.reset_stream_on_error,
             StreamState.CLOSED),
            (StreamInputs.SEND_WINDOW_UPDATE,
             None,
             StreamState.RESERVED_REMOTE),
            (StreamInputs.RECV_WINDOW_UPDATE,
             H2StreamStateMachine.window_updated,
             StreamState.RESERVED_REMOTE),
            (StreamInputs.SEND_RST_STREAM,
             H2StreamStateMachine.send_reset_stream,
             StreamState.CLOSED),
            (StreamInputs.RECV_RST_STREAM,
             H2StreamStateMachine.stream_reset,
             StreamState.CLOSED),
            (StreamInputs.RECV_ALTERNATIVE_SERVICE,
             H2StreamStateMachine.recv_alt_svc,
             StreamState.RESERVED_REMOTE),
        )),

        # State: open
        (StreamState.OPEN, (
            (StreamInputs.SEND_HEADERS,
             H2StreamStateMachine.response_sent,
             StreamState.OPEN),
            (StreamInputs.RECV_HEADERS,
             H2StreamStateMachine.response_received,
             StreamState.OPEN),
            (StreamInputs.SEND_DATA,
             None,
             StreamState.OPEN),
            (StreamInputs.RECV_DATA,
             H2StreamStateMachine.data_received,
             StreamState.OPEN),
            (StreamInputs.SEND_END_STREAM,
             None,
             StreamState.HALF_CLOSED_LOCAL),
            (StreamInputs.RECV_END_STREAM,
             H2StreamStateMachine.stream_half_closed,
             StreamState.HALF_CLOSED_REMOTE),
            (StreamInputs.SEND_WINDOW_UPDATE,
             None,
             StreamState.OPEN),
            (StreamInputs.RECV_WINDOW_UPDATE,
             H2StreamStateMachine.window_updated,
             StreamState.OPEN),
            (StreamInputs.SEND_RST_STREAM,
             H2StreamStateMachine.send_reset_stream,
             StreamState.CLOSED),
            (StreamInputs.RECV_RST_STREAM,
             H2StreamStateMachine.stream_reset,
             StreamState.CLOSED),
            (StreamInputs.SEND_PUSH_PROMISE,
             H2StreamStateMachine.send_push_promise,
             StreamState.OPEN),
            (StreamInputs.RECV_PUSH_PROMISE,
             H2StreamStateMachine.recv_push_promise,
             StreamState.OPEN),
            (StreamInputs.SEND_INFORMATIONAL_HEADERS,
             H2StreamStateMachine.send_informational_response,
             StreamState.OPEN),
            (StreamInputs.RECV_INFORMATIONAL_HEADERS,
             H2StreamStateMachine.recv_informational_response,
             StreamState.OPEN),
            (StreamInputs.SEND_ALTERNATIVE_SERVICE,
             H2StreamStateMachine.send_alt_svc,
             StreamState.OPEN),
            (StreamInputs.RECV_ALTERNATIVE_SERVICE,
             H2StreamStateMachine.recv_alt_svc,
             StreamState.OPEN),
        )),

        # State: half-closed remote
        (StreamState.HALF_CLOSED_REMOTE, (
            (StreamInputs.SEND_HEADERS,
             H2StreamStateMachine.response_sent,
             StreamState.HALF_CLOSED_REMOTE),
            (StreamInputs.RECV_HEADERS,
             H2StreamStateMachine.reset_stream_on_error,
             StreamState.CLOSED),
            (StreamInputs.SEND_DATA,
             None,
             StreamState.HALF_CLOSED_REMOTE),
            (StreamInputs.RECV_DATA,
             H2StreamStateMachine.reset_stream_on_error,
             StreamState.CLOSED),
            (StreamInputs.SEND_END_STREAM,
             H2StreamStateMachine.send_end_stream,
             StreamState.CLOSED),
            (StreamInputs.SEND_WINDOW_UPDATE,
             None,
             StreamState.HALF_CLOSED_REMOTE),
            (StreamInputs.RECV_WINDOW_UPDATE,
             H2StreamStateMachine.window_updated,
             StreamState.HALF_CLOSED_REMOTE),
            (StreamInputs.SEND_RST_STREAM,
             H2StreamStateMachine.send_reset_stream,
             StreamState.CLOSED),
            (StreamInputs.RECV_RST_STREAM,
             H2StreamStateMachine.stream_reset,
             StreamState.CLOSED),
            (StreamInputs.SEND_PUSH_PROMISE,
             H2StreamStateMachine.send_push_promise,
             StreamState.HALF_CLOSED_REMOTE),
            (StreamInputs.RECV_PUSH_PROMISE,
             H2StreamStateMachine.reset_stream_on_error,
             StreamState.CLOSED),
            (StreamInputs.SEND_INFORMATIONAL_HEADERS,
             H2StreamStateMachine.send_informational_response,
             StreamState.HALF_CLOSED_REMOTE),
            (StreamInputs.SEND_ALTERNATIVE_SERVICE,
             H2StreamStateMachine.send_alt_svc,
             StreamState.HALF_CLOSED_REMOTE),
            (StreamInputs.RECV_ALTERNATIVE_SERVICE,
             H2StreamStateMachine.recv_alt_svc,
             StreamState.HALF_CLOSED_REMOTE),
        )),

        # State: half-closed local
        (StreamState.HALF_CLOSED_LOCAL, (
            (StreamInputs.RECV_HEADERS,
             H2StreamStateMachine.response_received,
             StreamState.HALF_CLOSED_LOCAL),
            (StreamInputs.RECV_DATA,
             H2StreamStateMachine.data_received,
             StreamState.HALF_CLOSED_LOCAL),
            (StreamInputs.RECV_END_STREAM,
             H2StreamStateMachine.stream_ended,
             StreamState.CLOSED),
            (StreamInputs.SEND_WINDOW_UPDATE,
             None,
             StreamState.HALF_CLOSED_LOCAL),
            (StreamInputs.RECV_WINDOW_UPDATE,
             H2StreamStateMachine.window_updated,
             StreamState.HALF_CLOSED_LOCAL),
            (StreamInputs.SEND_RST_STREAM,
             H2StreamStateMachine.send_reset_stream,
             StreamState.CLOSED),
            (StreamInputs.RECV_RST_STREAM,
             H2StreamStateMachine.stream_reset,
             StreamState.CLOSED),
            (StreamInputs.RECV_PUSH_PROMISE,
             H2StreamStateMachine.recv_push_promise,
             StreamState.HALF_CLOSED_LOCAL),
            (StreamInputs.RECV_INFORMATIONAL_HEADERS,
             H2StreamStateMachine.recv_informational_response,
             StreamState.HALF_CLOSED_LOCAL),
            (StreamInputs.SEND_ALTERNATIVE_SERVICE,
             H2StreamStateMachine.send_alt_svc,
             StreamState.HALF_CLOSED_LOCAL),
            (StreamInputs.RECV_ALTERNATIVE_SERVICE,
             H2StreamStateMachine.recv_alt_svc,
             StreamState.HALF_CLOSED_LOCAL),
        )),

        # State: closed
        (StreamState.CLOSED, (
            (StreamInputs.RECV_END_STREAM,
             None,
             StreamState.CLOSED),
            (StreamInputs.RECV_ALTERNATIVE_SERVICE,
             None,
             StreamState.CLOSED),

            # RFC 7540 Section 5.1 defines how the end point should react
            # when receiving a frame on a closed stream with the following
            # statements:
            #
            # > An endpoint that receives any frame other than PRIORITY after
            # > receiving a RST_STREAM MUST treat that as a stream error of
            # > type STREAM_CLOSED.
            # > An endpoint that receives any frames after receiving a frame
            # > with the END_STREAM flag set MUST treat that as a connection
            # > error of type STREAM_CLOSED.
            (StreamInputs.RECV_HEADERS,
             H2StreamStateMachine.recv_on_closed_stream,
             StreamState.CLOSED),
            (StreamInputs.RECV_DATA,
             H2StreamStateMachine.recv_on_closed_stream,
             StreamState.CLOSED),

            # > WINDOW_UPDATE or RST_STREAM frames can be received in this
            # > state for a short period after a DATA or HEADERS frame
            # > containing a END_STREAM flag is sent, as instructed in RFC
            # > 7540 Section 5.1. But we don't have access to a clock so we
            # > just always allow it.
            (StreamInputs.RECV_WINDOW_UPDATE,
             None,
             StreamState.CLOSED),
            (StreamInputs.RECV_RST_STREAM,
             None,
             StreamState.CLOSED),

            # > A receiver MUST treat the receipt of a PUSH_PROMISE on a
            # > stream that is neither "open" nor "half-closed (local)" as a
            # > connection error of type PROTOCOL_ERROR.
            (StreamInputs.RECV_PUSH_PROMISE,
             H2StreamStateMachine.recv_push_on_closed_stream,
             StreamState.CLOSED),

            # Also, users should be forbidden from sending on closed streams.
            (StreamInputs.SEND_HEADERS,
             H2StreamStateMachine.send_on_closed_stream,
             StreamState.CLOSED),
            (StreamInputs.SEND_PUSH_PROMISE,
             H2StreamStateMachine.send_push_on_closed_stream,
             StreamState.CLOSED),
            (StreamInputs.SEND_RST_STREAM,
             H2StreamStateMachine.send_on_closed_stream,
             StreamState.CLOSED),
            (StreamInputs.SEND_DATA,
             H2StreamStateMachine.send_on_closed_stream,
             StreamState.CLOSED),
            (StreamInputs.SEND_WINDOW_UPDATE,
             H2StreamStateMachine.send_on_closed_stream,
             StreamState.CLOSED),
            (StreamInputs.SEND_END_STREAM,
             H2StreamStateMachine.send_on_closed_stream,
             StreamState.CLOSED),
        )),
    )
    for input_, side_effect, end_state in rows
}
737
738
739class H2Stream:
740    """
741    A low-level HTTP/2 stream object. This handles building and receiving
742    frames and maintains per-stream state.
743
744    This wraps a HTTP/2 Stream state machine implementation, ensuring that
745    frames can only be sent/received when the stream is in a valid state.
746    Attempts to create frames that cannot be sent will raise a
747    ``ProtocolError``.
748    """
749    def __init__(self,
750                 stream_id,
751                 config,
752                 inbound_window_size,
753                 outbound_window_size):
754        self.state_machine = H2StreamStateMachine(stream_id)
755        self.stream_id = stream_id
756        self.max_outbound_frame_size = None
757        self.request_method = None
758
759        # The current value of the outbound stream flow control window
760        self.outbound_flow_control_window = outbound_window_size
761
762        # The flow control manager.
763        self._inbound_window_manager = WindowManager(inbound_window_size)
764
765        # The expected content length, if any.
766        self._expected_content_length = None
767
768        # The actual received content length. Always tracked.
769        self._actual_content_length = 0
770
771        # The authority we believe this stream belongs to.
772        self._authority = None
773
774        # The configuration for this stream.
775        self.config = config
776
777    def __repr__(self):
778        return "<%s id:%d state:%r>" % (
779            type(self).__name__,
780            self.stream_id,
781            self.state_machine.state
782        )
783
784    @property
785    def inbound_flow_control_window(self):
786        """
787        The size of the inbound flow control window for the stream. This is
788        rarely publicly useful: instead, use :meth:`remote_flow_control_window
789        <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is
790        largely present to provide a shortcut to this data.
791        """
792        return self._inbound_window_manager.current_window_size
793
794    @property
795    def open(self):
796        """
797        Whether the stream is 'open' in any sense: that is, whether it counts
798        against the number of concurrent streams.
799        """
800        # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either
801        # the OPEN state or either of the HALF_CLOSED states. Perplexingly,
802        # this excludes the reserved states.
803        # For more detail on why we're doing this in this slightly weird way,
804        # see the comment on ``STREAM_OPEN`` at the top of the file.
805        return STREAM_OPEN[self.state_machine.state]
806
807    @property
808    def closed(self):
809        """
810        Whether the stream is closed.
811        """
812        return self.state_machine.state == StreamState.CLOSED
813
814    @property
815    def closed_by(self):
816        """
817        Returns how the stream was closed, as one of StreamClosedBy.
818        """
819        return self.state_machine.stream_closed_by
820
821    def upgrade(self, client_side):
822        """
823        Called by the connection to indicate that this stream is the initial
824        request/response of an upgraded connection. Places the stream into an
825        appropriate state.
826        """
827        self.config.logger.debug("Upgrading %r", self)
828
829        assert self.stream_id == 1
830        input_ = (
831            StreamInputs.UPGRADE_CLIENT if client_side
832            else StreamInputs.UPGRADE_SERVER
833        )
834
835        # This may return events, we deliberately don't want them.
836        self.state_machine.process_input(input_)
837        return
838
839    def send_headers(self, headers, encoder, end_stream=False):
840        """
841        Returns a list of HEADERS/CONTINUATION frames to emit as either headers
842        or trailers.
843        """
844        self.config.logger.debug("Send headers %s on %r", headers, self)
845
846        # Because encoding headers makes an irreversible change to the header
847        # compression context, we make the state transition before we encode
848        # them.
849
850        # First, check if we're a client. If we are, no problem: if we aren't,
851        # we need to scan the header block to see if this is an informational
852        # response.
853        input_ = StreamInputs.SEND_HEADERS
854        if ((not self.state_machine.client) and
855                is_informational_response(headers)):
856            if end_stream:
857                raise ProtocolError(
858                    "Cannot set END_STREAM on informational responses."
859                )
860
861            input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS
862
863        events = self.state_machine.process_input(input_)
864
865        hf = HeadersFrame(self.stream_id)
866        hdr_validation_flags = self._build_hdr_validation_flags(events)
867        frames = self._build_headers_frames(
868            headers, encoder, hf, hdr_validation_flags
869        )
870
871        if end_stream:
872            # Not a bug: the END_STREAM flag is valid on the initial HEADERS
873            # frame, not the CONTINUATION frames that follow.
874            self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
875            frames[0].flags.add('END_STREAM')
876
877        if self.state_machine.trailers_sent and not end_stream:
878            raise ProtocolError("Trailers must have END_STREAM set.")
879
880        if self.state_machine.client and self._authority is None:
881            self._authority = authority_from_headers(headers)
882
883        # store request method for _initialize_content_length
884        self.request_method = extract_method_header(headers)
885
886        return frames
887
888    def push_stream_in_band(self, related_stream_id, headers, encoder):
889        """
890        Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed
891        stream header. Called on the stream that has the PUSH_PROMISE frame
892        sent on it.
893        """
894        self.config.logger.debug("Push stream %r", self)
895
896        # Because encoding headers makes an irreversible change to the header
897        # compression context, we make the state transition *first*.
898
899        events = self.state_machine.process_input(
900            StreamInputs.SEND_PUSH_PROMISE
901        )
902
903        ppf = PushPromiseFrame(self.stream_id)
904        ppf.promised_stream_id = related_stream_id
905        hdr_validation_flags = self._build_hdr_validation_flags(events)
906        frames = self._build_headers_frames(
907            headers, encoder, ppf, hdr_validation_flags
908        )
909
910        return frames
911
912    def locally_pushed(self):
913        """
914        Mark this stream as one that was pushed by this peer. Must be called
915        immediately after initialization. Sends no frames, simply updates the
916        state machine.
917        """
918        # This does not trigger any events.
919        events = self.state_machine.process_input(
920            StreamInputs.SEND_PUSH_PROMISE
921        )
922        assert not events
923        return []
924
925    def send_data(self, data, end_stream=False, pad_length=None):
926        """
927        Prepare some data frames. Optionally end the stream.
928
929        .. warning:: Does not perform flow control checks.
930        """
931        self.config.logger.debug(
932            "Send data on %r with end stream set to %s", self, end_stream
933        )
934
935        self.state_machine.process_input(StreamInputs.SEND_DATA)
936
937        df = DataFrame(self.stream_id)
938        df.data = data
939        if end_stream:
940            self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
941            df.flags.add('END_STREAM')
942        if pad_length is not None:
943            df.flags.add('PADDED')
944            df.pad_length = pad_length
945
946        # Subtract flow_controlled_length to account for possible padding
947        self.outbound_flow_control_window -= df.flow_controlled_length
948        assert self.outbound_flow_control_window >= 0
949
950        return [df]
951
952    def end_stream(self):
953        """
954        End a stream without sending data.
955        """
956        self.config.logger.debug("End stream %r", self)
957
958        self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
959        df = DataFrame(self.stream_id)
960        df.flags.add('END_STREAM')
961        return [df]
962
963    def advertise_alternative_service(self, field_value):
964        """
965        Advertise an RFC 7838 alternative service. The semantics of this are
966        better documented in the ``H2Connection`` class.
967        """
968        self.config.logger.debug(
969            "Advertise alternative service of %r for %r", field_value, self
970        )
971        self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE)
972        asf = AltSvcFrame(self.stream_id)
973        asf.field = field_value
974        return [asf]
975
976    def increase_flow_control_window(self, increment):
977        """
978        Increase the size of the flow control window for the remote side.
979        """
980        self.config.logger.debug(
981            "Increase flow control window for %r by %d",
982            self, increment
983        )
984        self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE)
985        self._inbound_window_manager.window_opened(increment)
986
987        wuf = WindowUpdateFrame(self.stream_id)
988        wuf.window_increment = increment
989        return [wuf]
990
991    def receive_push_promise_in_band(self,
992                                     promised_stream_id,
993                                     headers,
994                                     header_encoding):
995        """
996        Receives a push promise frame sent on this stream, pushing a remote
997        stream. This is called on the stream that has the PUSH_PROMISE sent
998        on it.
999        """
1000        self.config.logger.debug(
1001            "Receive Push Promise on %r for remote stream %d",
1002            self, promised_stream_id
1003        )
1004        events = self.state_machine.process_input(
1005            StreamInputs.RECV_PUSH_PROMISE
1006        )
1007        events[0].pushed_stream_id = promised_stream_id
1008
1009        hdr_validation_flags = self._build_hdr_validation_flags(events)
1010        events[0].headers = self._process_received_headers(
1011            headers, hdr_validation_flags, header_encoding
1012        )
1013        return [], events
1014
1015    def remotely_pushed(self, pushed_headers):
1016        """
1017        Mark this stream as one that was pushed by the remote peer. Must be
1018        called immediately after initialization. Sends no frames, simply
1019        updates the state machine.
1020        """
1021        self.config.logger.debug("%r pushed by remote peer", self)
1022        events = self.state_machine.process_input(
1023            StreamInputs.RECV_PUSH_PROMISE
1024        )
1025        self._authority = authority_from_headers(pushed_headers)
1026        return [], events
1027
1028    def receive_headers(self, headers, end_stream, header_encoding):
1029        """
1030        Receive a set of headers (or trailers).
1031        """
1032        if is_informational_response(headers):
1033            if end_stream:
1034                raise ProtocolError(
1035                    "Cannot set END_STREAM on informational responses"
1036                )
1037            input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS
1038        else:
1039            input_ = StreamInputs.RECV_HEADERS
1040
1041        events = self.state_machine.process_input(input_)
1042
1043        if end_stream:
1044            es_events = self.state_machine.process_input(
1045                StreamInputs.RECV_END_STREAM
1046            )
1047            events[0].stream_ended = es_events[0]
1048            events += es_events
1049
1050        self._initialize_content_length(headers)
1051
1052        if isinstance(events[0], TrailersReceived):
1053            if not end_stream:
1054                raise ProtocolError("Trailers must have END_STREAM set")
1055
1056        hdr_validation_flags = self._build_hdr_validation_flags(events)
1057        events[0].headers = self._process_received_headers(
1058            headers, hdr_validation_flags, header_encoding
1059        )
1060        return [], events
1061
1062    def receive_data(self, data, end_stream, flow_control_len):
1063        """
1064        Receive some data.
1065        """
1066        self.config.logger.debug(
1067            "Receive data on %r with end stream %s and flow control length "
1068            "set to %d", self, end_stream, flow_control_len
1069        )
1070        events = self.state_machine.process_input(StreamInputs.RECV_DATA)
1071        self._inbound_window_manager.window_consumed(flow_control_len)
1072        self._track_content_length(len(data), end_stream)
1073
1074        if end_stream:
1075            es_events = self.state_machine.process_input(
1076                StreamInputs.RECV_END_STREAM
1077            )
1078            events[0].stream_ended = es_events[0]
1079            events.extend(es_events)
1080
1081        events[0].data = data
1082        events[0].flow_controlled_length = flow_control_len
1083        return [], events
1084
1085    def receive_window_update(self, increment):
1086        """
1087        Handle a WINDOW_UPDATE increment.
1088        """
1089        self.config.logger.debug(
1090            "Receive Window Update on %r for increment of %d",
1091            self, increment
1092        )
1093        events = self.state_machine.process_input(
1094            StreamInputs.RECV_WINDOW_UPDATE
1095        )
1096        frames = []
1097
1098        # If we encounter a problem with incrementing the flow control window,
1099        # this should be treated as a *stream* error, not a *connection* error.
1100        # That means we need to catch the error and forcibly close the stream.
1101        if events:
1102            events[0].delta = increment
1103            try:
1104                self.outbound_flow_control_window = guard_increment_window(
1105                    self.outbound_flow_control_window,
1106                    increment
1107                )
1108            except FlowControlError:
1109                # Ok, this is bad. We're going to need to perform a local
1110                # reset.
1111                event = StreamReset()
1112                event.stream_id = self.stream_id
1113                event.error_code = ErrorCodes.FLOW_CONTROL_ERROR
1114                event.remote_reset = False
1115
1116                events = [event]
1117                frames = self.reset_stream(event.error_code)
1118
1119        return frames, events
1120
1121    def receive_continuation(self):
1122        """
1123        A naked CONTINUATION frame has been received. This is always an error,
1124        but the type of error it is depends on the state of the stream and must
1125        transition the state of the stream, so we need to handle it.
1126        """
1127        self.config.logger.debug("Receive Continuation frame on %r", self)
1128        self.state_machine.process_input(
1129            StreamInputs.RECV_CONTINUATION
1130        )
1131        assert False, "Should not be reachable"
1132
1133    def receive_alt_svc(self, frame):
1134        """
1135        An Alternative Service frame was received on the stream. This frame
1136        inherits the origin associated with this stream.
1137        """
1138        self.config.logger.debug(
1139            "Receive Alternative Service frame on stream %r", self
1140        )
1141
1142        # If the origin is present, RFC 7838 says we have to ignore it.
1143        if frame.origin:
1144            return [], []
1145
1146        events = self.state_machine.process_input(
1147            StreamInputs.RECV_ALTERNATIVE_SERVICE
1148        )
1149
1150        # There are lots of situations where we want to ignore the ALTSVC
1151        # frame. If we need to pay attention, we'll have an event and should
1152        # fill it out.
1153        if events:
1154            assert isinstance(events[0], AlternativeServiceAvailable)
1155            events[0].origin = self._authority
1156            events[0].field_value = frame.field
1157
1158        return [], events
1159
1160    def reset_stream(self, error_code=0):
1161        """
1162        Close the stream locally. Reset the stream with an error code.
1163        """
1164        self.config.logger.debug(
1165            "Local reset %r with error code: %d", self, error_code
1166        )
1167        self.state_machine.process_input(StreamInputs.SEND_RST_STREAM)
1168
1169        rsf = RstStreamFrame(self.stream_id)
1170        rsf.error_code = error_code
1171        return [rsf]
1172
1173    def stream_reset(self, frame):
1174        """
1175        Handle a stream being reset remotely.
1176        """
1177        self.config.logger.debug(
1178            "Remote reset %r with error code: %d", self, frame.error_code
1179        )
1180        events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM)
1181
1182        if events:
1183            # We don't fire an event if this stream is already closed.
1184            events[0].error_code = _error_code_from_int(frame.error_code)
1185
1186        return [], events
1187
1188    def acknowledge_received_data(self, acknowledged_size):
1189        """
1190        The user has informed us that they've processed some amount of data
1191        that was received on this stream. Pass that to the window manager and
1192        potentially return some WindowUpdate frames.
1193        """
1194        self.config.logger.debug(
1195            "Acknowledge received data with size %d on %r",
1196            acknowledged_size, self
1197        )
1198        increment = self._inbound_window_manager.process_bytes(
1199            acknowledged_size
1200        )
1201        if increment:
1202            f = WindowUpdateFrame(self.stream_id)
1203            f.window_increment = increment
1204            return [f]
1205
1206        return []
1207
1208    def _build_hdr_validation_flags(self, events):
1209        """
1210        Constructs a set of header validation flags for use when normalizing
1211        and validating header blocks.
1212        """
1213        is_trailer = isinstance(
1214            events[0], (_TrailersSent, TrailersReceived)
1215        )
1216        is_response_header = isinstance(
1217            events[0],
1218            (
1219                _ResponseSent,
1220                ResponseReceived,
1221                InformationalResponseReceived
1222            )
1223        )
1224        is_push_promise = isinstance(
1225            events[0], (PushedStreamReceived, _PushedRequestSent)
1226        )
1227
1228        return HeaderValidationFlags(
1229            is_client=self.state_machine.client,
1230            is_trailer=is_trailer,
1231            is_response_header=is_response_header,
1232            is_push_promise=is_push_promise,
1233        )
1234
1235    def _build_headers_frames(self,
1236                              headers,
1237                              encoder,
1238                              first_frame,
1239                              hdr_validation_flags):
1240        """
1241        Helper method to build headers or push promise frames.
1242        """
1243        # We need to lowercase the header names, and to ensure that secure
1244        # header fields are kept out of compression contexts.
1245        if self.config.normalize_outbound_headers:
1246            headers = normalize_outbound_headers(
1247                headers, hdr_validation_flags
1248            )
1249        if self.config.validate_outbound_headers:
1250            headers = validate_outbound_headers(
1251                headers, hdr_validation_flags
1252            )
1253
1254        encoded_headers = encoder.encode(headers)
1255
1256        # Slice into blocks of max_outbound_frame_size. Be careful with this:
1257        # it only works right because we never send padded frames or priority
1258        # information on the frames. Revisit this if we do.
1259        header_blocks = [
1260            encoded_headers[i:i+self.max_outbound_frame_size]
1261            for i in range(
1262                0, len(encoded_headers), self.max_outbound_frame_size
1263            )
1264        ]
1265
1266        frames = []
1267        first_frame.data = header_blocks[0]
1268        frames.append(first_frame)
1269
1270        for block in header_blocks[1:]:
1271            cf = ContinuationFrame(self.stream_id)
1272            cf.data = block
1273            frames.append(cf)
1274
1275        frames[-1].flags.add('END_HEADERS')
1276        return frames
1277
1278    def _process_received_headers(self,
1279                                  headers,
1280                                  header_validation_flags,
1281                                  header_encoding):
1282        """
1283        When headers have been received from the remote peer, run a processing
1284        pipeline on them to transform them into the appropriate form for
1285        attaching to an event.
1286        """
1287        if self.config.normalize_inbound_headers:
1288            headers = normalize_inbound_headers(
1289                headers, header_validation_flags
1290            )
1291
1292        if self.config.validate_inbound_headers:
1293            headers = validate_headers(headers, header_validation_flags)
1294
1295        if header_encoding:
1296            headers = _decode_headers(headers, header_encoding)
1297
1298        # The above steps are all generators, so we need to concretize the
1299        # headers now.
1300        return list(headers)
1301
1302    def _initialize_content_length(self, headers):
1303        """
1304        Checks the headers for a content-length header and initializes the
1305        _expected_content_length field from it. It's not an error for no
1306        Content-Length header to be present.
1307        """
1308        if self.request_method == b'HEAD':
1309            self._expected_content_length = 0
1310            return
1311
1312        for n, v in headers:
1313            if n == b'content-length':
1314                try:
1315                    self._expected_content_length = int(v, 10)
1316                except ValueError:
1317                    raise ProtocolError(
1318                        "Invalid content-length header: %s" % v
1319                    )
1320
1321                return
1322
1323    def _track_content_length(self, length, end_stream):
1324        """
1325        Update the expected content length in response to data being received.
1326        Validates that the appropriate amount of data is sent. Always updates
1327        the received data, but only validates the length against the
1328        content-length header if one was sent.
1329
1330        :param length: The length of the body chunk received.
1331        :param end_stream: If this is the last body chunk received.
1332        """
1333        self._actual_content_length += length
1334        actual = self._actual_content_length
1335        expected = self._expected_content_length
1336
1337        if expected is not None:
1338            if expected < actual:
1339                raise InvalidBodyLengthError(expected, actual)
1340
1341            if end_stream and expected != actual:
1342                raise InvalidBodyLengthError(expected, actual)
1343
1344    def _inbound_flow_control_change_from_settings(self, delta):
1345        """
1346        We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to
1347        update the target window size for flow control. For our flow control
1348        strategy, this means we need to do two things: we need to adjust the
1349        current window size, but we also need to set the target maximum window
1350        size to the new value.
1351        """
1352        new_max_size = self._inbound_window_manager.max_window_size + delta
1353        self._inbound_window_manager.window_opened(delta)
1354        self._inbound_window_manager.max_window_size = new_max_size
1355
1356
def _decode_headers(headers, encoding):
    """
    Lazily decode an iterable of header two-tuples with ``encoding``. Each
    decoded header is rebuilt with the class of the header it came from, so
    that the use of ``HeaderTuple`` (and its subclasses) is preserved.

    :param headers: An iterable of ``HeaderTuple`` objects whose name and
        value both have a ``decode`` method.
    :param encoding: The encoding handed to ``decode``.
    :returns: A generator yielding the decoded headers.
    """
    for original in headers:
        # Only already-decoded header blocks are expected here, and those
        # always consist of HeaderTuple objects.
        assert isinstance(original, HeaderTuple)

        tuple_type = original.__class__
        raw_name, raw_value = original
        decoded_name = raw_name.decode(encoding)
        decoded_value = raw_value.decode(encoding)
        yield tuple_type(decoded_name, decoded_value)
1372