# -*- coding: utf-8 -*-
"""
h2/stream
~~~~~~~~~

An implementation of a HTTP/2 stream.
"""
from enum import Enum, IntEnum
from hpack import HeaderTuple
from hyperframe.frame import (
    HeadersFrame, ContinuationFrame, DataFrame, WindowUpdateFrame,
    RstStreamFrame, PushPromiseFrame, AltSvcFrame
)

from .errors import ErrorCodes, _error_code_from_int
from .events import (
    RequestReceived, ResponseReceived, DataReceived, WindowUpdated,
    StreamEnded, PushedStreamReceived, StreamReset, TrailersReceived,
    InformationalResponseReceived, AlternativeServiceAvailable,
    _ResponseSent, _RequestSent, _TrailersSent, _PushedRequestSent
)
from .exceptions import (
    ProtocolError, StreamClosedError, InvalidBodyLengthError, FlowControlError
)
from .utilities import (
    guard_increment_window, is_informational_response, authority_from_headers,
    validate_headers, validate_outbound_headers, normalize_outbound_headers,
    HeaderValidationFlags, extract_method_header, normalize_inbound_headers
)
from .windows import WindowManager


class StreamState(IntEnum):
    """
    The states a stream can be in.

    This is an ``IntEnum`` rather than a plain ``Enum`` because the members
    are used directly as list indices (see ``STREAM_OPEN``), so the values
    must stay contiguous and start at zero.
    """
    IDLE = 0
    RESERVED_REMOTE = 1
    RESERVED_LOCAL = 2
    OPEN = 3
    HALF_CLOSED_REMOTE = 4
    HALF_CLOSED_LOCAL = 5
    CLOSED = 6


class StreamInputs(Enum):
    """
    The inputs that can be fed to the stream state machine.

    ``SEND_*`` members describe actions taken by the local peer, ``RECV_*``
    members describe frames arriving from the remote peer, and ``UPGRADE_*``
    members are used for the initial stream of an upgraded connection.
    """
    SEND_HEADERS = 0
    SEND_PUSH_PROMISE = 1
    SEND_RST_STREAM = 2
    SEND_DATA = 3
    SEND_WINDOW_UPDATE = 4
    SEND_END_STREAM = 5
    RECV_HEADERS = 6
    RECV_PUSH_PROMISE = 7
    RECV_RST_STREAM = 8
    RECV_DATA = 9
    RECV_WINDOW_UPDATE = 10
    RECV_END_STREAM = 11
    RECV_CONTINUATION = 12  # Added in 2.0.0
    SEND_INFORMATIONAL_HEADERS = 13  # Added in 2.2.0
    RECV_INFORMATIONAL_HEADERS = 14  # Added in 2.2.0
    SEND_ALTERNATIVE_SERVICE = 15  # Added in 2.3.0
    RECV_ALTERNATIVE_SERVICE = 16  # Added in 2.3.0
    UPGRADE_CLIENT = 17  # Added 2.3.0
    UPGRADE_SERVER = 18  # Added 2.3.0


class StreamClosedBy(Enum):
    """
    Records which action caused a stream to become closed: an END_STREAM
    flag or a RST_STREAM frame, either sent by us or received from the peer.
    """
    SEND_END_STREAM = 0
    RECV_END_STREAM = 1
    SEND_RST_STREAM = 2
    RECV_RST_STREAM = 3


# This array is initialized once, and is indexed by the stream states above.
# It indicates whether a stream in the given state is open. The reason we do
# this is that we potentially check whether a stream in a given state is open
# quite frequently: given that we check so often, we should do so in the
# fastest and most performant way possible.
STREAM_OPEN = [False] * len(StreamState)
STREAM_OPEN[StreamState.OPEN] = True
STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True
STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True


class H2StreamStateMachine:
    """
    A single HTTP/2 stream state machine.

    This stream object implements basically the state machine described in
    RFC 7540 section 5.1.

    Every callback method on this class takes the state the machine was in
    *before* the transition and returns a (possibly empty) list of events.

    :param stream_id: The stream ID of this stream. This is stored primarily
        for logging purposes.
    """
    def __init__(self, stream_id):
        self.state = StreamState.IDLE
        self.stream_id = stream_id

        #: Whether this peer is the client side of this stream. ``None``
        #: until the first transition tells us which side we are.
        self.client = None

        # Whether headers and trailers have been sent/received on this
        # stream or not.
        self.headers_sent = None
        self.trailers_sent = None
        self.headers_received = None
        self.trailers_received = None

        # How the stream was closed. One of StreamClosedBy.
        self.stream_closed_by = None

    def process_input(self, input_):
        """
        Process a specific input in the state machine.

        :param input_: The ``StreamInputs`` member to process.
        :returns: The list of events produced by the transition (empty if
            the transition has no side-effect function).
        :raises ValueError: If ``input_`` is not a ``StreamInputs`` member.
        :raises ProtocolError: If the input is not valid in the current
            state, or if the side-effect function rejects it. In both cases
            the stream is moved to the CLOSED state first.
        """
        if not isinstance(input_, StreamInputs):
            raise ValueError("Input must be an instance of StreamInputs")

        try:
            func, target_state = _transitions[(self.state, input_)]
        except KeyError:
            # Anything missing from the table is an invalid transition.
            old_state = self.state
            self.state = StreamState.CLOSED
            raise ProtocolError(
                "Invalid input %s in state %s" % (input_, old_state)
            )
        else:
            # The state is updated *before* the callback runs; the callback
            # gets the old state passed in explicitly.
            previous_state = self.state
            self.state = target_state
            if func is not None:
                try:
                    return func(self, previous_state)
                except ProtocolError:
                    self.state = StreamState.CLOSED
                    raise
                except AssertionError as e:  # pragma: no cover
                    # Internal invariants failing are surfaced to the user
                    # as protocol errors, keeping the original as the cause.
                    self.state = StreamState.CLOSED
                    raise ProtocolError(e) from e

            return []

    def request_sent(self, previous_state):
        """
        Fires when a request is sent.
        """
        self.client = True
        self.headers_sent = True
        event = _RequestSent()

        return [event]

    def response_sent(self, previous_state):
        """
        Fires when something that should be a response is sent. This 'response'
        may actually be trailers.

        :raises ProtocolError: If no headers have been sent yet and this
            peer is (or may be) a client.
        """
        if not self.headers_sent:
            if self.client is True or self.client is None:
                raise ProtocolError("Client cannot send responses.")
            self.headers_sent = True
            event = _ResponseSent()
        else:
            assert not self.trailers_sent
            self.trailers_sent = True
            event = _TrailersSent()

        return [event]

    def request_received(self, previous_state):
        """
        Fires when a request is received.
        """
        assert not self.headers_received
        assert not self.trailers_received

        self.client = False
        self.headers_received = True
        event = RequestReceived()

        event.stream_id = self.stream_id
        return [event]

    def response_received(self, previous_state):
        """
        Fires when a response is received. Also disambiguates between responses
        and trailers.
        """
        if not self.headers_received:
            assert self.client is True
            self.headers_received = True
            event = ResponseReceived()
        else:
            assert not self.trailers_received
            self.trailers_received = True
            event = TrailersReceived()

        event.stream_id = self.stream_id
        return [event]

    def data_received(self, previous_state):
        """
        Fires when data is received.

        :raises ProtocolError: If no headers have been received yet.
        """
        if not self.headers_received:
            raise ProtocolError("cannot receive data before headers")
        event = DataReceived()
        event.stream_id = self.stream_id
        return [event]

    def window_updated(self, previous_state):
        """
        Fires when a window update frame is received.
        """
        event = WindowUpdated()
        event.stream_id = self.stream_id
        return [event]

    def stream_half_closed(self, previous_state):
        """
        Fires when an END_STREAM flag is received in the OPEN state,
        transitioning this stream to a HALF_CLOSED_REMOTE state.
        """
        event = StreamEnded()
        event.stream_id = self.stream_id
        return [event]

    def stream_ended(self, previous_state):
        """
        Fires when a stream is cleanly ended.
        """
        self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
        event = StreamEnded()
        event.stream_id = self.stream_id
        return [event]

    def stream_reset(self, previous_state):
        """
        Fired when a stream is forcefully reset.
        """
        self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
        event = StreamReset()
        event.stream_id = self.stream_id
        return [event]

    def send_new_pushed_stream(self, previous_state):
        """
        Fires on the newly pushed stream, when pushed by the local peer.

        No event here, but definitionally this peer must be a server.
        """
        assert self.client is None
        self.client = False
        self.headers_received = True
        return []

    def recv_new_pushed_stream(self, previous_state):
        """
        Fires on the newly pushed stream, when pushed by the remote peer.

        No event here, but definitionally this peer must be a client.
        """
        assert self.client is None
        self.client = True
        self.headers_sent = True
        return []

    def send_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
        We may only send PUSH_PROMISE frames if we're a server.

        :raises ProtocolError: If this peer is a client.
        """
        if self.client is True:
            raise ProtocolError("Cannot push streams from client peers.")

        event = _PushedRequestSent()
        return [event]

    def recv_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is
        received. We may only receive PUSH_PROMISE frames if we're a client.

        Fires a PushedStreamReceived event.

        :raises ProtocolError: If this peer is not a client.
        """
        if not self.client:
            if self.client is None:  # pragma: no cover
                msg = "Idle streams cannot receive pushes"
            else:  # pragma: no cover
                msg = "Cannot receive pushed streams as a server"
            raise ProtocolError(msg)

        event = PushedStreamReceived()
        event.parent_stream_id = self.stream_id
        return [event]

    def send_end_stream(self, previous_state):
        """
        Called when an attempt is made to send END_STREAM in the
        HALF_CLOSED_REMOTE state.

        Produces no events.
        """
        self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
        return []

    def send_reset_stream(self, previous_state):
        """
        Called when an attempt is made to send RST_STREAM in a non-closed
        stream state.

        Produces no events.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
        return []

    def reset_stream_on_error(self, previous_state):
        """
        Called when we need to forcefully emit another RST_STREAM frame on
        behalf of the state machine.

        We hang a StreamReset event off the StreamClosedError so that the
        user can be informed that the stream was reset locally.

        :raises StreamClosedError: Always. The error carries the
            ``StreamReset`` event in its ``_events`` attribute.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM

        error = StreamClosedError(self.stream_id)

        event = StreamReset()
        event.stream_id = self.stream_id
        event.error_code = ErrorCodes.STREAM_CLOSED
        event.remote_reset = False
        error._events = [event]
        raise error

    def recv_on_closed_stream(self, previous_state):
        """
        Called when an unexpected frame is received on an already-closed
        stream.

        An endpoint that receives an unexpected frame should treat it as
        a stream error or connection error with type STREAM_CLOSED, depending
        on the specific frame. The error handling is done at a higher level:
        this just raises the appropriate error.

        :raises StreamClosedError: Always.
        """
        raise StreamClosedError(self.stream_id)

    def send_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to send data on an already-closed
        stream.

        This essentially overrides the standard logic by throwing a
        more-specific error: StreamClosedError. This is a ProtocolError, so it
        matches the standard API of the state machine, but provides more detail
        to the user.

        :raises StreamClosedError: Always.
        """
        raise StreamClosedError(self.stream_id)

    def recv_push_on_closed_stream(self, previous_state):
        """
        Called when a PUSH_PROMISE frame is received on a closed stream.

        If the stream was closed by us sending a RST_STREAM frame, then we
        presume that the PUSH_PROMISE was in flight when we reset the parent
        stream. Rather than accept the new stream, we just reset it.
        Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
        naturally closed stream is a real problem because it creates a brand
        new stream that the remote peer now believes exists.

        :raises StreamClosedError: If we closed the stream with RST_STREAM.
        :raises ProtocolError: In every other case.
        """
        assert self.stream_closed_by is not None

        if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM:
            raise StreamClosedError(self.stream_id)
        else:
            raise ProtocolError("Attempted to push on closed stream.")

    def send_push_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to push on an already-closed stream.

        This essentially overrides the standard logic by providing a more
        useful error message. It's necessary because simply indicating that the
        stream is closed is not enough: there is now a new stream that is not
        allowed to be there. The only recourse is to tear the whole connection
        down.

        :raises ProtocolError: Always.
        """
        raise ProtocolError("Attempted to push on closed stream.")

    def send_informational_response(self, previous_state):
        """
        Called when an informational header block is sent (that is, a block
        where the :status header has a 1XX value).

        Only enforces that these are sent *before* final headers are sent.

        :raises ProtocolError: If final headers have already been sent.
        """
        if self.headers_sent:
            raise ProtocolError("Information response after final response")

        event = _ResponseSent()
        return [event]

    def recv_informational_response(self, previous_state):
        """
        Called when an informational header block is received (that is, a block
        where the :status header has a 1XX value).

        :raises ProtocolError: If final headers have already been received.
        """
        if self.headers_received:
            raise ProtocolError("Informational response after final response")

        event = InformationalResponseReceived()
        event.stream_id = self.stream_id
        return [event]

    def recv_alt_svc(self, previous_state):
        """
        Called when receiving an ALTSVC frame.

        RFC 7838 allows us to receive ALTSVC frames at any stream state, which
        is really absurdly overzealous. For that reason, we want to limit the
        states in which we can actually receive it. It's really only sensible
        to receive it after we've sent our own headers and before the server
        has sent its header block: the server can't guarantee that we have any
        state around after it completes its header block, and the server
        doesn't know what origin we're talking about before we've sent ours.

        For that reason, this function applies a few extra checks on both state
        and some of the little state variables we keep around. If those suggest
        an unreasonable situation for the ALTSVC frame to have been sent in,
        we quietly ignore it (as RFC 7838 suggests).

        This function is also *not* always called by the state machine. In some
        states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
        because we know the frame cannot be valid in that state (IDLE because
        the server cannot know what origin the stream applies to, CLOSED
        because the server cannot assume we still have state around,
        RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
        state then *we* are the server).
        """
        # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
        # them.
        if self.client is False:
            return []

        # If we've received the response headers from the server they can't
        # guarantee we still have any state around. Other implementations
        # (like nghttp2) ignore ALTSVC in this state, so we will too.
        if self.headers_received:
            return []

        # Otherwise, this is a sensible enough frame to have received. Return
        # the event and let it get populated.
        return [AlternativeServiceAvailable()]

    def send_alt_svc(self, previous_state):
        """
        Called when sending an ALTSVC frame on this stream.

        For consistency with the restrictions we apply on receiving ALTSVC
        frames in ``recv_alt_svc``, we want to restrict when users can send
        ALTSVC frames to the situations when we ourselves would accept them.

        That means: when we are a server, when we have received the request
        headers, and when we have not yet sent our own response headers. Of
        those conditions, only the last one is checked in this method.

        Produces no events.

        :raises ProtocolError: If response headers have already been sent.
        """
        # We should not send ALTSVC after we've sent response headers, as the
        # client may have disposed of its state.
        if self.headers_sent:
            raise ProtocolError(
                "Cannot send ALTSVC after sending response headers."
            )

        return []


# STATE MACHINE
#
# The stream state machine is defined here to avoid the need to allocate it
# repeatedly for each stream. It cannot be defined in the stream class because
# it needs to be able to reference the callbacks defined on the class, but
# because Python's scoping rules are weird the class object is not actually in
# scope during the body of the class object.
#
# For the sake of clarity, we reproduce the RFC 7540 state machine here:
#
#                          +--------+
#                  send PP |        | recv PP
#                 ,--------|  idle  |--------.
#                /         |        |         \
#               v          +--------+          v
#        +----------+          |           +----------+
#        |          |          | send H /  |          |
# ,------| reserved |          | recv H    | reserved |------.
# |      | (local)  |          |           | (remote) |      |
# |      +----------+          v           +----------+      |
# |          |             +--------+             |          |
# |          |     recv ES |        | send ES     |          |
# |   send H |     ,-------|  open  |-------.     | recv H   |
# |          |    /        |        |        \    |          |
# |          v   v         +--------+         v   v          |
# |      +----------+          |           +----------+      |
# |      |   half   |          |           |   half   |      |
# |      |  closed  |          | send R /  |  closed  |      |
# |      | (remote) |          | recv R    | (local)  |      |
# |      +----------+          |           +----------+      |
# |           |                |                 |           |
# |           | send ES /      |       recv ES / |           |
# |           | send R /       v        send R / |           |
# |           | recv R     +--------+   recv R   |           |
# | send R /  `----------->|        |<-----------'  send R / |
# | recv R                 | closed |               recv R   |
# `----------------------->|        |<----------------------'
#                          +--------+
#
#    send:   endpoint sends this frame
#    recv:   endpoint receives this frame
#
#    H:  HEADERS frame (with implied CONTINUATIONs)
#    PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
#    ES: END_STREAM flag
#    R:  RST_STREAM frame
#
# For the purposes of this state machine we treat HEADERS and their
# associated CONTINUATION frames as a single jumbo frame. The protocol
# allows/requires this by preventing other frames from being interleaved in
# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
# received without a prior HEADERS frame, it *will* be passed to this state
# machine. The state machine should always reject that frame, either as an
# invalid transition or because the stream is closed.
#
# There is a confusing relationship around PUSH_PROMISE frames. The state
# machine above considers them to be frames belonging to the new stream,
# which is *somewhat* true. However, they are sent with the stream ID of
# their related stream, and are only sendable in some cases.
# For this reason, our state machine implementation below allows for
# PUSH_PROMISE frames both in the IDLE state (as in the diagram), but also
# in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
# Essentially, for hyper-h2, PUSH_PROMISE frames are effectively sent on
# two streams.
533# 534# The _transitions dictionary contains a mapping of tuples of 535# (state, input) to tuples of (side_effect_function, end_state). This 536# map contains all allowed transitions: anything not in this map is 537# invalid and immediately causes a transition to ``closed``. 538_transitions = { 539 # State: idle 540 (StreamState.IDLE, StreamInputs.SEND_HEADERS): 541 (H2StreamStateMachine.request_sent, StreamState.OPEN), 542 (StreamState.IDLE, StreamInputs.RECV_HEADERS): 543 (H2StreamStateMachine.request_received, StreamState.OPEN), 544 (StreamState.IDLE, StreamInputs.RECV_DATA): 545 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 546 (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE): 547 (H2StreamStateMachine.send_new_pushed_stream, 548 StreamState.RESERVED_LOCAL), 549 (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE): 550 (H2StreamStateMachine.recv_new_pushed_stream, 551 StreamState.RESERVED_REMOTE), 552 (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE): 553 (None, StreamState.IDLE), 554 (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT): 555 (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL), 556 (StreamState.IDLE, StreamInputs.UPGRADE_SERVER): 557 (H2StreamStateMachine.request_received, 558 StreamState.HALF_CLOSED_REMOTE), 559 560 # State: reserved local 561 (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS): 562 (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE), 563 (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA): 564 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 565 (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): 566 (None, StreamState.RESERVED_LOCAL), 567 (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): 568 (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL), 569 (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM): 570 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 571 (StreamState.RESERVED_LOCAL, 
StreamInputs.RECV_RST_STREAM): 572 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 573 (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE): 574 (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL), 575 (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE): 576 (None, StreamState.RESERVED_LOCAL), 577 578 # State: reserved remote 579 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS): 580 (H2StreamStateMachine.response_received, 581 StreamState.HALF_CLOSED_LOCAL), 582 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA): 583 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 584 (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): 585 (None, StreamState.RESERVED_REMOTE), 586 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): 587 (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE), 588 (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM): 589 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 590 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM): 591 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 592 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE): 593 (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE), 594 595 # State: open 596 (StreamState.OPEN, StreamInputs.SEND_HEADERS): 597 (H2StreamStateMachine.response_sent, StreamState.OPEN), 598 (StreamState.OPEN, StreamInputs.RECV_HEADERS): 599 (H2StreamStateMachine.response_received, StreamState.OPEN), 600 (StreamState.OPEN, StreamInputs.SEND_DATA): 601 (None, StreamState.OPEN), 602 (StreamState.OPEN, StreamInputs.RECV_DATA): 603 (H2StreamStateMachine.data_received, StreamState.OPEN), 604 (StreamState.OPEN, StreamInputs.SEND_END_STREAM): 605 (None, StreamState.HALF_CLOSED_LOCAL), 606 (StreamState.OPEN, StreamInputs.RECV_END_STREAM): 607 (H2StreamStateMachine.stream_half_closed, 608 StreamState.HALF_CLOSED_REMOTE), 609 (StreamState.OPEN, 
StreamInputs.SEND_WINDOW_UPDATE): 610 (None, StreamState.OPEN), 611 (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE): 612 (H2StreamStateMachine.window_updated, StreamState.OPEN), 613 (StreamState.OPEN, StreamInputs.SEND_RST_STREAM): 614 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 615 (StreamState.OPEN, StreamInputs.RECV_RST_STREAM): 616 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 617 (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE): 618 (H2StreamStateMachine.send_push_promise, StreamState.OPEN), 619 (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE): 620 (H2StreamStateMachine.recv_push_promise, StreamState.OPEN), 621 (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS): 622 (H2StreamStateMachine.send_informational_response, StreamState.OPEN), 623 (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS): 624 (H2StreamStateMachine.recv_informational_response, StreamState.OPEN), 625 (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE): 626 (H2StreamStateMachine.send_alt_svc, StreamState.OPEN), 627 (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE): 628 (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN), 629 630 # State: half-closed remote 631 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS): 632 (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE), 633 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS): 634 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 635 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA): 636 (None, StreamState.HALF_CLOSED_REMOTE), 637 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA): 638 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 639 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM): 640 (H2StreamStateMachine.send_end_stream, StreamState.CLOSED), 641 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): 642 (None, StreamState.HALF_CLOSED_REMOTE), 643 
(StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): 644 (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE), 645 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM): 646 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 647 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM): 648 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 649 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE): 650 (H2StreamStateMachine.send_push_promise, 651 StreamState.HALF_CLOSED_REMOTE), 652 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE): 653 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 654 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS): 655 (H2StreamStateMachine.send_informational_response, 656 StreamState.HALF_CLOSED_REMOTE), 657 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE): 658 (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE), 659 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE): 660 (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE), 661 662 # State: half-closed local 663 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS): 664 (H2StreamStateMachine.response_received, 665 StreamState.HALF_CLOSED_LOCAL), 666 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA): 667 (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL), 668 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM): 669 (H2StreamStateMachine.stream_ended, StreamState.CLOSED), 670 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): 671 (None, StreamState.HALF_CLOSED_LOCAL), 672 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): 673 (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL), 674 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM): 675 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 
676 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM): 677 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 678 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE): 679 (H2StreamStateMachine.recv_push_promise, 680 StreamState.HALF_CLOSED_LOCAL), 681 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS): 682 (H2StreamStateMachine.recv_informational_response, 683 StreamState.HALF_CLOSED_LOCAL), 684 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE): 685 (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL), 686 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE): 687 (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL), 688 689 # State: closed 690 (StreamState.CLOSED, StreamInputs.RECV_END_STREAM): 691 (None, StreamState.CLOSED), 692 (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE): 693 (None, StreamState.CLOSED), 694 695 # RFC 7540 Section 5.1 defines how the end point should react when 696 # receiving a frame on a closed stream with the following statements: 697 # 698 # > An endpoint that receives any frame other than PRIORITY after receiving 699 # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED. 700 # > An endpoint that receives any frames after receiving a frame with the 701 # > END_STREAM flag set MUST treat that as a connection error of type 702 # > STREAM_CLOSED. 703 (StreamState.CLOSED, StreamInputs.RECV_HEADERS): 704 (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED), 705 (StreamState.CLOSED, StreamInputs.RECV_DATA): 706 (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED), 707 708 # > WINDOW_UPDATE or RST_STREAM frames can be received in this state 709 # > for a short period after a DATA or HEADERS frame containing a 710 # > END_STREAM flag is sent, as instructed in RFC 7540 Section 5.1. But we 711 # > don't have access to a clock so we just always allow it. 
712 (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE): 713 (None, StreamState.CLOSED), 714 (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM): 715 (None, StreamState.CLOSED), 716 717 # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is 718 # > neither "open" nor "half-closed (local)" as a connection error of type 719 # > PROTOCOL_ERROR. 720 (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE): 721 (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED), 722 723 # Also, users should be forbidden from sending on closed streams. 724 (StreamState.CLOSED, StreamInputs.SEND_HEADERS): 725 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 726 (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE): 727 (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED), 728 (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM): 729 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 730 (StreamState.CLOSED, StreamInputs.SEND_DATA): 731 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 732 (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE): 733 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 734 (StreamState.CLOSED, StreamInputs.SEND_END_STREAM): 735 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 736} 737 738 739class H2Stream: 740 """ 741 A low-level HTTP/2 stream object. This handles building and receiving 742 frames and maintains per-stream state. 743 744 This wraps a HTTP/2 Stream state machine implementation, ensuring that 745 frames can only be sent/received when the stream is in a valid state. 746 Attempts to create frames that cannot be sent will raise a 747 ``ProtocolError``. 
748 """ 749 def __init__(self, 750 stream_id, 751 config, 752 inbound_window_size, 753 outbound_window_size): 754 self.state_machine = H2StreamStateMachine(stream_id) 755 self.stream_id = stream_id 756 self.max_outbound_frame_size = None 757 self.request_method = None 758 759 # The current value of the outbound stream flow control window 760 self.outbound_flow_control_window = outbound_window_size 761 762 # The flow control manager. 763 self._inbound_window_manager = WindowManager(inbound_window_size) 764 765 # The expected content length, if any. 766 self._expected_content_length = None 767 768 # The actual received content length. Always tracked. 769 self._actual_content_length = 0 770 771 # The authority we believe this stream belongs to. 772 self._authority = None 773 774 # The configuration for this stream. 775 self.config = config 776 777 def __repr__(self): 778 return "<%s id:%d state:%r>" % ( 779 type(self).__name__, 780 self.stream_id, 781 self.state_machine.state 782 ) 783 784 @property 785 def inbound_flow_control_window(self): 786 """ 787 The size of the inbound flow control window for the stream. This is 788 rarely publicly useful: instead, use :meth:`remote_flow_control_window 789 <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is 790 largely present to provide a shortcut to this data. 791 """ 792 return self._inbound_window_manager.current_window_size 793 794 @property 795 def open(self): 796 """ 797 Whether the stream is 'open' in any sense: that is, whether it counts 798 against the number of concurrent streams. 799 """ 800 # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either 801 # the OPEN state or either of the HALF_CLOSED states. Perplexingly, 802 # this excludes the reserved states. 803 # For more detail on why we're doing this in this slightly weird way, 804 # see the comment on ``STREAM_OPEN`` at the top of the file. 
805 return STREAM_OPEN[self.state_machine.state] 806 807 @property 808 def closed(self): 809 """ 810 Whether the stream is closed. 811 """ 812 return self.state_machine.state == StreamState.CLOSED 813 814 @property 815 def closed_by(self): 816 """ 817 Returns how the stream was closed, as one of StreamClosedBy. 818 """ 819 return self.state_machine.stream_closed_by 820 821 def upgrade(self, client_side): 822 """ 823 Called by the connection to indicate that this stream is the initial 824 request/response of an upgraded connection. Places the stream into an 825 appropriate state. 826 """ 827 self.config.logger.debug("Upgrading %r", self) 828 829 assert self.stream_id == 1 830 input_ = ( 831 StreamInputs.UPGRADE_CLIENT if client_side 832 else StreamInputs.UPGRADE_SERVER 833 ) 834 835 # This may return events, we deliberately don't want them. 836 self.state_machine.process_input(input_) 837 return 838 839 def send_headers(self, headers, encoder, end_stream=False): 840 """ 841 Returns a list of HEADERS/CONTINUATION frames to emit as either headers 842 or trailers. 843 """ 844 self.config.logger.debug("Send headers %s on %r", headers, self) 845 846 # Because encoding headers makes an irreversible change to the header 847 # compression context, we make the state transition before we encode 848 # them. 849 850 # First, check if we're a client. If we are, no problem: if we aren't, 851 # we need to scan the header block to see if this is an informational 852 # response. 853 input_ = StreamInputs.SEND_HEADERS 854 if ((not self.state_machine.client) and 855 is_informational_response(headers)): 856 if end_stream: 857 raise ProtocolError( 858 "Cannot set END_STREAM on informational responses." 
859 ) 860 861 input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS 862 863 events = self.state_machine.process_input(input_) 864 865 hf = HeadersFrame(self.stream_id) 866 hdr_validation_flags = self._build_hdr_validation_flags(events) 867 frames = self._build_headers_frames( 868 headers, encoder, hf, hdr_validation_flags 869 ) 870 871 if end_stream: 872 # Not a bug: the END_STREAM flag is valid on the initial HEADERS 873 # frame, not the CONTINUATION frames that follow. 874 self.state_machine.process_input(StreamInputs.SEND_END_STREAM) 875 frames[0].flags.add('END_STREAM') 876 877 if self.state_machine.trailers_sent and not end_stream: 878 raise ProtocolError("Trailers must have END_STREAM set.") 879 880 if self.state_machine.client and self._authority is None: 881 self._authority = authority_from_headers(headers) 882 883 # store request method for _initialize_content_length 884 self.request_method = extract_method_header(headers) 885 886 return frames 887 888 def push_stream_in_band(self, related_stream_id, headers, encoder): 889 """ 890 Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed 891 stream header. Called on the stream that has the PUSH_PROMISE frame 892 sent on it. 893 """ 894 self.config.logger.debug("Push stream %r", self) 895 896 # Because encoding headers makes an irreversible change to the header 897 # compression context, we make the state transition *first*. 898 899 events = self.state_machine.process_input( 900 StreamInputs.SEND_PUSH_PROMISE 901 ) 902 903 ppf = PushPromiseFrame(self.stream_id) 904 ppf.promised_stream_id = related_stream_id 905 hdr_validation_flags = self._build_hdr_validation_flags(events) 906 frames = self._build_headers_frames( 907 headers, encoder, ppf, hdr_validation_flags 908 ) 909 910 return frames 911 912 def locally_pushed(self): 913 """ 914 Mark this stream as one that was pushed by this peer. Must be called 915 immediately after initialization. Sends no frames, simply updates the 916 state machine. 
917 """ 918 # This does not trigger any events. 919 events = self.state_machine.process_input( 920 StreamInputs.SEND_PUSH_PROMISE 921 ) 922 assert not events 923 return [] 924 925 def send_data(self, data, end_stream=False, pad_length=None): 926 """ 927 Prepare some data frames. Optionally end the stream. 928 929 .. warning:: Does not perform flow control checks. 930 """ 931 self.config.logger.debug( 932 "Send data on %r with end stream set to %s", self, end_stream 933 ) 934 935 self.state_machine.process_input(StreamInputs.SEND_DATA) 936 937 df = DataFrame(self.stream_id) 938 df.data = data 939 if end_stream: 940 self.state_machine.process_input(StreamInputs.SEND_END_STREAM) 941 df.flags.add('END_STREAM') 942 if pad_length is not None: 943 df.flags.add('PADDED') 944 df.pad_length = pad_length 945 946 # Subtract flow_controlled_length to account for possible padding 947 self.outbound_flow_control_window -= df.flow_controlled_length 948 assert self.outbound_flow_control_window >= 0 949 950 return [df] 951 952 def end_stream(self): 953 """ 954 End a stream without sending data. 955 """ 956 self.config.logger.debug("End stream %r", self) 957 958 self.state_machine.process_input(StreamInputs.SEND_END_STREAM) 959 df = DataFrame(self.stream_id) 960 df.flags.add('END_STREAM') 961 return [df] 962 963 def advertise_alternative_service(self, field_value): 964 """ 965 Advertise an RFC 7838 alternative service. The semantics of this are 966 better documented in the ``H2Connection`` class. 967 """ 968 self.config.logger.debug( 969 "Advertise alternative service of %r for %r", field_value, self 970 ) 971 self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE) 972 asf = AltSvcFrame(self.stream_id) 973 asf.field = field_value 974 return [asf] 975 976 def increase_flow_control_window(self, increment): 977 """ 978 Increase the size of the flow control window for the remote side. 
979 """ 980 self.config.logger.debug( 981 "Increase flow control window for %r by %d", 982 self, increment 983 ) 984 self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE) 985 self._inbound_window_manager.window_opened(increment) 986 987 wuf = WindowUpdateFrame(self.stream_id) 988 wuf.window_increment = increment 989 return [wuf] 990 991 def receive_push_promise_in_band(self, 992 promised_stream_id, 993 headers, 994 header_encoding): 995 """ 996 Receives a push promise frame sent on this stream, pushing a remote 997 stream. This is called on the stream that has the PUSH_PROMISE sent 998 on it. 999 """ 1000 self.config.logger.debug( 1001 "Receive Push Promise on %r for remote stream %d", 1002 self, promised_stream_id 1003 ) 1004 events = self.state_machine.process_input( 1005 StreamInputs.RECV_PUSH_PROMISE 1006 ) 1007 events[0].pushed_stream_id = promised_stream_id 1008 1009 hdr_validation_flags = self._build_hdr_validation_flags(events) 1010 events[0].headers = self._process_received_headers( 1011 headers, hdr_validation_flags, header_encoding 1012 ) 1013 return [], events 1014 1015 def remotely_pushed(self, pushed_headers): 1016 """ 1017 Mark this stream as one that was pushed by the remote peer. Must be 1018 called immediately after initialization. Sends no frames, simply 1019 updates the state machine. 1020 """ 1021 self.config.logger.debug("%r pushed by remote peer", self) 1022 events = self.state_machine.process_input( 1023 StreamInputs.RECV_PUSH_PROMISE 1024 ) 1025 self._authority = authority_from_headers(pushed_headers) 1026 return [], events 1027 1028 def receive_headers(self, headers, end_stream, header_encoding): 1029 """ 1030 Receive a set of headers (or trailers). 
1031 """ 1032 if is_informational_response(headers): 1033 if end_stream: 1034 raise ProtocolError( 1035 "Cannot set END_STREAM on informational responses" 1036 ) 1037 input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS 1038 else: 1039 input_ = StreamInputs.RECV_HEADERS 1040 1041 events = self.state_machine.process_input(input_) 1042 1043 if end_stream: 1044 es_events = self.state_machine.process_input( 1045 StreamInputs.RECV_END_STREAM 1046 ) 1047 events[0].stream_ended = es_events[0] 1048 events += es_events 1049 1050 self._initialize_content_length(headers) 1051 1052 if isinstance(events[0], TrailersReceived): 1053 if not end_stream: 1054 raise ProtocolError("Trailers must have END_STREAM set") 1055 1056 hdr_validation_flags = self._build_hdr_validation_flags(events) 1057 events[0].headers = self._process_received_headers( 1058 headers, hdr_validation_flags, header_encoding 1059 ) 1060 return [], events 1061 1062 def receive_data(self, data, end_stream, flow_control_len): 1063 """ 1064 Receive some data. 1065 """ 1066 self.config.logger.debug( 1067 "Receive data on %r with end stream %s and flow control length " 1068 "set to %d", self, end_stream, flow_control_len 1069 ) 1070 events = self.state_machine.process_input(StreamInputs.RECV_DATA) 1071 self._inbound_window_manager.window_consumed(flow_control_len) 1072 self._track_content_length(len(data), end_stream) 1073 1074 if end_stream: 1075 es_events = self.state_machine.process_input( 1076 StreamInputs.RECV_END_STREAM 1077 ) 1078 events[0].stream_ended = es_events[0] 1079 events.extend(es_events) 1080 1081 events[0].data = data 1082 events[0].flow_controlled_length = flow_control_len 1083 return [], events 1084 1085 def receive_window_update(self, increment): 1086 """ 1087 Handle a WINDOW_UPDATE increment. 
1088 """ 1089 self.config.logger.debug( 1090 "Receive Window Update on %r for increment of %d", 1091 self, increment 1092 ) 1093 events = self.state_machine.process_input( 1094 StreamInputs.RECV_WINDOW_UPDATE 1095 ) 1096 frames = [] 1097 1098 # If we encounter a problem with incrementing the flow control window, 1099 # this should be treated as a *stream* error, not a *connection* error. 1100 # That means we need to catch the error and forcibly close the stream. 1101 if events: 1102 events[0].delta = increment 1103 try: 1104 self.outbound_flow_control_window = guard_increment_window( 1105 self.outbound_flow_control_window, 1106 increment 1107 ) 1108 except FlowControlError: 1109 # Ok, this is bad. We're going to need to perform a local 1110 # reset. 1111 event = StreamReset() 1112 event.stream_id = self.stream_id 1113 event.error_code = ErrorCodes.FLOW_CONTROL_ERROR 1114 event.remote_reset = False 1115 1116 events = [event] 1117 frames = self.reset_stream(event.error_code) 1118 1119 return frames, events 1120 1121 def receive_continuation(self): 1122 """ 1123 A naked CONTINUATION frame has been received. This is always an error, 1124 but the type of error it is depends on the state of the stream and must 1125 transition the state of the stream, so we need to handle it. 1126 """ 1127 self.config.logger.debug("Receive Continuation frame on %r", self) 1128 self.state_machine.process_input( 1129 StreamInputs.RECV_CONTINUATION 1130 ) 1131 assert False, "Should not be reachable" 1132 1133 def receive_alt_svc(self, frame): 1134 """ 1135 An Alternative Service frame was received on the stream. This frame 1136 inherits the origin associated with this stream. 1137 """ 1138 self.config.logger.debug( 1139 "Receive Alternative Service frame on stream %r", self 1140 ) 1141 1142 # If the origin is present, RFC 7838 says we have to ignore it. 
1143 if frame.origin: 1144 return [], [] 1145 1146 events = self.state_machine.process_input( 1147 StreamInputs.RECV_ALTERNATIVE_SERVICE 1148 ) 1149 1150 # There are lots of situations where we want to ignore the ALTSVC 1151 # frame. If we need to pay attention, we'll have an event and should 1152 # fill it out. 1153 if events: 1154 assert isinstance(events[0], AlternativeServiceAvailable) 1155 events[0].origin = self._authority 1156 events[0].field_value = frame.field 1157 1158 return [], events 1159 1160 def reset_stream(self, error_code=0): 1161 """ 1162 Close the stream locally. Reset the stream with an error code. 1163 """ 1164 self.config.logger.debug( 1165 "Local reset %r with error code: %d", self, error_code 1166 ) 1167 self.state_machine.process_input(StreamInputs.SEND_RST_STREAM) 1168 1169 rsf = RstStreamFrame(self.stream_id) 1170 rsf.error_code = error_code 1171 return [rsf] 1172 1173 def stream_reset(self, frame): 1174 """ 1175 Handle a stream being reset remotely. 1176 """ 1177 self.config.logger.debug( 1178 "Remote reset %r with error code: %d", self, frame.error_code 1179 ) 1180 events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM) 1181 1182 if events: 1183 # We don't fire an event if this stream is already closed. 1184 events[0].error_code = _error_code_from_int(frame.error_code) 1185 1186 return [], events 1187 1188 def acknowledge_received_data(self, acknowledged_size): 1189 """ 1190 The user has informed us that they've processed some amount of data 1191 that was received on this stream. Pass that to the window manager and 1192 potentially return some WindowUpdate frames. 
1193 """ 1194 self.config.logger.debug( 1195 "Acknowledge received data with size %d on %r", 1196 acknowledged_size, self 1197 ) 1198 increment = self._inbound_window_manager.process_bytes( 1199 acknowledged_size 1200 ) 1201 if increment: 1202 f = WindowUpdateFrame(self.stream_id) 1203 f.window_increment = increment 1204 return [f] 1205 1206 return [] 1207 1208 def _build_hdr_validation_flags(self, events): 1209 """ 1210 Constructs a set of header validation flags for use when normalizing 1211 and validating header blocks. 1212 """ 1213 is_trailer = isinstance( 1214 events[0], (_TrailersSent, TrailersReceived) 1215 ) 1216 is_response_header = isinstance( 1217 events[0], 1218 ( 1219 _ResponseSent, 1220 ResponseReceived, 1221 InformationalResponseReceived 1222 ) 1223 ) 1224 is_push_promise = isinstance( 1225 events[0], (PushedStreamReceived, _PushedRequestSent) 1226 ) 1227 1228 return HeaderValidationFlags( 1229 is_client=self.state_machine.client, 1230 is_trailer=is_trailer, 1231 is_response_header=is_response_header, 1232 is_push_promise=is_push_promise, 1233 ) 1234 1235 def _build_headers_frames(self, 1236 headers, 1237 encoder, 1238 first_frame, 1239 hdr_validation_flags): 1240 """ 1241 Helper method to build headers or push promise frames. 1242 """ 1243 # We need to lowercase the header names, and to ensure that secure 1244 # header fields are kept out of compression contexts. 1245 if self.config.normalize_outbound_headers: 1246 headers = normalize_outbound_headers( 1247 headers, hdr_validation_flags 1248 ) 1249 if self.config.validate_outbound_headers: 1250 headers = validate_outbound_headers( 1251 headers, hdr_validation_flags 1252 ) 1253 1254 encoded_headers = encoder.encode(headers) 1255 1256 # Slice into blocks of max_outbound_frame_size. Be careful with this: 1257 # it only works right because we never send padded frames or priority 1258 # information on the frames. Revisit this if we do. 
1259 header_blocks = [ 1260 encoded_headers[i:i+self.max_outbound_frame_size] 1261 for i in range( 1262 0, len(encoded_headers), self.max_outbound_frame_size 1263 ) 1264 ] 1265 1266 frames = [] 1267 first_frame.data = header_blocks[0] 1268 frames.append(first_frame) 1269 1270 for block in header_blocks[1:]: 1271 cf = ContinuationFrame(self.stream_id) 1272 cf.data = block 1273 frames.append(cf) 1274 1275 frames[-1].flags.add('END_HEADERS') 1276 return frames 1277 1278 def _process_received_headers(self, 1279 headers, 1280 header_validation_flags, 1281 header_encoding): 1282 """ 1283 When headers have been received from the remote peer, run a processing 1284 pipeline on them to transform them into the appropriate form for 1285 attaching to an event. 1286 """ 1287 if self.config.normalize_inbound_headers: 1288 headers = normalize_inbound_headers( 1289 headers, header_validation_flags 1290 ) 1291 1292 if self.config.validate_inbound_headers: 1293 headers = validate_headers(headers, header_validation_flags) 1294 1295 if header_encoding: 1296 headers = _decode_headers(headers, header_encoding) 1297 1298 # The above steps are all generators, so we need to concretize the 1299 # headers now. 1300 return list(headers) 1301 1302 def _initialize_content_length(self, headers): 1303 """ 1304 Checks the headers for a content-length header and initializes the 1305 _expected_content_length field from it. It's not an error for no 1306 Content-Length header to be present. 1307 """ 1308 if self.request_method == b'HEAD': 1309 self._expected_content_length = 0 1310 return 1311 1312 for n, v in headers: 1313 if n == b'content-length': 1314 try: 1315 self._expected_content_length = int(v, 10) 1316 except ValueError: 1317 raise ProtocolError( 1318 "Invalid content-length header: %s" % v 1319 ) 1320 1321 return 1322 1323 def _track_content_length(self, length, end_stream): 1324 """ 1325 Update the expected content length in response to data being received. 
1326 Validates that the appropriate amount of data is sent. Always updates 1327 the received data, but only validates the length against the 1328 content-length header if one was sent. 1329 1330 :param length: The length of the body chunk received. 1331 :param end_stream: If this is the last body chunk received. 1332 """ 1333 self._actual_content_length += length 1334 actual = self._actual_content_length 1335 expected = self._expected_content_length 1336 1337 if expected is not None: 1338 if expected < actual: 1339 raise InvalidBodyLengthError(expected, actual) 1340 1341 if end_stream and expected != actual: 1342 raise InvalidBodyLengthError(expected, actual) 1343 1344 def _inbound_flow_control_change_from_settings(self, delta): 1345 """ 1346 We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to 1347 update the target window size for flow control. For our flow control 1348 strategy, this means we need to do two things: we need to adjust the 1349 current window size, but we also need to set the target maximum window 1350 size to the new value. 1351 """ 1352 new_max_size = self._inbound_window_manager.max_window_size + delta 1353 self._inbound_window_manager.window_opened(delta) 1354 self._inbound_window_manager.max_window_size = new_max_size 1355 1356 1357def _decode_headers(headers, encoding): 1358 """ 1359 Given an iterable of header two-tuples and an encoding, decodes those 1360 headers using that encoding while preserving the type of the header tuple. 1361 This ensures that the use of ``HeaderTuple`` is preserved. 1362 """ 1363 for header in headers: 1364 # This function expects to work on decoded headers, which are always 1365 # HeaderTuple objects. 1366 assert isinstance(header, HeaderTuple) 1367 1368 name, value = header 1369 name = name.decode(encoding) 1370 value = value.decode(encoding) 1371 yield header.__class__(name, value) 1372