1# -*- coding: utf-8 -*- 2""" 3h2/stream 4~~~~~~~~~ 5 6An implementation of a HTTP/2 stream. 7""" 8from enum import Enum, IntEnum 9from hpack import HeaderTuple 10from hyperframe.frame import ( 11 HeadersFrame, ContinuationFrame, DataFrame, WindowUpdateFrame, 12 RstStreamFrame, PushPromiseFrame, AltSvcFrame 13) 14 15from .errors import ErrorCodes, _error_code_from_int 16from .events import ( 17 RequestReceived, ResponseReceived, DataReceived, WindowUpdated, 18 StreamEnded, PushedStreamReceived, StreamReset, TrailersReceived, 19 InformationalResponseReceived, AlternativeServiceAvailable, 20 _ResponseSent, _RequestSent, _TrailersSent, _PushedRequestSent 21) 22from .exceptions import ( 23 ProtocolError, StreamClosedError, InvalidBodyLengthError, FlowControlError 24) 25from .utilities import ( 26 guard_increment_window, is_informational_response, authority_from_headers, 27 validate_headers, validate_outbound_headers, normalize_outbound_headers, 28 HeaderValidationFlags, extract_method_header, normalize_inbound_headers 29) 30from .windows import WindowManager 31 32 33class StreamState(IntEnum): 34 IDLE = 0 35 RESERVED_REMOTE = 1 36 RESERVED_LOCAL = 2 37 OPEN = 3 38 HALF_CLOSED_REMOTE = 4 39 HALF_CLOSED_LOCAL = 5 40 CLOSED = 6 41 42 43class StreamInputs(Enum): 44 SEND_HEADERS = 0 45 SEND_PUSH_PROMISE = 1 46 SEND_RST_STREAM = 2 47 SEND_DATA = 3 48 SEND_WINDOW_UPDATE = 4 49 SEND_END_STREAM = 5 50 RECV_HEADERS = 6 51 RECV_PUSH_PROMISE = 7 52 RECV_RST_STREAM = 8 53 RECV_DATA = 9 54 RECV_WINDOW_UPDATE = 10 55 RECV_END_STREAM = 11 56 RECV_CONTINUATION = 12 # Added in 2.0.0 57 SEND_INFORMATIONAL_HEADERS = 13 # Added in 2.2.0 58 RECV_INFORMATIONAL_HEADERS = 14 # Added in 2.2.0 59 SEND_ALTERNATIVE_SERVICE = 15 # Added in 2.3.0 60 RECV_ALTERNATIVE_SERVICE = 16 # Added in 2.3.0 61 UPGRADE_CLIENT = 17 # Added 2.3.0 62 UPGRADE_SERVER = 18 # Added 2.3.0 63 64 65class StreamClosedBy(Enum): 66 SEND_END_STREAM = 0 67 RECV_END_STREAM = 1 68 SEND_RST_STREAM = 2 69 RECV_RST_STREAM = 3 70 71 
# This array is initialized once, and is indexed by the stream states above.
# It indicates whether a stream in the given state is open. The reason we do
# this is that we potentially check whether a stream in a given state is open
# quite frequently: given that we check so often, we should do so in the
# fastest and most performant way possible.
STREAM_OPEN = [False for _ in range(0, len(StreamState))]
STREAM_OPEN[StreamState.OPEN] = True
STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True
STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True


class H2StreamStateMachine(object):
    """
    A single HTTP/2 stream state machine.

    This stream object implements basically the state machine described in
    RFC 7540 section 5.1.

    The methods other than ``process_input`` are transition handlers. They
    are referenced from the module-level ``_transitions`` table, are invoked
    by ``process_input`` with the state the stream was in *before* the
    transition, and return a (possibly empty) list of events.

    :param stream_id: The stream ID of this stream. This is stored primarily
        for logging purposes.
    """
    def __init__(self, stream_id):
        self.state = StreamState.IDLE
        self.stream_id = stream_id

        #: Whether this peer is the client side of this stream. ``None``
        #: until a transition has established which side we are.
        self.client = None

        # Whether headers and trailers have been sent/received on this stream
        # or not. Each flag starts as None and is set to True by a handler.
        self.headers_sent = None
        self.trailers_sent = None
        self.headers_received = None
        self.trailers_received = None

        # How the stream was closed. One of StreamClosedBy.
        self.stream_closed_by = None

    def process_input(self, input_):
        """
        Process a specific input in the state machine.

        Looks up ``(current state, input_)`` in the ``_transitions`` table,
        moves the stream to the target state and then runs the transition's
        handler, if it has one.

        :param input_: The input to process, a member of ``StreamInputs``.
        :returns: The list of events produced by the transition. This is
            empty when the transition has no handler.
        :raises ValueError: If ``input_`` is not a ``StreamInputs`` member.
        :raises ProtocolError: If the input is not allowed in the current
            state, or the handler rejects it. In both cases the stream is
            moved to the CLOSED state before the exception propagates.
        """
        if not isinstance(input_, StreamInputs):
            raise ValueError("Input must be an instance of StreamInputs")

        try:
            func, target_state = _transitions[(self.state, input_)]
        except KeyError:
            # Anything that is not in the table is an invalid transition.
            old_state = self.state
            self.state = StreamState.CLOSED
            raise ProtocolError(
                "Invalid input %s in state %s" % (input_, old_state)
            )
        else:
            # The state is updated *before* the handler runs, so handlers
            # are given the previous state explicitly.
            previous_state = self.state
            self.state = target_state
            if func is not None:
                try:
                    return func(self, previous_state)
                except ProtocolError:
                    self.state = StreamState.CLOSED
                    raise
                except AssertionError as e:  # pragma: no cover
                    # A failed sanity check inside a handler is reported to
                    # the caller as a protocol error.
                    self.state = StreamState.CLOSED
                    raise ProtocolError(e)

            return []

    def request_sent(self, previous_state):
        """
        Fires when a request is sent.

        Marks this peer as the client and records that headers were sent.
        """
        self.client = True
        self.headers_sent = True
        event = _RequestSent()

        return [event]

    def response_sent(self, previous_state):
        """
        Fires when something that should be a response is sent. This 'response'
        may actually be trailers.

        :raises ProtocolError: If no headers have been sent yet and this peer
            is a client (or its role is still unknown), or if trailers have
            already been sent.
        """
        if not self.headers_sent:
            if self.client is True or self.client is None:
                raise ProtocolError("Client cannot send responses.")
            self.headers_sent = True
            event = _ResponseSent()
        else:
            # Raised explicitly rather than asserted: a second trailers block
            # is a caller error that must still be rejected when Python runs
            # with assertions disabled (-O).
            if self.trailers_sent:
                raise ProtocolError(
                    "Trailers have already been sent on this stream."
                )
            self.trailers_sent = True
            event = _TrailersSent()

        return [event]

    def request_received(self, previous_state):
        """
        Fires when a request is received.

        Marks this peer as the server and records that headers were received.
        """
        # Sanity checks: process_input turns a failure into a ProtocolError.
        assert not self.headers_received
        assert not self.trailers_received

        self.client = False
        self.headers_received = True
        event = RequestReceived()

        event.stream_id = self.stream_id
        return [event]

    def response_received(self, previous_state):
        """
        Fires when a response is received. Also disambiguates between responses
        and trailers.

        :raises ProtocolError: If trailers have already been received.
        """
        if not self.headers_received:
            # Sanity check: process_input turns a failure into a
            # ProtocolError.
            assert self.client is True
            self.headers_received = True
            event = ResponseReceived()
        else:
            # Raised explicitly rather than asserted: this depends on what
            # the remote peer sends, so it must still be enforced when Python
            # runs with assertions disabled (-O).
            if self.trailers_received:
                raise ProtocolError(
                    "Trailers have already been received on this stream."
                )
            self.trailers_received = True
            event = TrailersReceived()

        event.stream_id = self.stream_id
        return [event]

    def data_received(self, previous_state):
        """
        Fires when data is received.
        """
        event = DataReceived()
        event.stream_id = self.stream_id
        return [event]

    def window_updated(self, previous_state):
        """
        Fires when a window update frame is received.
        """
        event = WindowUpdated()
        event.stream_id = self.stream_id
        return [event]

    def stream_half_closed(self, previous_state):
        """
        Fires when an END_STREAM flag is received in the OPEN state,
        transitioning this stream to a HALF_CLOSED_REMOTE state.
        """
        event = StreamEnded()
        event.stream_id = self.stream_id
        return [event]

    def stream_ended(self, previous_state):
        """
        Fires when a stream is cleanly ended.

        Records that the stream was closed by a received END_STREAM.
        """
        self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
        event = StreamEnded()
        event.stream_id = self.stream_id
        return [event]

    def stream_reset(self, previous_state):
        """
        Fired when a stream is forcefully reset.

        Records that the stream was closed by a received RST_STREAM.
        """
        self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
        event = StreamReset()
        event.stream_id = self.stream_id
        return [event]

    def send_new_pushed_stream(self, previous_state):
        """
        Fires on the newly pushed stream, when pushed by the local peer.

        No event here, but definitionally this peer must be a server.
        """
        assert self.client is None
        self.client = False
        # The pushed request plays the part of the received request headers.
        self.headers_received = True
        return []

    def recv_new_pushed_stream(self, previous_state):
        """
        Fires on the newly pushed stream, when pushed by the remote peer.

        No event here, but definitionally this peer must be a client.
        """
        assert self.client is None
        self.client = True
        # The pushed request plays the part of the sent request headers.
        self.headers_sent = True
        return []

    def send_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
        We may only send PUSH_PROMISE frames if we're a server.

        :raises ProtocolError: If this peer is the client.
        """
        if self.client is True:
            raise ProtocolError("Cannot push streams from client peers.")

        event = _PushedRequestSent()
        return [event]

    def recv_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is
        received. We may only receive PUSH_PROMISE frames if we're a client.

        Fires a PushedStreamReceived event.

        :raises ProtocolError: If this peer is not the client.
        """
        if not self.client:
            if self.client is None:  # pragma: no cover
                msg = "Idle streams cannot receive pushes"
            else:  # pragma: no cover
                msg = "Cannot receive pushed streams as a server"
            raise ProtocolError(msg)

        event = PushedStreamReceived()
        event.parent_stream_id = self.stream_id
        return [event]

    def send_end_stream(self, previous_state):
        """
        Called when an attempt is made to send END_STREAM in the
        HALF_CLOSED_REMOTE state.

        Records how the stream was closed. Produces no events.
        """
        self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
        return []

    def send_reset_stream(self, previous_state):
        """
        Called when an attempt is made to send RST_STREAM in a non-closed
        stream state.

        Records how the stream was closed. Produces no events.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
        return []

    def reset_stream_on_error(self, previous_state):
        """
        Called when we need to forcefully emit another RST_STREAM frame on
        behalf of the state machine.

        Records that the stream was closed by us sending RST_STREAM, then
        raises a StreamClosedError. A StreamReset event describing the local
        reset is hung off the error so that the user can be informed.

        :raises StreamClosedError: Always.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM

        error = StreamClosedError(self.stream_id)

        event = StreamReset()
        event.stream_id = self.stream_id
        event.error_code = ErrorCodes.STREAM_CLOSED
        event.remote_reset = False
        error._events = [event]
        raise error

    def recv_on_closed_stream(self, previous_state):
        """
        Called when an unexpected frame is received on an already-closed
        stream.

        An endpoint that receives an unexpected frame should treat it as
        a stream error or connection error with type STREAM_CLOSED, depending
        on the specific frame. The error handling is done at a higher level:
        this just raises the appropriate error.

        :raises StreamClosedError: Always.
        """
        raise StreamClosedError(self.stream_id)

    def send_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to send data on an already-closed
        stream.

        This essentially overrides the standard logic by throwing a
        more-specific error: StreamClosedError. This is a ProtocolError, so it
        matches the standard API of the state machine, but provides more detail
        to the user.

        :raises StreamClosedError: Always.
        """
        raise StreamClosedError(self.stream_id)

    def recv_push_on_closed_stream(self, previous_state):
        """
        Called when a PUSH_PROMISE frame is received on a closed stream.

        If the stream was closed by us sending a RST_STREAM frame, then we
        presume that the PUSH_PROMISE was in flight when we reset the parent
        stream. Rather than accept the new stream, we just reset it.
        Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
        naturally closed stream is a real problem because it creates a brand
        new stream that the remote peer now believes exists.

        :raises StreamClosedError: If we closed the stream with RST_STREAM.
        :raises ProtocolError: If the stream was closed in any other way.
        """
        assert self.stream_closed_by is not None

        if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM:
            raise StreamClosedError(self.stream_id)
        else:
            raise ProtocolError("Attempted to push on closed stream.")

    def send_push_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to push on an already-closed stream.

        This essentially overrides the standard logic by providing a more
        useful error message. It's necessary because simply indicating that the
        stream is closed is not enough: there is now a new stream that is not
        allowed to be there. The only recourse is to tear the whole connection
        down.

        :raises ProtocolError: Always.
        """
        raise ProtocolError("Attempted to push on closed stream.")

    def send_informational_response(self, previous_state):
        """
        Called when an informational header block is sent (that is, a block
        where the :status header has a 1XX value).

        Only enforces that these are sent *before* final headers are sent.

        :raises ProtocolError: If final headers have already been sent.
        """
        if self.headers_sent:
            raise ProtocolError("Information response after final response")

        event = _ResponseSent()
        return [event]

    def recv_informational_response(self, previous_state):
        """
        Called when an informational header block is received (that is, a block
        where the :status header has a 1XX value).

        :raises ProtocolError: If final headers have already been received.
        """
        if self.headers_received:
            raise ProtocolError("Informational response after final response")

        event = InformationalResponseReceived()
        event.stream_id = self.stream_id
        return [event]

    def recv_alt_svc(self, previous_state):
        """
        Called when receiving an ALTSVC frame.

        RFC 7838 allows us to receive ALTSVC frames at any stream state, which
        is really absurdly overzealous. For that reason, we want to limit the
        states in which we can actually receive it. It's really only sensible
        to receive it after we've sent our own headers and before the server
        has sent its header block: the server can't guarantee that we have any
        state around after it completes its header block, and the server
        doesn't know what origin we're talking about before we've sent ours.

        For that reason, this function applies a few extra checks on both state
        and some of the little state variables we keep around. If those suggest
        an unreasonable situation for the ALTSVC frame to have been sent in,
        we quietly ignore it (as RFC 7838 suggests).

        This function is also *not* always called by the state machine. In some
        states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
        because we know the frame cannot be valid in that state (IDLE because
        the server cannot know what origin the stream applies to, CLOSED
        because the server cannot assume we still have state around,
        RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
        state then *we* are the server).

        :returns: A list holding one unpopulated AlternativeServiceAvailable
            event, or an empty list if the frame is being ignored.
        """
        # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
        # them.
        if self.client is False:
            return []

        # If we've received the response headers from the server they can't
        # guarantee we still have any state around. Other implementations
        # (like nghttp2) ignore ALTSVC in this state, so we will too.
        if self.headers_received:
            return []

        # Otherwise, this is a sensible enough frame to have received. Return
        # the event and let it get populated.
        return [AlternativeServiceAvailable()]

    def send_alt_svc(self, previous_state):
        """
        Called when sending an ALTSVC frame on this stream.

        For consistency with the restrictions we apply on receiving ALTSVC
        frames in ``recv_alt_svc``, we want to restrict when users can send
        ALTSVC frames to the situations when we ourselves would accept them.

        That means: when we are a server, when we have received the request
        headers, and when we have not yet sent our own response headers.

        :raises ProtocolError: If response headers have already been sent.
        """
        # We should not send ALTSVC after we've sent response headers, as the
        # client may have disposed of its state.
        if self.headers_sent:
            raise ProtocolError(
                "Cannot send ALTSVC after sending response headers."
            )

        return []


# STATE MACHINE
#
# The stream state machine is defined here to avoid the need to allocate it
# repeatedly for each stream. It cannot be defined in the stream class because
# it needs to be able to reference the callbacks defined on the class, but
# because Python's scoping rules are weird the class object is not actually in
# scope during the body of the class object.
#
# For the sake of clarity, we reproduce the RFC 7540 state machine here:
#
#                          +--------+
#                  send PP |        | recv PP
#                 ,--------|  idle  |--------.
#                /         |        |         \
#               v          +--------+          v
#        +----------+          |           +----------+
#        |          |          | send H /  |          |
# ,------| reserved |          | recv H    | reserved |------.
# |      | (local)  |          |           | (remote) |      |
# |      +----------+          v           +----------+      |
# |          |             +--------+             |          |
# |          |     recv ES |        | send ES     |          |
# |   send H |     ,-------|  open  |-------.
#                                                 | recv H   |
# |          |    /        |        |        \    |          |
# |          v   v         +--------+         v   v          |
# |      +----------+          |           +----------+      |
# |      |   half   |          |           |   half   |      |
# |      |  closed  |          | send R /  |  closed  |      |
# |      | (remote) |          | recv R    | (local)  |      |
# |      +----------+          |           +----------+      |
# |          |                 |                 |           |
# |          | send ES /       |       recv ES / |           |
# |          | send R /        v        send R / |           |
# |          | recv R      +--------+   recv R   |           |
# | send R /  `----------->|        |<-----------'  send R / |
# | recv R                 | closed |               recv R   |
# `----------------------->|        |<----------------------'
#                          +--------+
#
#    send:   endpoint sends this frame
#    recv:   endpoint receives this frame
#
#    H:  HEADERS frame (with implied CONTINUATIONs)
#    PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
#    ES: END_STREAM flag
#    R:  RST_STREAM frame
#
# For the purposes of this state machine we treat HEADERS and their
# associated CONTINUATION frames as a single jumbo frame. The protocol
# allows/requires this by preventing other frames from being interleaved in
# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
# received without a prior HEADERS frame, it *will* be passed to this state
# machine. The state machine should always reject that frame, either as an
# invalid transition or because the stream is closed.
#
# There is a confusing relationship around PUSH_PROMISE frames. The state
# machine above considers them to be frames belonging to the new stream,
# which is *somewhat* true. However, they are sent with the stream ID of
# their related stream, and are only sendable in some cases.
# For this reason, our state machine implementation below allows for
# PUSH_PROMISE frames not only in the IDLE state (as in the diagram), but also
# in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
# Essentially, for hyper-h2, PUSH_PROMISE frames are effectively sent on
# two streams.
531# 532# The _transitions dictionary contains a mapping of tuples of 533# (state, input) to tuples of (side_effect_function, end_state). This 534# map contains all allowed transitions: anything not in this map is 535# invalid and immediately causes a transition to ``closed``. 536_transitions = { 537 # State: idle 538 (StreamState.IDLE, StreamInputs.SEND_HEADERS): 539 (H2StreamStateMachine.request_sent, StreamState.OPEN), 540 (StreamState.IDLE, StreamInputs.RECV_HEADERS): 541 (H2StreamStateMachine.request_received, StreamState.OPEN), 542 (StreamState.IDLE, StreamInputs.RECV_DATA): 543 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 544 (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE): 545 (H2StreamStateMachine.send_new_pushed_stream, 546 StreamState.RESERVED_LOCAL), 547 (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE): 548 (H2StreamStateMachine.recv_new_pushed_stream, 549 StreamState.RESERVED_REMOTE), 550 (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE): 551 (None, StreamState.IDLE), 552 (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT): 553 (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL), 554 (StreamState.IDLE, StreamInputs.UPGRADE_SERVER): 555 (H2StreamStateMachine.request_received, 556 StreamState.HALF_CLOSED_REMOTE), 557 558 # State: reserved local 559 (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS): 560 (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE), 561 (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA): 562 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 563 (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): 564 (None, StreamState.RESERVED_LOCAL), 565 (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): 566 (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL), 567 (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM): 568 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 569 (StreamState.RESERVED_LOCAL, 
StreamInputs.RECV_RST_STREAM): 570 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 571 (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE): 572 (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL), 573 (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE): 574 (None, StreamState.RESERVED_LOCAL), 575 576 # State: reserved remote 577 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS): 578 (H2StreamStateMachine.response_received, 579 StreamState.HALF_CLOSED_LOCAL), 580 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA): 581 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 582 (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): 583 (None, StreamState.RESERVED_REMOTE), 584 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): 585 (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE), 586 (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM): 587 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 588 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM): 589 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 590 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE): 591 (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE), 592 593 # State: open 594 (StreamState.OPEN, StreamInputs.SEND_HEADERS): 595 (H2StreamStateMachine.response_sent, StreamState.OPEN), 596 (StreamState.OPEN, StreamInputs.RECV_HEADERS): 597 (H2StreamStateMachine.response_received, StreamState.OPEN), 598 (StreamState.OPEN, StreamInputs.SEND_DATA): 599 (None, StreamState.OPEN), 600 (StreamState.OPEN, StreamInputs.RECV_DATA): 601 (H2StreamStateMachine.data_received, StreamState.OPEN), 602 (StreamState.OPEN, StreamInputs.SEND_END_STREAM): 603 (None, StreamState.HALF_CLOSED_LOCAL), 604 (StreamState.OPEN, StreamInputs.RECV_END_STREAM): 605 (H2StreamStateMachine.stream_half_closed, 606 StreamState.HALF_CLOSED_REMOTE), 607 (StreamState.OPEN, 
StreamInputs.SEND_WINDOW_UPDATE): 608 (None, StreamState.OPEN), 609 (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE): 610 (H2StreamStateMachine.window_updated, StreamState.OPEN), 611 (StreamState.OPEN, StreamInputs.SEND_RST_STREAM): 612 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 613 (StreamState.OPEN, StreamInputs.RECV_RST_STREAM): 614 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 615 (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE): 616 (H2StreamStateMachine.send_push_promise, StreamState.OPEN), 617 (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE): 618 (H2StreamStateMachine.recv_push_promise, StreamState.OPEN), 619 (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS): 620 (H2StreamStateMachine.send_informational_response, StreamState.OPEN), 621 (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS): 622 (H2StreamStateMachine.recv_informational_response, StreamState.OPEN), 623 (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE): 624 (H2StreamStateMachine.send_alt_svc, StreamState.OPEN), 625 (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE): 626 (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN), 627 628 # State: half-closed remote 629 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS): 630 (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE), 631 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS): 632 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 633 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA): 634 (None, StreamState.HALF_CLOSED_REMOTE), 635 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA): 636 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 637 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM): 638 (H2StreamStateMachine.send_end_stream, StreamState.CLOSED), 639 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): 640 (None, StreamState.HALF_CLOSED_REMOTE), 641 
(StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): 642 (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE), 643 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM): 644 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 645 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM): 646 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 647 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE): 648 (H2StreamStateMachine.send_push_promise, 649 StreamState.HALF_CLOSED_REMOTE), 650 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE): 651 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 652 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS): 653 (H2StreamStateMachine.send_informational_response, 654 StreamState.HALF_CLOSED_REMOTE), 655 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE): 656 (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE), 657 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE): 658 (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE), 659 660 # State: half-closed local 661 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS): 662 (H2StreamStateMachine.response_received, 663 StreamState.HALF_CLOSED_LOCAL), 664 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA): 665 (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL), 666 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM): 667 (H2StreamStateMachine.stream_ended, StreamState.CLOSED), 668 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): 669 (None, StreamState.HALF_CLOSED_LOCAL), 670 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): 671 (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL), 672 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM): 673 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 
674 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM): 675 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 676 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE): 677 (H2StreamStateMachine.recv_push_promise, 678 StreamState.HALF_CLOSED_LOCAL), 679 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS): 680 (H2StreamStateMachine.recv_informational_response, 681 StreamState.HALF_CLOSED_LOCAL), 682 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE): 683 (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL), 684 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE): 685 (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL), 686 687 # State: closed 688 (StreamState.CLOSED, StreamInputs.RECV_END_STREAM): 689 (None, StreamState.CLOSED), 690 (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE): 691 (None, StreamState.CLOSED), 692 693 # RFC 7540 Section 5.1 defines how the end point should react when 694 # receiving a frame on a closed stream with the following statements: 695 # 696 # > An endpoint that receives any frame other than PRIORITY after receiving 697 # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED. 698 # > An endpoint that receives any frames after receiving a frame with the 699 # > END_STREAM flag set MUST treat that as a connection error of type 700 # > STREAM_CLOSED. 701 (StreamState.CLOSED, StreamInputs.RECV_HEADERS): 702 (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED), 703 (StreamState.CLOSED, StreamInputs.RECV_DATA): 704 (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED), 705 706 # > WINDOW_UPDATE or RST_STREAM frames can be received in this state 707 # > for a short period after a DATA or HEADERS frame containing a 708 # > END_STREAM flag is sent, as instructed in RFC 7540 Section 5.1. But we 709 # > don't have access to a clock so we just always allow it. 
710 (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE): 711 (None, StreamState.CLOSED), 712 (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM): 713 (None, StreamState.CLOSED), 714 715 # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is 716 # > neither "open" nor "half-closed (local)" as a connection error of type 717 # > PROTOCOL_ERROR. 718 (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE): 719 (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED), 720 721 # Also, users should be forbidden from sending on closed streams. 722 (StreamState.CLOSED, StreamInputs.SEND_HEADERS): 723 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 724 (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE): 725 (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED), 726 (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM): 727 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 728 (StreamState.CLOSED, StreamInputs.SEND_DATA): 729 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 730 (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE): 731 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 732 (StreamState.CLOSED, StreamInputs.SEND_END_STREAM): 733 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 734} 735 736 737class H2Stream(object): 738 """ 739 A low-level HTTP/2 stream object. This handles building and receiving 740 frames and maintains per-stream state. 741 742 This wraps a HTTP/2 Stream state machine implementation, ensuring that 743 frames can only be sent/received when the stream is in a valid state. 744 Attempts to create frames that cannot be sent will raise a 745 ``ProtocolError``. 
746 """ 747 def __init__(self, 748 stream_id, 749 config, 750 inbound_window_size, 751 outbound_window_size): 752 self.state_machine = H2StreamStateMachine(stream_id) 753 self.stream_id = stream_id 754 self.max_outbound_frame_size = None 755 self.request_method = None 756 757 # The current value of the outbound stream flow control window 758 self.outbound_flow_control_window = outbound_window_size 759 760 # The flow control manager. 761 self._inbound_window_manager = WindowManager(inbound_window_size) 762 763 # The expected content length, if any. 764 self._expected_content_length = None 765 766 # The actual received content length. Always tracked. 767 self._actual_content_length = 0 768 769 # The authority we believe this stream belongs to. 770 self._authority = None 771 772 # The configuration for this stream. 773 self.config = config 774 775 def __repr__(self): 776 return "<%s id:%d state:%r>" % ( 777 type(self).__name__, 778 self.stream_id, 779 self.state_machine.state 780 ) 781 782 @property 783 def inbound_flow_control_window(self): 784 """ 785 The size of the inbound flow control window for the stream. This is 786 rarely publicly useful: instead, use :meth:`remote_flow_control_window 787 <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is 788 largely present to provide a shortcut to this data. 789 """ 790 return self._inbound_window_manager.current_window_size 791 792 @property 793 def open(self): 794 """ 795 Whether the stream is 'open' in any sense: that is, whether it counts 796 against the number of concurrent streams. 797 """ 798 # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either 799 # the OPEN state or either of the HALF_CLOSED states. Perplexingly, 800 # this excludes the reserved states. 801 # For more detail on why we're doing this in this slightly weird way, 802 # see the comment on ``STREAM_OPEN`` at the top of the file. 
803 return STREAM_OPEN[self.state_machine.state] 804 805 @property 806 def closed(self): 807 """ 808 Whether the stream is closed. 809 """ 810 return self.state_machine.state == StreamState.CLOSED 811 812 @property 813 def closed_by(self): 814 """ 815 Returns how the stream was closed, as one of StreamClosedBy. 816 """ 817 return self.state_machine.stream_closed_by 818 819 def upgrade(self, client_side): 820 """ 821 Called by the connection to indicate that this stream is the initial 822 request/response of an upgraded connection. Places the stream into an 823 appropriate state. 824 """ 825 self.config.logger.debug("Upgrading %r", self) 826 827 assert self.stream_id == 1 828 input_ = ( 829 StreamInputs.UPGRADE_CLIENT if client_side 830 else StreamInputs.UPGRADE_SERVER 831 ) 832 833 # This may return events, we deliberately don't want them. 834 self.state_machine.process_input(input_) 835 return 836 837 def send_headers(self, headers, encoder, end_stream=False): 838 """ 839 Returns a list of HEADERS/CONTINUATION frames to emit as either headers 840 or trailers. 841 """ 842 self.config.logger.debug("Send headers %s on %r", headers, self) 843 844 # Because encoding headers makes an irreversible change to the header 845 # compression context, we make the state transition before we encode 846 # them. 847 848 # First, check if we're a client. If we are, no problem: if we aren't, 849 # we need to scan the header block to see if this is an informational 850 # response. 851 input_ = StreamInputs.SEND_HEADERS 852 if ((not self.state_machine.client) and 853 is_informational_response(headers)): 854 if end_stream: 855 raise ProtocolError( 856 "Cannot set END_STREAM on informational responses." 
857 ) 858 859 input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS 860 861 events = self.state_machine.process_input(input_) 862 863 hf = HeadersFrame(self.stream_id) 864 hdr_validation_flags = self._build_hdr_validation_flags(events) 865 frames = self._build_headers_frames( 866 headers, encoder, hf, hdr_validation_flags 867 ) 868 869 if end_stream: 870 # Not a bug: the END_STREAM flag is valid on the initial HEADERS 871 # frame, not the CONTINUATION frames that follow. 872 self.state_machine.process_input(StreamInputs.SEND_END_STREAM) 873 frames[0].flags.add('END_STREAM') 874 875 if self.state_machine.trailers_sent and not end_stream: 876 raise ProtocolError("Trailers must have END_STREAM set.") 877 878 if self.state_machine.client and self._authority is None: 879 self._authority = authority_from_headers(headers) 880 881 # store request method for _initialize_content_length 882 self.request_method = extract_method_header(headers) 883 884 return frames 885 886 def push_stream_in_band(self, related_stream_id, headers, encoder): 887 """ 888 Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed 889 stream header. Called on the stream that has the PUSH_PROMISE frame 890 sent on it. 891 """ 892 self.config.logger.debug("Push stream %r", self) 893 894 # Because encoding headers makes an irreversible change to the header 895 # compression context, we make the state transition *first*. 896 897 events = self.state_machine.process_input( 898 StreamInputs.SEND_PUSH_PROMISE 899 ) 900 901 ppf = PushPromiseFrame(self.stream_id) 902 ppf.promised_stream_id = related_stream_id 903 hdr_validation_flags = self._build_hdr_validation_flags(events) 904 frames = self._build_headers_frames( 905 headers, encoder, ppf, hdr_validation_flags 906 ) 907 908 return frames 909 910 def locally_pushed(self): 911 """ 912 Mark this stream as one that was pushed by this peer. Must be called 913 immediately after initialization. Sends no frames, simply updates the 914 state machine. 
915 """ 916 # This does not trigger any events. 917 events = self.state_machine.process_input( 918 StreamInputs.SEND_PUSH_PROMISE 919 ) 920 assert not events 921 return [] 922 923 def send_data(self, data, end_stream=False, pad_length=None): 924 """ 925 Prepare some data frames. Optionally end the stream. 926 927 .. warning:: Does not perform flow control checks. 928 """ 929 self.config.logger.debug( 930 "Send data on %r with end stream set to %s", self, end_stream 931 ) 932 933 self.state_machine.process_input(StreamInputs.SEND_DATA) 934 935 df = DataFrame(self.stream_id) 936 df.data = data 937 if end_stream: 938 self.state_machine.process_input(StreamInputs.SEND_END_STREAM) 939 df.flags.add('END_STREAM') 940 if pad_length is not None: 941 df.flags.add('PADDED') 942 df.pad_length = pad_length 943 944 # Subtract flow_controlled_length to account for possible padding 945 self.outbound_flow_control_window -= df.flow_controlled_length 946 assert self.outbound_flow_control_window >= 0 947 948 return [df] 949 950 def end_stream(self): 951 """ 952 End a stream without sending data. 953 """ 954 self.config.logger.debug("End stream %r", self) 955 956 self.state_machine.process_input(StreamInputs.SEND_END_STREAM) 957 df = DataFrame(self.stream_id) 958 df.flags.add('END_STREAM') 959 return [df] 960 961 def advertise_alternative_service(self, field_value): 962 """ 963 Advertise an RFC 7838 alternative service. The semantics of this are 964 better documented in the ``H2Connection`` class. 965 """ 966 self.config.logger.debug( 967 "Advertise alternative service of %r for %r", field_value, self 968 ) 969 self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE) 970 asf = AltSvcFrame(self.stream_id) 971 asf.field = field_value 972 return [asf] 973 974 def increase_flow_control_window(self, increment): 975 """ 976 Increase the size of the flow control window for the remote side. 
977 """ 978 self.config.logger.debug( 979 "Increase flow control window for %r by %d", 980 self, increment 981 ) 982 self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE) 983 self._inbound_window_manager.window_opened(increment) 984 985 wuf = WindowUpdateFrame(self.stream_id) 986 wuf.window_increment = increment 987 return [wuf] 988 989 def receive_push_promise_in_band(self, 990 promised_stream_id, 991 headers, 992 header_encoding): 993 """ 994 Receives a push promise frame sent on this stream, pushing a remote 995 stream. This is called on the stream that has the PUSH_PROMISE sent 996 on it. 997 """ 998 self.config.logger.debug( 999 "Receive Push Promise on %r for remote stream %d", 1000 self, promised_stream_id 1001 ) 1002 events = self.state_machine.process_input( 1003 StreamInputs.RECV_PUSH_PROMISE 1004 ) 1005 events[0].pushed_stream_id = promised_stream_id 1006 1007 hdr_validation_flags = self._build_hdr_validation_flags(events) 1008 events[0].headers = self._process_received_headers( 1009 headers, hdr_validation_flags, header_encoding 1010 ) 1011 return [], events 1012 1013 def remotely_pushed(self, pushed_headers): 1014 """ 1015 Mark this stream as one that was pushed by the remote peer. Must be 1016 called immediately after initialization. Sends no frames, simply 1017 updates the state machine. 1018 """ 1019 self.config.logger.debug("%r pushed by remote peer", self) 1020 events = self.state_machine.process_input( 1021 StreamInputs.RECV_PUSH_PROMISE 1022 ) 1023 self._authority = authority_from_headers(pushed_headers) 1024 return [], events 1025 1026 def receive_headers(self, headers, end_stream, header_encoding): 1027 """ 1028 Receive a set of headers (or trailers). 
1029 """ 1030 if is_informational_response(headers): 1031 if end_stream: 1032 raise ProtocolError( 1033 "Cannot set END_STREAM on informational responses" 1034 ) 1035 input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS 1036 else: 1037 input_ = StreamInputs.RECV_HEADERS 1038 1039 events = self.state_machine.process_input(input_) 1040 1041 if end_stream: 1042 es_events = self.state_machine.process_input( 1043 StreamInputs.RECV_END_STREAM 1044 ) 1045 events[0].stream_ended = es_events[0] 1046 events += es_events 1047 1048 self._initialize_content_length(headers) 1049 1050 if isinstance(events[0], TrailersReceived): 1051 if not end_stream: 1052 raise ProtocolError("Trailers must have END_STREAM set") 1053 1054 hdr_validation_flags = self._build_hdr_validation_flags(events) 1055 events[0].headers = self._process_received_headers( 1056 headers, hdr_validation_flags, header_encoding 1057 ) 1058 return [], events 1059 1060 def receive_data(self, data, end_stream, flow_control_len): 1061 """ 1062 Receive some data. 1063 """ 1064 self.config.logger.debug( 1065 "Receive data on %r with end stream %s and flow control length " 1066 "set to %d", self, end_stream, flow_control_len 1067 ) 1068 events = self.state_machine.process_input(StreamInputs.RECV_DATA) 1069 self._inbound_window_manager.window_consumed(flow_control_len) 1070 self._track_content_length(len(data), end_stream) 1071 1072 if end_stream: 1073 es_events = self.state_machine.process_input( 1074 StreamInputs.RECV_END_STREAM 1075 ) 1076 events[0].stream_ended = es_events[0] 1077 events.extend(es_events) 1078 1079 events[0].data = data 1080 events[0].flow_controlled_length = flow_control_len 1081 return [], events 1082 1083 def receive_window_update(self, increment): 1084 """ 1085 Handle a WINDOW_UPDATE increment. 
1086 """ 1087 self.config.logger.debug( 1088 "Receive Window Update on %r for increment of %d", 1089 self, increment 1090 ) 1091 events = self.state_machine.process_input( 1092 StreamInputs.RECV_WINDOW_UPDATE 1093 ) 1094 frames = [] 1095 1096 # If we encounter a problem with incrementing the flow control window, 1097 # this should be treated as a *stream* error, not a *connection* error. 1098 # That means we need to catch the error and forcibly close the stream. 1099 if events: 1100 events[0].delta = increment 1101 try: 1102 self.outbound_flow_control_window = guard_increment_window( 1103 self.outbound_flow_control_window, 1104 increment 1105 ) 1106 except FlowControlError: 1107 # Ok, this is bad. We're going to need to perform a local 1108 # reset. 1109 event = StreamReset() 1110 event.stream_id = self.stream_id 1111 event.error_code = ErrorCodes.FLOW_CONTROL_ERROR 1112 event.remote_reset = False 1113 1114 events = [event] 1115 frames = self.reset_stream(event.error_code) 1116 1117 return frames, events 1118 1119 def receive_continuation(self): 1120 """ 1121 A naked CONTINUATION frame has been received. This is always an error, 1122 but the type of error it is depends on the state of the stream and must 1123 transition the state of the stream, so we need to handle it. 1124 """ 1125 self.config.logger.debug("Receive Continuation frame on %r", self) 1126 self.state_machine.process_input( 1127 StreamInputs.RECV_CONTINUATION 1128 ) 1129 assert False, "Should not be reachable" 1130 1131 def receive_alt_svc(self, frame): 1132 """ 1133 An Alternative Service frame was received on the stream. This frame 1134 inherits the origin associated with this stream. 1135 """ 1136 self.config.logger.debug( 1137 "Receive Alternative Service frame on stream %r", self 1138 ) 1139 1140 # If the origin is present, RFC 7838 says we have to ignore it. 
1141 if frame.origin: 1142 return [], [] 1143 1144 events = self.state_machine.process_input( 1145 StreamInputs.RECV_ALTERNATIVE_SERVICE 1146 ) 1147 1148 # There are lots of situations where we want to ignore the ALTSVC 1149 # frame. If we need to pay attention, we'll have an event and should 1150 # fill it out. 1151 if events: 1152 assert isinstance(events[0], AlternativeServiceAvailable) 1153 events[0].origin = self._authority 1154 events[0].field_value = frame.field 1155 1156 return [], events 1157 1158 def reset_stream(self, error_code=0): 1159 """ 1160 Close the stream locally. Reset the stream with an error code. 1161 """ 1162 self.config.logger.debug( 1163 "Local reset %r with error code: %d", self, error_code 1164 ) 1165 self.state_machine.process_input(StreamInputs.SEND_RST_STREAM) 1166 1167 rsf = RstStreamFrame(self.stream_id) 1168 rsf.error_code = error_code 1169 return [rsf] 1170 1171 def stream_reset(self, frame): 1172 """ 1173 Handle a stream being reset remotely. 1174 """ 1175 self.config.logger.debug( 1176 "Remote reset %r with error code: %d", self, frame.error_code 1177 ) 1178 events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM) 1179 1180 if events: 1181 # We don't fire an event if this stream is already closed. 1182 events[0].error_code = _error_code_from_int(frame.error_code) 1183 1184 return [], events 1185 1186 def acknowledge_received_data(self, acknowledged_size): 1187 """ 1188 The user has informed us that they've processed some amount of data 1189 that was received on this stream. Pass that to the window manager and 1190 potentially return some WindowUpdate frames. 
1191 """ 1192 self.config.logger.debug( 1193 "Acknowledge received data with size %d on %r", 1194 acknowledged_size, self 1195 ) 1196 increment = self._inbound_window_manager.process_bytes( 1197 acknowledged_size 1198 ) 1199 if increment: 1200 f = WindowUpdateFrame(self.stream_id) 1201 f.window_increment = increment 1202 return [f] 1203 1204 return [] 1205 1206 def _build_hdr_validation_flags(self, events): 1207 """ 1208 Constructs a set of header validation flags for use when normalizing 1209 and validating header blocks. 1210 """ 1211 is_trailer = isinstance( 1212 events[0], (_TrailersSent, TrailersReceived) 1213 ) 1214 is_response_header = isinstance( 1215 events[0], 1216 ( 1217 _ResponseSent, 1218 ResponseReceived, 1219 InformationalResponseReceived 1220 ) 1221 ) 1222 is_push_promise = isinstance( 1223 events[0], (PushedStreamReceived, _PushedRequestSent) 1224 ) 1225 1226 return HeaderValidationFlags( 1227 is_client=self.state_machine.client, 1228 is_trailer=is_trailer, 1229 is_response_header=is_response_header, 1230 is_push_promise=is_push_promise, 1231 ) 1232 1233 def _build_headers_frames(self, 1234 headers, 1235 encoder, 1236 first_frame, 1237 hdr_validation_flags): 1238 """ 1239 Helper method to build headers or push promise frames. 1240 """ 1241 # We need to lowercase the header names, and to ensure that secure 1242 # header fields are kept out of compression contexts. 1243 if self.config.normalize_outbound_headers: 1244 headers = normalize_outbound_headers( 1245 headers, hdr_validation_flags 1246 ) 1247 if self.config.validate_outbound_headers: 1248 headers = validate_outbound_headers( 1249 headers, hdr_validation_flags 1250 ) 1251 1252 encoded_headers = encoder.encode(headers) 1253 1254 # Slice into blocks of max_outbound_frame_size. Be careful with this: 1255 # it only works right because we never send padded frames or priority 1256 # information on the frames. Revisit this if we do. 
1257 header_blocks = [ 1258 encoded_headers[i:i+self.max_outbound_frame_size] 1259 for i in range( 1260 0, len(encoded_headers), self.max_outbound_frame_size 1261 ) 1262 ] 1263 1264 frames = [] 1265 first_frame.data = header_blocks[0] 1266 frames.append(first_frame) 1267 1268 for block in header_blocks[1:]: 1269 cf = ContinuationFrame(self.stream_id) 1270 cf.data = block 1271 frames.append(cf) 1272 1273 frames[-1].flags.add('END_HEADERS') 1274 return frames 1275 1276 def _process_received_headers(self, 1277 headers, 1278 header_validation_flags, 1279 header_encoding): 1280 """ 1281 When headers have been received from the remote peer, run a processing 1282 pipeline on them to transform them into the appropriate form for 1283 attaching to an event. 1284 """ 1285 if self.config.normalize_inbound_headers: 1286 headers = normalize_inbound_headers( 1287 headers, header_validation_flags 1288 ) 1289 1290 if self.config.validate_inbound_headers: 1291 headers = validate_headers(headers, header_validation_flags) 1292 1293 if header_encoding: 1294 headers = _decode_headers(headers, header_encoding) 1295 1296 # The above steps are all generators, so we need to concretize the 1297 # headers now. 1298 return list(headers) 1299 1300 def _initialize_content_length(self, headers): 1301 """ 1302 Checks the headers for a content-length header and initializes the 1303 _expected_content_length field from it. It's not an error for no 1304 Content-Length header to be present. 1305 """ 1306 if self.request_method == b'HEAD': 1307 self._expected_content_length = 0 1308 return 1309 1310 for n, v in headers: 1311 if n == b'content-length': 1312 try: 1313 self._expected_content_length = int(v, 10) 1314 except ValueError: 1315 raise ProtocolError( 1316 "Invalid content-length header: %s" % v 1317 ) 1318 1319 return 1320 1321 def _track_content_length(self, length, end_stream): 1322 """ 1323 Update the expected content length in response to data being received. 
1324 Validates that the appropriate amount of data is sent. Always updates 1325 the received data, but only validates the length against the 1326 content-length header if one was sent. 1327 1328 :param length: The length of the body chunk received. 1329 :param end_stream: If this is the last body chunk received. 1330 """ 1331 self._actual_content_length += length 1332 actual = self._actual_content_length 1333 expected = self._expected_content_length 1334 1335 if expected is not None: 1336 if expected < actual: 1337 raise InvalidBodyLengthError(expected, actual) 1338 1339 if end_stream and expected != actual: 1340 raise InvalidBodyLengthError(expected, actual) 1341 1342 def _inbound_flow_control_change_from_settings(self, delta): 1343 """ 1344 We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to 1345 update the target window size for flow control. For our flow control 1346 strategy, this means we need to do two things: we need to adjust the 1347 current window size, but we also need to set the target maximum window 1348 size to the new value. 1349 """ 1350 new_max_size = self._inbound_window_manager.max_window_size + delta 1351 self._inbound_window_manager.window_opened(delta) 1352 self._inbound_window_manager.max_window_size = new_max_size 1353 1354 1355def _decode_headers(headers, encoding): 1356 """ 1357 Given an iterable of header two-tuples and an encoding, decodes those 1358 headers using that encoding while preserving the type of the header tuple. 1359 This ensures that the use of ``HeaderTuple`` is preserved. 1360 """ 1361 for header in headers: 1362 # This function expects to work on decoded headers, which are always 1363 # HeaderTuple objects. 1364 assert isinstance(header, HeaderTuple) 1365 1366 name, value = header 1367 name = name.decode(encoding) 1368 value = value.decode(encoding) 1369 yield header.__class__(name, value) 1370