1# -*- coding: utf-8 -*- 2""" 3h2/stream 4~~~~~~~~~ 5 6An implementation of a HTTP/2 stream. 7""" 8from enum import Enum, IntEnum 9from hpack import HeaderTuple 10from hyperframe.frame import ( 11 HeadersFrame, ContinuationFrame, DataFrame, WindowUpdateFrame, 12 RstStreamFrame, PushPromiseFrame, AltSvcFrame 13) 14 15from .errors import ErrorCodes, _error_code_from_int 16from .events import ( 17 RequestReceived, ResponseReceived, DataReceived, WindowUpdated, 18 StreamEnded, PushedStreamReceived, StreamReset, TrailersReceived, 19 InformationalResponseReceived, AlternativeServiceAvailable, 20 _ResponseSent, _RequestSent, _TrailersSent, _PushedRequestSent 21) 22from .exceptions import ( 23 ProtocolError, StreamClosedError, InvalidBodyLengthError, FlowControlError 24) 25from .utilities import ( 26 guard_increment_window, is_informational_response, authority_from_headers, 27 validate_headers, validate_outbound_headers, normalize_outbound_headers, 28 HeaderValidationFlags, extract_method_header, normalize_inbound_headers 29) 30from .windows import WindowManager 31 32 33class StreamState(IntEnum): 34 IDLE = 0 35 RESERVED_REMOTE = 1 36 RESERVED_LOCAL = 2 37 OPEN = 3 38 HALF_CLOSED_REMOTE = 4 39 HALF_CLOSED_LOCAL = 5 40 CLOSED = 6 41 42 43class StreamInputs(Enum): 44 SEND_HEADERS = 0 45 SEND_PUSH_PROMISE = 1 46 SEND_RST_STREAM = 2 47 SEND_DATA = 3 48 SEND_WINDOW_UPDATE = 4 49 SEND_END_STREAM = 5 50 RECV_HEADERS = 6 51 RECV_PUSH_PROMISE = 7 52 RECV_RST_STREAM = 8 53 RECV_DATA = 9 54 RECV_WINDOW_UPDATE = 10 55 RECV_END_STREAM = 11 56 RECV_CONTINUATION = 12 # Added in 2.0.0 57 SEND_INFORMATIONAL_HEADERS = 13 # Added in 2.2.0 58 RECV_INFORMATIONAL_HEADERS = 14 # Added in 2.2.0 59 SEND_ALTERNATIVE_SERVICE = 15 # Added in 2.3.0 60 RECV_ALTERNATIVE_SERVICE = 16 # Added in 2.3.0 61 UPGRADE_CLIENT = 17 # Added 2.3.0 62 UPGRADE_SERVER = 18 # Added 2.3.0 63 64 65class StreamClosedBy(Enum): 66 SEND_END_STREAM = 0 67 RECV_END_STREAM = 1 68 SEND_RST_STREAM = 2 69 RECV_RST_STREAM = 3 70 71 
# This array is initialized once, and is indexed by the stream states above.
# It indicates whether a stream in the given state is open. The reason we do
# this is that we potentially check whether a stream in a given state is open
# quite frequently: given that we check so often, we should do so in the
# fastest and most performant way possible.
STREAM_OPEN = [False] * len(StreamState)
STREAM_OPEN[StreamState.OPEN] = True
STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True
STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True


class H2StreamStateMachine(object):
    """
    A single HTTP/2 stream state machine.

    This stream object implements basically the state machine described in
    RFC 7540 section 5.1.

    Every transition callback on this class takes the state the machine was
    in *before* the transition (``previous_state``) and either returns a list
    of events (possibly empty) or raises a ``ProtocolError``.

    :param stream_id: The stream ID of this stream. This is stored primarily
        for logging purposes.
    """
    def __init__(self, stream_id):
        self.state = StreamState.IDLE
        self.stream_id = stream_id

        #: Whether this peer is the client side of this stream. ``None``
        #: until the first transition that determines the role.
        self.client = None

        # Whether headers and trailers have been sent/received on this stream
        # or not. These stay ``None`` until the matching block is seen.
        self.headers_sent = None
        self.trailers_sent = None
        self.headers_received = None
        self.trailers_received = None

        # How the stream was closed. One of StreamClosedBy.
        self.stream_closed_by = None

    def process_input(self, input_):
        """
        Process a specific input in the state machine.

        The transition is looked up in the module-level ``_transitions``
        table using the current state and ``input_``. The new state is stored
        *before* the transition's callback runs; the callback receives the
        state that was current before the transition.

        :param input_: The ``StreamInputs`` member to process.
        :returns: The list of events produced by the transition's callback,
            or an empty list if the transition has no callback.
        :raises ValueError: If ``input_`` is not a ``StreamInputs`` member.
        :raises ProtocolError: If there is no transition for this state and
            input, or if the callback rejects it. In both cases the stream
            is moved to the CLOSED state first.
        """
        if not isinstance(input_, StreamInputs):
            raise ValueError("Input must be an instance of StreamInputs")

        try:
            func, target_state = _transitions[(self.state, input_)]
        except KeyError:
            old_state = self.state
            self.state = StreamState.CLOSED
            raise ProtocolError(
                "Invalid input %s in state %s" % (input_, old_state)
            )
        else:
            previous_state = self.state
            self.state = target_state
            if func is not None:
                try:
                    return func(self, previous_state)
                except ProtocolError:
                    self.state = StreamState.CLOSED
                    raise
                except AssertionError as e:  # pragma: no cover
                    # Callbacks use ``assert`` for internal invariants; a
                    # failed invariant is surfaced as a protocol error.
                    self.state = StreamState.CLOSED
                    raise ProtocolError(e)

            return []

    def request_sent(self, previous_state):
        """
        Fires when a request is sent.

        Marks this peer as the client and records that headers were sent.
        """
        self.client = True
        self.headers_sent = True
        event = _RequestSent()

        return [event]

    def response_sent(self, previous_state):
        """
        Fires when something that should be a response is sent. This 'response'
        may actually be trailers.

        :raises ProtocolError: If this peer is (or may be) a client, or if
            trailers have already been sent on this stream.
        """
        if not self.headers_sent:
            if self.client is True or self.client is None:
                raise ProtocolError("Client cannot send responses.")
            self.headers_sent = True
            event = _ResponseSent()
        else:
            # This is reachable by user action (sending a third header
            # block), so it must be a real check rather than an ``assert``,
            # which would be stripped when running with ``python -O``.
            if self.trailers_sent:
                raise ProtocolError("Trailers have already been sent.")
            self.trailers_sent = True
            event = _TrailersSent()

        return [event]

    def request_received(self, previous_state):
        """
        Fires when a request is received.

        Marks this peer as the server and records that headers were received.
        """
        assert not self.headers_received
        assert not self.trailers_received

        self.client = False
        self.headers_received = True
        event = RequestReceived()

        event.stream_id = self.stream_id
        return [event]

    def response_received(self, previous_state):
        """
        Fires when a response is received. Also disambiguates between responses
        and trailers.

        :raises ProtocolError: If trailers have already been received on this
            stream.
        """
        if not self.headers_received:
            assert self.client is True
            self.headers_received = True
            event = ResponseReceived()
        else:
            # This is reachable from remote input (a peer sending a third
            # header block), so it must not depend on ``assert``, which is
            # stripped when running with ``python -O``.
            if self.trailers_received:
                raise ProtocolError("Trailers have already been received.")
            self.trailers_received = True
            event = TrailersReceived()

        event.stream_id = self.stream_id
        return [event]

    def data_received(self, previous_state):
        """
        Fires when data is received.
        """
        event = DataReceived()
        event.stream_id = self.stream_id
        return [event]

    def window_updated(self, previous_state):
        """
        Fires when a window update frame is received.
        """
        event = WindowUpdated()
        event.stream_id = self.stream_id
        return [event]

    def stream_half_closed(self, previous_state):
        """
        Fires when an END_STREAM flag is received in the OPEN state,
        transitioning this stream to a HALF_CLOSED_REMOTE state.
        """
        event = StreamEnded()
        event.stream_id = self.stream_id
        return [event]

    def stream_ended(self, previous_state):
        """
        Fires when a stream is cleanly ended.

        Records that the stream was closed by a received END_STREAM.
        """
        self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
        event = StreamEnded()
        event.stream_id = self.stream_id
        return [event]

    def stream_reset(self, previous_state):
        """
        Fired when a stream is forcefully reset.

        Records that the stream was closed by a received RST_STREAM.
        """
        self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
        event = StreamReset()
        event.stream_id = self.stream_id
        return [event]

    def send_new_pushed_stream(self, previous_state):
        """
        Fires on the newly pushed stream, when pushed by the local peer.

        No event here, but definitionally this peer must be a server.
        """
        assert self.client is None
        self.client = False
        self.headers_received = True
        return []

    def recv_new_pushed_stream(self, previous_state):
        """
        Fires on the newly pushed stream, when pushed by the remote peer.

        No event here, but definitionally this peer must be a client.
        """
        assert self.client is None
        self.client = True
        self.headers_sent = True
        return []

    def send_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
        We may only send PUSH_PROMISE frames if we're a server.

        :raises ProtocolError: If this peer is a client.
        """
        if self.client is True:
            raise ProtocolError("Cannot push streams from client peers.")

        event = _PushedRequestSent()
        return [event]

    def recv_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is
        received. We may only receive PUSH_PROMISE frames if we're a client.

        Fires a PushedStreamReceived event.

        :raises ProtocolError: If this peer is not known to be a client.
        """
        if not self.client:
            if self.client is None:  # pragma: no cover
                msg = "Idle streams cannot receive pushes"
            else:  # pragma: no cover
                msg = "Cannot receive pushed streams as a server"
            raise ProtocolError(msg)

        event = PushedStreamReceived()
        event.parent_stream_id = self.stream_id
        return [event]

    def send_end_stream(self, previous_state):
        """
        Called when an attempt is made to send END_STREAM in the
        HALF_CLOSED_REMOTE state.

        Records how the stream was closed. Produces no events.
        """
        self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
        # Return an empty event list, like every other callback, so that
        # ``process_input`` always hands back a list.
        return []

    def send_reset_stream(self, previous_state):
        """
        Called when an attempt is made to send RST_STREAM in a non-closed
        stream state.

        Records how the stream was closed. Produces no events.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
        # Return an empty event list, like every other callback, so that
        # ``process_input`` always hands back a list.
        return []

    def reset_stream_on_error(self, previous_state):
        """
        Called when we need to forcefully emit another RST_STREAM frame on
        behalf of the state machine.

        If this is the first time we've done this, we should also hang an event
        off the StreamClosedError so that the user can be informed. We know
        it's the first time we've done this if the stream is currently in a
        state other than CLOSED.

        :raises StreamClosedError: Always. The error carries a locally
            generated ``StreamReset`` event in its ``_events`` attribute.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM

        error = StreamClosedError(self.stream_id)

        event = StreamReset()
        event.stream_id = self.stream_id
        event.error_code = ErrorCodes.STREAM_CLOSED
        event.remote_reset = False
        error._events = [event]
        raise error

    def recv_on_closed_stream(self, previous_state):
        """
        Called when an unexpected frame is received on an already-closed
        stream.

        An endpoint that receives an unexpected frame should treat it as
        a stream error or connection error with type STREAM_CLOSED, depending
        on the specific frame. The error handling is done at a higher level:
        this just raises the appropriate error.

        :raises StreamClosedError: Always.
        """
        raise StreamClosedError(self.stream_id)

    def send_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to send data on an already-closed
        stream.

        This essentially overrides the standard logic by throwing a
        more-specific error: StreamClosedError. This is a ProtocolError, so it
        matches the standard API of the state machine, but provides more detail
        to the user.

        :raises StreamClosedError: Always.
        """
        raise StreamClosedError(self.stream_id)

    def recv_push_on_closed_stream(self, previous_state):
        """
        Called when a PUSH_PROMISE frame is received on an already-closed
        stream.

        If the stream was closed by us sending a RST_STREAM frame, then we
        presume that the PUSH_PROMISE was in flight when we reset the parent
        stream. Rather than accept the new stream, we just reset it.
        Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
        naturally closed stream is a real problem because it creates a brand
        new stream that the remote peer now believes exists.

        :raises StreamClosedError: If we closed the stream with RST_STREAM.
        :raises ProtocolError: In every other case.
        """
        assert self.stream_closed_by is not None

        if self.stream_closed_by is StreamClosedBy.SEND_RST_STREAM:
            raise StreamClosedError(self.stream_id)
        else:
            raise ProtocolError("Attempted to push on closed stream.")

    def send_push_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to push on an already-closed stream.

        This essentially overrides the standard logic by providing a more
        useful error message. It's necessary because simply indicating that the
        stream is closed is not enough: there is now a new stream that is not
        allowed to be there. The only recourse is to tear the whole connection
        down.

        :raises ProtocolError: Always.
        """
        raise ProtocolError("Attempted to push on closed stream.")

    def window_on_closed_stream(self, previous_state):
        """
        Called when a WINDOW_UPDATE frame is received on an already-closed
        stream.

        If we sent an END_STREAM frame, we just ignore the frame, as instructed
        in RFC 7540 Section 5.1. Technically we should eventually consider
        WINDOW_UPDATE in this state an error, but we don't have access to a
        clock so we just always allow it. If we closed the stream for any other
        reason, we behave as we do for receiving any other frame on a closed
        stream.
        """
        assert self.stream_closed_by is not None

        if self.stream_closed_by is StreamClosedBy.SEND_END_STREAM:
            return []
        return self.recv_on_closed_stream(previous_state)

    def reset_on_closed_stream(self, previous_state):
        """
        Called when a RST_STREAM frame is received on an already-closed stream.

        If we sent an END_STREAM frame, we just ignore the frame, as instructed
        in RFC 7540 Section 5.1. Technically we should eventually consider
        RST_STREAM in this state an error, but we don't have access to a clock
        so we just always allow it. If we closed the stream for any other
        reason, we behave as we do for receiving any other frame on a closed
        stream.
        """
        assert self.stream_closed_by is not None

        if self.stream_closed_by is StreamClosedBy.SEND_END_STREAM:
            return []
        return self.recv_on_closed_stream(previous_state)

    def send_informational_response(self, previous_state):
        """
        Called when an informational header block is sent (that is, a block
        where the :status header has a 1XX value).

        Only enforces that these are sent *before* final headers are sent.

        :raises ProtocolError: If final headers have already been sent.
        """
        if self.headers_sent:
            raise ProtocolError("Informational response after final response")

        event = _ResponseSent()
        return [event]

    def recv_informational_response(self, previous_state):
        """
        Called when an informational header block is received (that is, a block
        where the :status header has a 1XX value).

        :raises ProtocolError: If final headers have already been received.
        """
        if self.headers_received:
            raise ProtocolError("Informational response after final response")

        event = InformationalResponseReceived()
        event.stream_id = self.stream_id
        return [event]

    def recv_alt_svc(self, previous_state):
        """
        Called when receiving an ALTSVC frame.

        RFC 7838 allows us to receive ALTSVC frames at any stream state, which
        is really absurdly overzealous. For that reason, we want to limit the
        states in which we can actually receive it. It's really only sensible
        to receive it after we've sent our own headers and before the server
        has sent its header block: the server can't guarantee that we have any
        state around after it completes its header block, and the server
        doesn't know what origin we're talking about before we've sent ours.

        For that reason, this function applies a few extra checks on both state
        and some of the little state variables we keep around. If those suggest
        an unreasonable situation for the ALTSVC frame to have been sent in,
        we quietly ignore it (as RFC 7838 suggests).

        This function is also *not* always called by the state machine. In some
        states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
        because we know the frame cannot be valid in that state (IDLE because
        the server cannot know what origin the stream applies to, CLOSED
        because the server cannot assume we still have state around,
        RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
        state then *we* are the server).
        """
        # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
        # them.
        if self.client is False:
            return []

        # If we've received the response headers from the server they can't
        # guarantee we still have any state around. Other implementations
        # (like nghttp2) ignore ALTSVC in this state, so we will too.
        if self.headers_received:
            return []

        # Otherwise, this is a sensible enough frame to have received. Return
        # the event and let it get populated.
        return [AlternativeServiceAvailable()]

    def send_alt_svc(self, previous_state):
        """
        Called when sending an ALTSVC frame on this stream.

        For consistency with the restrictions we apply on receiving ALTSVC
        frames in ``recv_alt_svc``, we want to restrict when users can send
        ALTSVC frames to the situations when we ourselves would accept them.

        That means: when we are a server, when we have received the request
        headers, and when we have not yet sent our own response headers.

        :raises ProtocolError: If response headers have already been sent.
        """
        # We should not send ALTSVC after we've sent response headers, as the
        # client may have disposed of its state.
        if self.headers_sent:
            raise ProtocolError(
                "Cannot send ALTSVC after sending response headers."
            )

        # Return an empty event list, like every other callback, so that
        # ``process_input`` always hands back a list.
        return []


# STATE MACHINE
#
# The stream state machine is defined here to avoid the need to allocate it
# repeatedly for each stream. It cannot be defined in the stream class because
# it needs to be able to reference the callbacks defined on the class, but
# because Python's scoping rules are weird the class object is not actually in
# scope during the body of the class object.
#
# For the sake of clarity, we reproduce the RFC 7540 state machine here:
#
#                          +--------+
#                  send PP |        | recv PP
#                 ,--------|  idle  |--------.
#                /         |        |         \
#               v          +--------+          v
#        +----------+          |           +----------+
#        |          |          | send H /  |          |
# ,------| reserved |          | recv H    | reserved |------.
# |      | (local)  |          |           | (remote) |      |
# |      +----------+          v           +----------+      |
# |          |             +--------+             |          |
# |          |     recv ES |        | send ES     |          |
# |   send H |     ,-------|  open  |-------.     | recv H   |
# |          |    /        |        |        \    |          |
# |          v   v         +--------+         v   v          |
# |      +----------+          |           +----------+      |
# |      |   half   |          |           |   half   |      |
# |      |  closed  |          | send R /  |  closed  |      |
# |      | (remote) |          | recv R    | (local)  |      |
# |      +----------+          |           +----------+      |
# |           |                |                 |           |
# |           | send ES /      |       recv ES / |           |
# |           | send R /       v        send R / |           |
# |           | recv R     +--------+   recv R   |           |
# | send R /  `----------->|        |<-----------'  send R / |
# | recv R                 | closed |               recv R   |
# `----------------------->|        |<----------------------'
#                          +--------+
#
#    send:   endpoint sends this frame
#    recv:   endpoint receives this frame
#
#    H:  HEADERS frame (with implied CONTINUATIONs)
#    PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
#    ES: END_STREAM flag
#    R:  RST_STREAM frame
#
# For the purposes of this state machine we treat HEADERS and their
# associated CONTINUATION frames as a single jumbo frame. The protocol
# allows/requires this by preventing other frames from being interleaved in
# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
# received without a prior HEADERS frame, it *will* be passed to this state
# machine. The state machine should always reject that frame, either as an
# invalid transition or because the stream is closed.
#
# There is a confusing relationship around PUSH_PROMISE frames. The state
# machine above considers them to be frames belonging to the new stream,
# which is *somewhat* true. However, they are sent with the stream ID of
# their related stream, and are only sendable in some cases.
# For this reason, our state machine implementation below allows for
# PUSH_PROMISE frames both in the IDLE state (as in the diagram), but also
# in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
# Essentially, for hyper-h2, PUSH_PROMISE frames are effectively sent on
# two streams.
#
# The _transitions dictionary contains a mapping of tuples of
# (state, input) to tuples of (side_effect_function, end_state). This
# map contains all allowed transitions: anything not in this map is
# invalid and immediately causes a transition to ``closed``.
571_transitions = { 572 # State: idle 573 (StreamState.IDLE, StreamInputs.SEND_HEADERS): 574 (H2StreamStateMachine.request_sent, StreamState.OPEN), 575 (StreamState.IDLE, StreamInputs.RECV_HEADERS): 576 (H2StreamStateMachine.request_received, StreamState.OPEN), 577 (StreamState.IDLE, StreamInputs.RECV_DATA): 578 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 579 (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE): 580 (H2StreamStateMachine.send_new_pushed_stream, 581 StreamState.RESERVED_LOCAL), 582 (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE): 583 (H2StreamStateMachine.recv_new_pushed_stream, 584 StreamState.RESERVED_REMOTE), 585 (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE): 586 (None, StreamState.IDLE), 587 (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT): 588 (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL), 589 (StreamState.IDLE, StreamInputs.UPGRADE_SERVER): 590 (H2StreamStateMachine.request_received, 591 StreamState.HALF_CLOSED_REMOTE), 592 593 # State: reserved local 594 (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS): 595 (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE), 596 (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA): 597 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 598 (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): 599 (None, StreamState.RESERVED_LOCAL), 600 (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): 601 (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL), 602 (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM): 603 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 604 (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM): 605 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 606 (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE): 607 (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL), 608 (StreamState.RESERVED_LOCAL, 
StreamInputs.RECV_ALTERNATIVE_SERVICE): 609 (None, StreamState.RESERVED_LOCAL), 610 611 # State: reserved remote 612 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS): 613 (H2StreamStateMachine.response_received, 614 StreamState.HALF_CLOSED_LOCAL), 615 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA): 616 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 617 (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): 618 (None, StreamState.RESERVED_REMOTE), 619 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): 620 (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE), 621 (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM): 622 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 623 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM): 624 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 625 (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE): 626 (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE), 627 628 # State: open 629 (StreamState.OPEN, StreamInputs.SEND_HEADERS): 630 (H2StreamStateMachine.response_sent, StreamState.OPEN), 631 (StreamState.OPEN, StreamInputs.RECV_HEADERS): 632 (H2StreamStateMachine.response_received, StreamState.OPEN), 633 (StreamState.OPEN, StreamInputs.SEND_DATA): 634 (None, StreamState.OPEN), 635 (StreamState.OPEN, StreamInputs.RECV_DATA): 636 (H2StreamStateMachine.data_received, StreamState.OPEN), 637 (StreamState.OPEN, StreamInputs.SEND_END_STREAM): 638 (None, StreamState.HALF_CLOSED_LOCAL), 639 (StreamState.OPEN, StreamInputs.RECV_END_STREAM): 640 (H2StreamStateMachine.stream_half_closed, 641 StreamState.HALF_CLOSED_REMOTE), 642 (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE): 643 (None, StreamState.OPEN), 644 (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE): 645 (H2StreamStateMachine.window_updated, StreamState.OPEN), 646 (StreamState.OPEN, StreamInputs.SEND_RST_STREAM): 647 
(H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 648 (StreamState.OPEN, StreamInputs.RECV_RST_STREAM): 649 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 650 (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE): 651 (H2StreamStateMachine.send_push_promise, StreamState.OPEN), 652 (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE): 653 (H2StreamStateMachine.recv_push_promise, StreamState.OPEN), 654 (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS): 655 (H2StreamStateMachine.send_informational_response, StreamState.OPEN), 656 (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS): 657 (H2StreamStateMachine.recv_informational_response, StreamState.OPEN), 658 (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE): 659 (H2StreamStateMachine.send_alt_svc, StreamState.OPEN), 660 (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE): 661 (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN), 662 663 # State: half-closed remote 664 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS): 665 (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE), 666 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS): 667 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 668 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA): 669 (None, StreamState.HALF_CLOSED_REMOTE), 670 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA): 671 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 672 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM): 673 (H2StreamStateMachine.send_end_stream, StreamState.CLOSED), 674 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): 675 (None, StreamState.HALF_CLOSED_REMOTE), 676 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): 677 (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE), 678 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM): 679 (H2StreamStateMachine.send_reset_stream, 
StreamState.CLOSED), 680 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM): 681 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 682 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE): 683 (H2StreamStateMachine.send_push_promise, 684 StreamState.HALF_CLOSED_REMOTE), 685 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE): 686 (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), 687 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS): 688 (H2StreamStateMachine.send_informational_response, 689 StreamState.HALF_CLOSED_REMOTE), 690 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE): 691 (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE), 692 (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE): 693 (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE), 694 695 # State: half-closed local 696 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS): 697 (H2StreamStateMachine.response_received, 698 StreamState.HALF_CLOSED_LOCAL), 699 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA): 700 (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL), 701 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM): 702 (H2StreamStateMachine.stream_ended, StreamState.CLOSED), 703 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): 704 (None, StreamState.HALF_CLOSED_LOCAL), 705 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): 706 (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL), 707 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM): 708 (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), 709 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM): 710 (H2StreamStateMachine.stream_reset, StreamState.CLOSED), 711 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE): 712 (H2StreamStateMachine.recv_push_promise, 713 
StreamState.HALF_CLOSED_LOCAL), 714 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS): 715 (H2StreamStateMachine.recv_informational_response, 716 StreamState.HALF_CLOSED_LOCAL), 717 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE): 718 (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL), 719 (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE): 720 (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL), 721 722 # State: closed 723 (StreamState.CLOSED, StreamInputs.RECV_END_STREAM): 724 (None, StreamState.CLOSED), 725 (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE): 726 (None, StreamState.CLOSED), 727 728 # RFC 7540 Section 5.1 defines how the end point should react when 729 # receiving a frame on a closed stream with the following statements: 730 # 731 # > An endpoint that receives any frame other than PRIORITY after receiving 732 # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED. 733 # > An endpoint that receives any frames after receiving a frame with the 734 # > END_STREAM flag set MUST treat that as a connection error of type 735 # > STREAM_CLOSED. 736 (StreamState.CLOSED, StreamInputs.RECV_HEADERS): 737 (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED), 738 (StreamState.CLOSED, StreamInputs.RECV_DATA): 739 (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED), 740 741 # > WINDOW_UPDATE or RST_STREAM frames can be received in this state 742 # > for a short period after a DATA or HEADERS frame containing a 743 # > END_STREAM flag is sent. 
744 (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE): 745 (H2StreamStateMachine.window_on_closed_stream, StreamState.CLOSED), 746 (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM): 747 (H2StreamStateMachine.reset_on_closed_stream, StreamState.CLOSED), 748 749 # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is 750 # > neither "open" nor "half-closed (local)" as a connection error of type 751 # > PROTOCOL_ERROR. 752 (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE): 753 (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED), 754 755 # Also, users should be forbidden from sending on closed streams. 756 (StreamState.CLOSED, StreamInputs.SEND_HEADERS): 757 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 758 (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE): 759 (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED), 760 (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM): 761 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 762 (StreamState.CLOSED, StreamInputs.SEND_DATA): 763 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 764 (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE): 765 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 766 (StreamState.CLOSED, StreamInputs.SEND_END_STREAM): 767 (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), 768} 769 770 771class H2Stream(object): 772 """ 773 A low-level HTTP/2 stream object. This handles building and receiving 774 frames and maintains per-stream state. 775 776 This wraps a HTTP/2 Stream state machine implementation, ensuring that 777 frames can only be sent/received when the stream is in a valid state. 778 Attempts to create frames that cannot be sent will raise a 779 ``ProtocolError``. 
780 """ 781 def __init__(self, 782 stream_id, 783 config, 784 inbound_window_size, 785 outbound_window_size): 786 self.state_machine = H2StreamStateMachine(stream_id) 787 self.stream_id = stream_id 788 self.max_outbound_frame_size = None 789 self.request_method = None 790 791 # The curent value of the outbound stream flow control window 792 self.outbound_flow_control_window = outbound_window_size 793 794 # The flow control manager. 795 self._inbound_window_manager = WindowManager(inbound_window_size) 796 797 # The expected content length, if any. 798 self._expected_content_length = None 799 800 # The actual received content length. Always tracked. 801 self._actual_content_length = 0 802 803 # The authority we believe this stream belongs to. 804 self._authority = None 805 806 # The configuration for this stream. 807 self.config = config 808 809 def __repr__(self): 810 return "<%s id:%d state:%r>" % ( 811 type(self).__name__, 812 self.stream_id, 813 self.state_machine.state 814 ) 815 816 @property 817 def inbound_flow_control_window(self): 818 """ 819 The size of the inbound flow control window for the stream. This is 820 rarely publicly useful: instead, use :meth:`remote_flow_control_window 821 <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is 822 largely present to provide a shortcut to this data. 823 """ 824 return self._inbound_window_manager.current_window_size 825 826 @property 827 def open(self): 828 """ 829 Whether the stream is 'open' in any sense: that is, whether it counts 830 against the number of concurrent streams. 831 """ 832 # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either 833 # the OPEN state or either of the HALF_CLOSED states. Perplexingly, 834 # this excludes the reserved states. 835 # For more detail on why we're doing this in this slightly weird way, 836 # see the comment on ``STREAM_OPEN`` at the top of the file. 
837 return STREAM_OPEN[self.state_machine.state] 838 839 @property 840 def closed(self): 841 """ 842 Whether the stream is closed. 843 """ 844 return self.state_machine.state == StreamState.CLOSED 845 846 @property 847 def closed_by(self): 848 """ 849 Returns how the stream was closed, as one of StreamClosedBy. 850 """ 851 return self.state_machine.stream_closed_by 852 853 def upgrade(self, client_side): 854 """ 855 Called by the connection to indicate that this stream is the initial 856 request/response of an upgraded connection. Places the stream into an 857 appropriate state. 858 """ 859 self.config.logger.debug("Upgrading %r", self) 860 861 assert self.stream_id == 1 862 input_ = ( 863 StreamInputs.UPGRADE_CLIENT if client_side 864 else StreamInputs.UPGRADE_SERVER 865 ) 866 867 # This may return events, we deliberately don't want them. 868 self.state_machine.process_input(input_) 869 return 870 871 def send_headers(self, headers, encoder, end_stream=False): 872 """ 873 Returns a list of HEADERS/CONTINUATION frames to emit as either headers 874 or trailers. 875 """ 876 self.config.logger.debug("Send headers %s on %r", headers, self) 877 878 # Because encoding headers makes an irreversible change to the header 879 # compression context, we make the state transition before we encode 880 # them. 881 882 # First, check if we're a client. If we are, no problem: if we aren't, 883 # we need to scan the header block to see if this is an informational 884 # response. 885 input_ = StreamInputs.SEND_HEADERS 886 if ((not self.state_machine.client) and 887 is_informational_response(headers)): 888 if end_stream: 889 raise ProtocolError( 890 "Cannot set END_STREAM on informational responses." 
891 ) 892 893 input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS 894 895 events = self.state_machine.process_input(input_) 896 897 hf = HeadersFrame(self.stream_id) 898 hdr_validation_flags = self._build_hdr_validation_flags(events) 899 frames = self._build_headers_frames( 900 headers, encoder, hf, hdr_validation_flags 901 ) 902 903 if end_stream: 904 # Not a bug: the END_STREAM flag is valid on the initial HEADERS 905 # frame, not the CONTINUATION frames that follow. 906 self.state_machine.process_input(StreamInputs.SEND_END_STREAM) 907 frames[0].flags.add('END_STREAM') 908 909 if self.state_machine.trailers_sent and not end_stream: 910 raise ProtocolError("Trailers must have END_STREAM set.") 911 912 if self.state_machine.client and self._authority is None: 913 self._authority = authority_from_headers(headers) 914 915 # store request method for _initialize_content_length 916 self.request_method = extract_method_header(headers) 917 918 return frames 919 920 def push_stream_in_band(self, related_stream_id, headers, encoder): 921 """ 922 Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed 923 stream header. Called on the stream that has the PUSH_PROMISE frame 924 sent on it. 925 """ 926 self.config.logger.debug("Push stream %r", self) 927 928 # Because encoding headers makes an irreversible change to the header 929 # compression context, we make the state transition *first*. 930 931 events = self.state_machine.process_input( 932 StreamInputs.SEND_PUSH_PROMISE 933 ) 934 935 ppf = PushPromiseFrame(self.stream_id) 936 ppf.promised_stream_id = related_stream_id 937 hdr_validation_flags = self._build_hdr_validation_flags(events) 938 frames = self._build_headers_frames( 939 headers, encoder, ppf, hdr_validation_flags 940 ) 941 942 return frames 943 944 def locally_pushed(self): 945 """ 946 Mark this stream as one that was pushed by this peer. Must be called 947 immediately after initialization. Sends no frames, simply updates the 948 state machine. 
949 """ 950 # This does not trigger any events. 951 events = self.state_machine.process_input( 952 StreamInputs.SEND_PUSH_PROMISE 953 ) 954 assert not events 955 return [] 956 957 def send_data(self, data, end_stream=False, pad_length=None): 958 """ 959 Prepare some data frames. Optionally end the stream. 960 961 .. warning:: Does not perform flow control checks. 962 """ 963 self.config.logger.debug( 964 "Send data on %r with end stream set to %s", self, end_stream 965 ) 966 967 self.state_machine.process_input(StreamInputs.SEND_DATA) 968 969 df = DataFrame(self.stream_id) 970 df.data = data 971 if end_stream: 972 self.state_machine.process_input(StreamInputs.SEND_END_STREAM) 973 df.flags.add('END_STREAM') 974 if pad_length is not None: 975 df.flags.add('PADDED') 976 df.pad_length = pad_length 977 978 # Subtract flow_controlled_length to account for possible padding 979 self.outbound_flow_control_window -= df.flow_controlled_length 980 assert self.outbound_flow_control_window >= 0 981 982 return [df] 983 984 def end_stream(self): 985 """ 986 End a stream without sending data. 987 """ 988 self.config.logger.debug("End stream %r", self) 989 990 self.state_machine.process_input(StreamInputs.SEND_END_STREAM) 991 df = DataFrame(self.stream_id) 992 df.flags.add('END_STREAM') 993 return [df] 994 995 def advertise_alternative_service(self, field_value): 996 """ 997 Advertise an RFC 7838 alternative service. The semantics of this are 998 better documented in the ``H2Connection`` class. 999 """ 1000 self.config.logger.debug( 1001 "Advertise alternative service of %r for %r", field_value, self 1002 ) 1003 self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE) 1004 asf = AltSvcFrame(self.stream_id) 1005 asf.field = field_value 1006 return [asf] 1007 1008 def increase_flow_control_window(self, increment): 1009 """ 1010 Increase the size of the flow control window for the remote side. 
1011 """ 1012 self.config.logger.debug( 1013 "Increase flow control window for %r by %d", 1014 self, increment 1015 ) 1016 self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE) 1017 self._inbound_window_manager.window_opened(increment) 1018 1019 wuf = WindowUpdateFrame(self.stream_id) 1020 wuf.window_increment = increment 1021 return [wuf] 1022 1023 def receive_push_promise_in_band(self, 1024 promised_stream_id, 1025 headers, 1026 header_encoding): 1027 """ 1028 Receives a push promise frame sent on this stream, pushing a remote 1029 stream. This is called on the stream that has the PUSH_PROMISE sent 1030 on it. 1031 """ 1032 self.config.logger.debug( 1033 "Receive Push Promise on %r for remote stream %d", 1034 self, promised_stream_id 1035 ) 1036 events = self.state_machine.process_input( 1037 StreamInputs.RECV_PUSH_PROMISE 1038 ) 1039 events[0].pushed_stream_id = promised_stream_id 1040 1041 hdr_validation_flags = self._build_hdr_validation_flags(events) 1042 events[0].headers = self._process_received_headers( 1043 headers, hdr_validation_flags, header_encoding 1044 ) 1045 return [], events 1046 1047 def remotely_pushed(self, pushed_headers): 1048 """ 1049 Mark this stream as one that was pushed by the remote peer. Must be 1050 called immediately after initialization. Sends no frames, simply 1051 updates the state machine. 1052 """ 1053 self.config.logger.debug("%r pushed by remote peer", self) 1054 events = self.state_machine.process_input( 1055 StreamInputs.RECV_PUSH_PROMISE 1056 ) 1057 self._authority = authority_from_headers(pushed_headers) 1058 return [], events 1059 1060 def receive_headers(self, headers, end_stream, header_encoding): 1061 """ 1062 Receive a set of headers (or trailers). 
1063 """ 1064 if is_informational_response(headers): 1065 if end_stream: 1066 raise ProtocolError( 1067 "Cannot set END_STREAM on informational responses" 1068 ) 1069 input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS 1070 else: 1071 input_ = StreamInputs.RECV_HEADERS 1072 1073 events = self.state_machine.process_input(input_) 1074 1075 if end_stream: 1076 es_events = self.state_machine.process_input( 1077 StreamInputs.RECV_END_STREAM 1078 ) 1079 events[0].stream_ended = es_events[0] 1080 events += es_events 1081 1082 self._initialize_content_length(headers) 1083 1084 if isinstance(events[0], TrailersReceived): 1085 if not end_stream: 1086 raise ProtocolError("Trailers must have END_STREAM set") 1087 1088 hdr_validation_flags = self._build_hdr_validation_flags(events) 1089 events[0].headers = self._process_received_headers( 1090 headers, hdr_validation_flags, header_encoding 1091 ) 1092 return [], events 1093 1094 def receive_data(self, data, end_stream, flow_control_len): 1095 """ 1096 Receive some data. 1097 """ 1098 self.config.logger.debug( 1099 "Receive data on %r with end stream %s and flow control length " 1100 "set to %d", self, end_stream, flow_control_len 1101 ) 1102 events = self.state_machine.process_input(StreamInputs.RECV_DATA) 1103 self._inbound_window_manager.window_consumed(flow_control_len) 1104 self._track_content_length(len(data), end_stream) 1105 1106 if end_stream: 1107 es_events = self.state_machine.process_input( 1108 StreamInputs.RECV_END_STREAM 1109 ) 1110 events[0].stream_ended = es_events[0] 1111 events.extend(es_events) 1112 1113 events[0].data = data 1114 events[0].flow_controlled_length = flow_control_len 1115 return [], events 1116 1117 def receive_window_update(self, increment): 1118 """ 1119 Handle a WINDOW_UPDATE increment. 
1120 """ 1121 self.config.logger.debug( 1122 "Receive Window Update on %r for increment of %d", 1123 self, increment 1124 ) 1125 events = self.state_machine.process_input( 1126 StreamInputs.RECV_WINDOW_UPDATE 1127 ) 1128 frames = [] 1129 1130 # If we encounter a problem with incrementing the flow control window, 1131 # this should be treated as a *stream* error, not a *connection* error. 1132 # That means we need to catch the error and forcibly close the stream. 1133 if events: 1134 events[0].delta = increment 1135 try: 1136 self.outbound_flow_control_window = guard_increment_window( 1137 self.outbound_flow_control_window, 1138 increment 1139 ) 1140 except FlowControlError: 1141 # Ok, this is bad. We're going to need to perform a local 1142 # reset. 1143 event = StreamReset() 1144 event.stream_id = self.stream_id 1145 event.error_code = ErrorCodes.FLOW_CONTROL_ERROR 1146 event.remote_reset = False 1147 1148 events = [event] 1149 frames = self.reset_stream(event.error_code) 1150 1151 return frames, events 1152 1153 def receive_continuation(self): 1154 """ 1155 A naked CONTINUATION frame has been received. This is always an error, 1156 but the type of error it is depends on the state of the stream and must 1157 transition the state of the stream, so we need to handle it. 1158 """ 1159 self.config.logger.debug("Receive Continuation frame on %r", self) 1160 self.state_machine.process_input( 1161 StreamInputs.RECV_CONTINUATION 1162 ) 1163 assert False, "Should not be reachable" 1164 1165 def receive_alt_svc(self, frame): 1166 """ 1167 An Alternative Service frame was received on the stream. This frame 1168 inherits the origin associated with this stream. 1169 """ 1170 self.config.logger.debug( 1171 "Receive Alternative Service frame on stream %r", self 1172 ) 1173 1174 # If the origin is present, RFC 7838 says we have to ignore it. 
1175 if frame.origin: 1176 return [], [] 1177 1178 events = self.state_machine.process_input( 1179 StreamInputs.RECV_ALTERNATIVE_SERVICE 1180 ) 1181 1182 # There are lots of situations where we want to ignore the ALTSVC 1183 # frame. If we need to pay attention, we'll have an event and should 1184 # fill it out. 1185 if events: 1186 assert isinstance(events[0], AlternativeServiceAvailable) 1187 events[0].origin = self._authority 1188 events[0].field_value = frame.field 1189 1190 return [], events 1191 1192 def reset_stream(self, error_code=0): 1193 """ 1194 Close the stream locally. Reset the stream with an error code. 1195 """ 1196 self.config.logger.debug( 1197 "Local reset %r with error code: %d", self, error_code 1198 ) 1199 self.state_machine.process_input(StreamInputs.SEND_RST_STREAM) 1200 1201 rsf = RstStreamFrame(self.stream_id) 1202 rsf.error_code = error_code 1203 return [rsf] 1204 1205 def stream_reset(self, frame): 1206 """ 1207 Handle a stream being reset remotely. 1208 """ 1209 self.config.logger.debug( 1210 "Remote reset %r with error code: %d", self, frame.error_code 1211 ) 1212 events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM) 1213 1214 if events: 1215 # We don't fire an event if this stream is already closed. 1216 events[0].error_code = _error_code_from_int(frame.error_code) 1217 1218 return [], events 1219 1220 def acknowledge_received_data(self, acknowledged_size): 1221 """ 1222 The user has informed us that they've processed some amount of data 1223 that was received on this stream. Pass that to the window manager and 1224 potentially return some WindowUpdate frames. 
1225 """ 1226 self.config.logger.debug( 1227 "Acknowledge received data with size %d on %r", 1228 acknowledged_size, self 1229 ) 1230 increment = self._inbound_window_manager.process_bytes( 1231 acknowledged_size 1232 ) 1233 if increment: 1234 f = WindowUpdateFrame(self.stream_id) 1235 f.window_increment = increment 1236 return [f] 1237 1238 return [] 1239 1240 def _build_hdr_validation_flags(self, events): 1241 """ 1242 Constructs a set of header validation flags for use when normalizing 1243 and validating header blocks. 1244 """ 1245 is_trailer = isinstance( 1246 events[0], (_TrailersSent, TrailersReceived) 1247 ) 1248 is_response_header = isinstance( 1249 events[0], 1250 ( 1251 _ResponseSent, 1252 ResponseReceived, 1253 InformationalResponseReceived 1254 ) 1255 ) 1256 is_push_promise = isinstance( 1257 events[0], (PushedStreamReceived, _PushedRequestSent) 1258 ) 1259 1260 return HeaderValidationFlags( 1261 is_client=self.state_machine.client, 1262 is_trailer=is_trailer, 1263 is_response_header=is_response_header, 1264 is_push_promise=is_push_promise, 1265 ) 1266 1267 def _build_headers_frames(self, 1268 headers, 1269 encoder, 1270 first_frame, 1271 hdr_validation_flags): 1272 """ 1273 Helper method to build headers or push promise frames. 1274 """ 1275 # We need to lowercase the header names, and to ensure that secure 1276 # header fields are kept out of compression contexts. 1277 if self.config.normalize_outbound_headers: 1278 headers = normalize_outbound_headers( 1279 headers, hdr_validation_flags 1280 ) 1281 if self.config.validate_outbound_headers: 1282 headers = validate_outbound_headers( 1283 headers, hdr_validation_flags 1284 ) 1285 1286 encoded_headers = encoder.encode(headers) 1287 1288 # Slice into blocks of max_outbound_frame_size. Be careful with this: 1289 # it only works right because we never send padded frames or priority 1290 # information on the frames. Revisit this if we do. 
1291 header_blocks = [ 1292 encoded_headers[i:i+self.max_outbound_frame_size] 1293 for i in range( 1294 0, len(encoded_headers), self.max_outbound_frame_size 1295 ) 1296 ] 1297 1298 frames = [] 1299 first_frame.data = header_blocks[0] 1300 frames.append(first_frame) 1301 1302 for block in header_blocks[1:]: 1303 cf = ContinuationFrame(self.stream_id) 1304 cf.data = block 1305 frames.append(cf) 1306 1307 frames[-1].flags.add('END_HEADERS') 1308 return frames 1309 1310 def _process_received_headers(self, 1311 headers, 1312 header_validation_flags, 1313 header_encoding): 1314 """ 1315 When headers have been received from the remote peer, run a processing 1316 pipeline on them to transform them into the appropriate form for 1317 attaching to an event. 1318 """ 1319 if self.config.normalize_inbound_headers: 1320 headers = normalize_inbound_headers( 1321 headers, header_validation_flags 1322 ) 1323 1324 if self.config.validate_inbound_headers: 1325 headers = validate_headers(headers, header_validation_flags) 1326 1327 if header_encoding: 1328 headers = _decode_headers(headers, header_encoding) 1329 1330 # The above steps are all generators, so we need to concretize the 1331 # headers now. 1332 return list(headers) 1333 1334 def _initialize_content_length(self, headers): 1335 """ 1336 Checks the headers for a content-length header and initializes the 1337 _expected_content_length field from it. It's not an error for no 1338 Content-Length header to be present. 1339 """ 1340 if self.request_method == b'HEAD': 1341 self._expected_content_length = 0 1342 return 1343 1344 for n, v in headers: 1345 if n == b'content-length': 1346 try: 1347 self._expected_content_length = int(v, 10) 1348 except ValueError: 1349 raise ProtocolError( 1350 "Invalid content-length header: %s" % v 1351 ) 1352 1353 return 1354 1355 def _track_content_length(self, length, end_stream): 1356 """ 1357 Update the expected content length in response to data being received. 
1358 Validates that the appropriate amount of data is sent. Always updates 1359 the received data, but only validates the length against the 1360 content-length header if one was sent. 1361 1362 :param length: The length of the body chunk received. 1363 :param end_stream: If this is the last body chunk received. 1364 """ 1365 self._actual_content_length += length 1366 actual = self._actual_content_length 1367 expected = self._expected_content_length 1368 1369 if expected is not None: 1370 if expected < actual: 1371 raise InvalidBodyLengthError(expected, actual) 1372 1373 if end_stream and expected != actual: 1374 raise InvalidBodyLengthError(expected, actual) 1375 1376 def _inbound_flow_control_change_from_settings(self, delta): 1377 """ 1378 We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to 1379 update the target window size for flow control. For our flow control 1380 strategy, this means we need to do two things: we need to adjust the 1381 current window size, but we also need to set the target maximum window 1382 size to the new value. 1383 """ 1384 new_max_size = self._inbound_window_manager.max_window_size + delta 1385 self._inbound_window_manager.window_opened(delta) 1386 self._inbound_window_manager.max_window_size = new_max_size 1387 1388 1389def _decode_headers(headers, encoding): 1390 """ 1391 Given an iterable of header two-tuples and an encoding, decodes those 1392 headers using that encoding while preserving the type of the header tuple. 1393 This ensures that the use of ``HeaderTuple`` is preserved. 1394 """ 1395 for header in headers: 1396 # This function expects to work on decoded headers, which are always 1397 # HeaderTuple objects. 1398 assert isinstance(header, HeaderTuple) 1399 1400 name, value = header 1401 name = name.decode(encoding) 1402 value = value.decode(encoding) 1403 yield header.__class__(name, value) 1404