1# -*- coding: utf-8 -*- 2""" 3h2/stream 4~~~~~~~~~ 5 6An implementation of a HTTP/2 stream. 7""" 8import warnings 9 10from enum import Enum, IntEnum 11from hpack import HeaderTuple 12from hyperframe.frame import ( 13 HeadersFrame, ContinuationFrame, DataFrame, WindowUpdateFrame, 14 RstStreamFrame, PushPromiseFrame, AltSvcFrame 15) 16 17from .errors import ErrorCodes, _error_code_from_int 18from .events import ( 19 RequestReceived, ResponseReceived, DataReceived, WindowUpdated, 20 StreamEnded, PushedStreamReceived, StreamReset, TrailersReceived, 21 InformationalResponseReceived, AlternativeServiceAvailable, 22 _ResponseSent, _RequestSent, _TrailersSent, _PushedRequestSent 23) 24from .exceptions import ( 25 ProtocolError, StreamClosedError, InvalidBodyLengthError, FlowControlError 26) 27from .utilities import ( 28 guard_increment_window, is_informational_response, authority_from_headers, 29 validate_headers, validate_outbound_headers, normalize_outbound_headers, 30 HeaderValidationFlags, extract_method_header 31) 32from .windows import WindowManager 33 34 35class StreamState(IntEnum): 36 IDLE = 0 37 RESERVED_REMOTE = 1 38 RESERVED_LOCAL = 2 39 OPEN = 3 40 HALF_CLOSED_REMOTE = 4 41 HALF_CLOSED_LOCAL = 5 42 CLOSED = 6 43 44 45class StreamInputs(Enum): 46 SEND_HEADERS = 0 47 SEND_PUSH_PROMISE = 1 48 SEND_RST_STREAM = 2 49 SEND_DATA = 3 50 SEND_WINDOW_UPDATE = 4 51 SEND_END_STREAM = 5 52 RECV_HEADERS = 6 53 RECV_PUSH_PROMISE = 7 54 RECV_RST_STREAM = 8 55 RECV_DATA = 9 56 RECV_WINDOW_UPDATE = 10 57 RECV_END_STREAM = 11 58 RECV_CONTINUATION = 12 # Added in 2.0.0 59 SEND_INFORMATIONAL_HEADERS = 13 # Added in 2.2.0 60 RECV_INFORMATIONAL_HEADERS = 14 # Added in 2.2.0 61 SEND_ALTERNATIVE_SERVICE = 15 # Added in 2.3.0 62 RECV_ALTERNATIVE_SERVICE = 16 # Added in 2.3.0 63 UPGRADE_CLIENT = 17 # Added 2.3.0 64 UPGRADE_SERVER = 18 # Added 2.3.0 65 66 67class StreamClosedBy(Enum): 68 SEND_END_STREAM = 0 69 RECV_END_STREAM = 1 70 SEND_RST_STREAM = 2 71 RECV_RST_STREAM = 3 72 73 74# 
# This array is initialized once, and is indexed by the stream states above.
# It indicates whether a stream in the given state is open. The reason we do
# this is that we potentially check whether a stream in a given state is open
# quite frequently: given that we check so often, we should do so in the
# fastest and most performant way possible.
STREAM_OPEN = [False] * len(StreamState)
STREAM_OPEN[StreamState.OPEN] = True
STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True
STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True


class H2StreamStateMachine(object):
    """
    A single HTTP/2 stream state machine.

    This stream object implements basically the state machine described in
    RFC 7540 section 5.1.

    Every handler method on this class takes the state the stream was in
    *before* the transition and returns a (possibly empty) list of events.
    Handlers signal invalid situations by raising ``ProtocolError`` (or a
    subclass); they never rely on ``assert`` for this, because assertions
    are removed when Python runs with ``-O``.

    :param stream_id: The stream ID of this stream. This is stored primarily
        for logging purposes.
    """
    def __init__(self, stream_id):
        self.state = StreamState.IDLE
        self.stream_id = stream_id

        #: Whether this peer is the client side of this stream. ``None``
        #: until the first transition that decides it.
        self.client = None

        # Whether headers/trailers have been sent/received on this stream or
        # not. These start out as None and are set to True by the handlers.
        self.headers_sent = None
        self.trailers_sent = None
        self.headers_received = None
        self.trailers_received = None

        # How the stream was closed. One of StreamClosedBy, or None if the
        # stream is not closed or was closed by an error path that does not
        # record a cause.
        self.stream_closed_by = None

    def process_input(self, input_):
        """
        Process a specific input in the state machine.

        :param input_: The ``StreamInputs`` member to apply.
        :returns: The list of events produced by the transition. This is an
            empty list when the transition has no side-effect function.
        :raises ValueError: If ``input_`` is not a ``StreamInputs`` member.
        :raises ProtocolError: If the input is not allowed in the current
            state, or if the transition's handler rejects it. In both cases
            the stream is moved to the CLOSED state first.
        """
        if not isinstance(input_, StreamInputs):
            raise ValueError("Input must be an instance of StreamInputs")

        try:
            func, target_state = _transitions[(self.state, input_)]
        except KeyError:
            old_state = self.state
            self.state = StreamState.CLOSED
            raise ProtocolError(
                "Invalid input %s in state %s" % (input_, old_state)
            )
        else:
            previous_state = self.state
            self.state = target_state
            if func is not None:
                try:
                    return func(self, previous_state)
                except ProtocolError:
                    self.state = StreamState.CLOSED
                    raise
                except AssertionError as e:  # pragma: no cover
                    # Kept as a safety net: any assertion that escapes a
                    # handler is still reported as a protocol error.
                    self.state = StreamState.CLOSED
                    raise ProtocolError(e)

            return []

    def request_sent(self, previous_state):
        """
        Fires when a request is sent.
        """
        self.client = True
        self.headers_sent = True
        event = _RequestSent()

        return [event]

    def response_sent(self, previous_state):
        """
        Fires when something that should be a response is sent. This 'response'
        may actually be trailers.

        :raises ProtocolError: If this peer is not known to be a server, or
            if trailers have already been sent.
        """
        if not self.headers_sent:
            if self.client is True or self.client is None:
                raise ProtocolError("Client cannot send responses.")
            self.headers_sent = True
            event = _ResponseSent()
        else:
            if self.trailers_sent:
                raise ProtocolError(
                    "Trailers have already been sent on this stream."
                )
            self.trailers_sent = True
            event = _TrailersSent()

        return [event]

    def request_received(self, previous_state):
        """
        Fires when a request is received.

        :raises ProtocolError: If headers or trailers were already received
            on this stream.
        """
        if self.headers_received or self.trailers_received:
            raise ProtocolError(
                "Request headers have already been received on this stream."
            )

        self.client = False
        self.headers_received = True
        event = RequestReceived()

        event.stream_id = self.stream_id
        return [event]

    def response_received(self, previous_state):
        """
        Fires when a response is received. Also disambiguates between responses
        and trailers.

        :raises ProtocolError: If a first header block arrives while this
            peer is not the client, or if trailers were already received.
        """
        if not self.headers_received:
            if self.client is not True:
                raise ProtocolError("Only clients can receive responses.")
            self.headers_received = True
            event = ResponseReceived()
        else:
            if self.trailers_received:
                raise ProtocolError(
                    "Trailers have already been received on this stream."
                )
            self.trailers_received = True
            event = TrailersReceived()

        event.stream_id = self.stream_id
        return [event]

    def data_received(self, previous_state):
        """
        Fires when data is received.
        """
        event = DataReceived()
        event.stream_id = self.stream_id
        return [event]

    def window_updated(self, previous_state):
        """
        Fires when a window update frame is received.
        """
        event = WindowUpdated()
        event.stream_id = self.stream_id
        return [event]

    def stream_half_closed(self, previous_state):
        """
        Fires when an END_STREAM flag is received in the OPEN state,
        transitioning this stream to a HALF_CLOSED_REMOTE state.
        """
        event = StreamEnded()
        event.stream_id = self.stream_id
        return [event]

    def stream_ended(self, previous_state):
        """
        Fires when a stream is cleanly ended.
        """
        self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
        event = StreamEnded()
        event.stream_id = self.stream_id
        return [event]

    def stream_reset(self, previous_state):
        """
        Fired when a stream is forcefully reset.
        """
        self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
        event = StreamReset()
        event.stream_id = self.stream_id
        return [event]

    def send_new_pushed_stream(self, previous_state):
        """
        Fires on the newly pushed stream, when pushed by the local peer.

        No event here, but definitionally this peer must be a server.

        :raises ProtocolError: If the role of this peer on the stream was
            already decided, meaning the stream is not actually new.
        """
        if self.client is not None:
            raise ProtocolError("Pushed streams must be newly created.")
        self.client = False
        self.headers_received = True
        return []

    def recv_new_pushed_stream(self, previous_state):
        """
        Fires on the newly pushed stream, when pushed by the remote peer.

        No event here, but definitionally this peer must be a client.

        :raises ProtocolError: If the role of this peer on the stream was
            already decided, meaning the stream is not actually new.
        """
        if self.client is not None:
            raise ProtocolError("Pushed streams must be newly created.")
        self.client = True
        self.headers_sent = True
        return []

    def send_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
        We may only send PUSH_PROMISE frames if we're a server.
        """
        if self.client is True:
            raise ProtocolError("Cannot push streams from client peers.")

        event = _PushedRequestSent()
        return [event]

    def recv_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is
        received. We may only receive PUSH_PROMISE frames if we're a client.

        Fires a PushedStreamReceived event.
        """
        if not self.client:
            if self.client is None:  # pragma: no cover
                msg = "Idle streams cannot receive pushes"
            else:  # pragma: no cover
                msg = "Cannot receive pushed streams as a server"
            raise ProtocolError(msg)

        event = PushedStreamReceived()
        event.parent_stream_id = self.stream_id
        return [event]

    def send_end_stream(self, previous_state):
        """
        Called when an attempt is made to send END_STREAM in the
        HALF_CLOSED_REMOTE state.

        Records how the stream was closed. Produces no events.
        """
        self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
        return []

    def send_reset_stream(self, previous_state):
        """
        Called when an attempt is made to send RST_STREAM in a non-closed
        stream state.

        Records how the stream was closed. Produces no events.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
        return []

    def reset_stream_on_error(self, previous_state):
        """
        Called when we need to forcefully emit another RST_STREAM frame on
        behalf of the state machine.

        This always raises. A ``StreamReset`` event describing the local
        reset is hung off the ``StreamClosedError`` (as ``_events``) so that
        the user can be informed that the stream was reset.

        :raises StreamClosedError: Always.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM

        error = StreamClosedError(self.stream_id)

        event = StreamReset()
        event.stream_id = self.stream_id
        event.error_code = ErrorCodes.STREAM_CLOSED
        event.remote_reset = False
        error._events = [event]
        raise error

    def recv_on_closed_stream(self, previous_state):
        """
        Called when an unexpected frame is received on an already-closed
        stream.

        An endpoint that receives an unexpected frame should treat it as
        a stream error or connection error with type STREAM_CLOSED, depending
        on the specific frame. The error handling is done at a higher level:
        this just raises the appropriate error.
        """
        raise StreamClosedError(self.stream_id)

    def send_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to send data on an already-closed
        stream.

        This essentially overrides the standard logic by throwing a
        more-specific error: StreamClosedError. This is a ProtocolError, so it
        matches the standard API of the state machine, but provides more detail
        to the user.
        """
        raise StreamClosedError(self.stream_id)

    def recv_push_on_closed_stream(self, previous_state):
        """
        Called when a PUSH_PROMISE frame is received on a closed stream.

        If the stream was closed by us sending a RST_STREAM frame, then we
        presume that the PUSH_PROMISE was in flight when we reset the parent
        stream. Rather than accept the new stream, we just reset it.
        Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
        naturally closed stream is a real problem because it creates a brand
        new stream that the remote peer now believes exists.

        :raises StreamClosedError: If we closed the stream with RST_STREAM.
        :raises ProtocolError: In every other case.
        """
        if self.stream_closed_by is None:
            # The stream was closed by an error path that recorded no cause.
            raise ProtocolError("Stream was closed without a recorded cause.")

        if self.stream_closed_by is StreamClosedBy.SEND_RST_STREAM:
            raise StreamClosedError(self.stream_id)
        else:
            raise ProtocolError("Attempted to push on closed stream.")

    def send_push_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to push on an already-closed stream.

        This essentially overrides the standard logic by providing a more
        useful error message. It's necessary because simply indicating that the
        stream is closed is not enough: there is now a new stream that is not
        allowed to be there. The only recourse is to tear the whole connection
        down.
        """
        raise ProtocolError("Attempted to push on closed stream.")

    def window_on_closed_stream(self, previous_state):
        """
        Called when a WINDOW_UPDATE frame is received on an already-closed
        stream.

        If we sent an END_STREAM frame, we just ignore the frame, as instructed
        in RFC 7540 Section 5.1. Technically we should eventually consider
        WINDOW_UPDATE in this state an error, but we don't have access to a
        clock so we just always allow it. If we closed the stream for any other
        reason, we behave as we do for receiving any other frame on a closed
        stream.
        """
        if self.stream_closed_by is None:
            # The stream was closed by an error path that recorded no cause.
            raise ProtocolError("Stream was closed without a recorded cause.")

        if self.stream_closed_by is StreamClosedBy.SEND_END_STREAM:
            return []
        return self.recv_on_closed_stream(previous_state)

    def reset_on_closed_stream(self, previous_state):
        """
        Called when a RST_STREAM frame is received on an already-closed stream.

        If we sent an END_STREAM frame, we just ignore the frame, as instructed
        in RFC 7540 Section 5.1. Technically we should eventually consider
        RST_STREAM in this state an error, but we don't have access to a clock
        so we just always allow it. If we closed the stream for any other
        reason, we behave as we do for receiving any other frame on a closed
        stream.
        """
        if self.stream_closed_by is None:
            # The stream was closed by an error path that recorded no cause.
            raise ProtocolError("Stream was closed without a recorded cause.")

        if self.stream_closed_by is StreamClosedBy.SEND_END_STREAM:
            return []
        return self.recv_on_closed_stream(previous_state)

    def send_informational_response(self, previous_state):
        """
        Called when an informational header block is sent (that is, a block
        where the :status header has a 1XX value).

        Only enforces that these are sent *before* final headers are sent.
        """
        if self.headers_sent:
            raise ProtocolError("Information response after final response")

        event = _ResponseSent()
        return [event]

    def recv_informational_response(self, previous_state):
        """
        Called when an informational header block is received (that is, a block
        where the :status header has a 1XX value).
        """
        if self.headers_received:
            raise ProtocolError("Informational response after final response")

        event = InformationalResponseReceived()
        event.stream_id = self.stream_id
        return [event]

    def recv_alt_svc(self, previous_state):
        """
        Called when receiving an ALTSVC frame.

        RFC 7838 allows us to receive ALTSVC frames at any stream state, which
        is really absurdly overzealous. For that reason, we want to limit the
        states in which we can actually receive it. It's really only sensible
        to receive it after we've sent our own headers and before the server
        has sent its header block: the server can't guarantee that we have any
        state around after it completes its header block, and the server
        doesn't know what origin we're talking about before we've sent ours.

        For that reason, this function applies a few extra checks on both state
        and some of the little state variables we keep around. If those suggest
        an unreasonable situation for the ALTSVC frame to have been sent in,
        we quietly ignore it (as RFC 7838 suggests).

        This function is also *not* always called by the state machine. In some
        states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
        because we know the frame cannot be valid in that state (IDLE because
        the server cannot know what origin the stream applies to, CLOSED
        because the server cannot assume we still have state around,
        RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
        state then *we* are the server).
        """
        # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
        # them.
        if self.client is False:
            return []

        # If we've received the response headers from the server they can't
        # guarantee we still have any state around. Other implementations
        # (like nghttp2) ignore ALTSVC in this state, so we will too.
        if self.headers_received:
            return []

        # Otherwise, this is a sensible enough frame to have received. Return
        # the event and let it get populated.
        return [AlternativeServiceAvailable()]

    def send_alt_svc(self, previous_state):
        """
        Called when sending an ALTSVC frame on this stream.

        For consistency with the restrictions we apply on receiving ALTSVC
        frames in ``recv_alt_svc``, we want to restrict when users can send
        ALTSVC frames to the situations when we ourselves would accept them.

        That means: when we are a server, when we have received the request
        headers, and when we have not yet sent our own response headers. Only
        the last of those conditions is checked in this method itself.

        Produces no events.
        """
        # We should not send ALTSVC after we've sent response headers, as the
        # client may have disposed of its state.
        if self.headers_sent:
            raise ProtocolError(
                "Cannot send ALTSVC after sending response headers."
            )

        return []


# STATE MACHINE
#
# The stream state machine is defined here to avoid the need to allocate it
# repeatedly for each stream. It cannot be defined in the stream class because
# it needs to be able to reference the callbacks defined on the class, but
# because Python's scoping rules are weird the class object is not actually in
# scope during the body of the class object.
#
# For the sake of clarity, we reproduce the RFC 7540 state machine here:
#
#                              +--------+
#                      send PP |        | recv PP
#                     ,--------|  idle  |--------.
#                    /         |        |         \
#                   v          +--------+          v
#            +----------+          |           +----------+
#            |          |          | send H /  |          |
#     ,------| reserved |          | recv H    | reserved |------.
#     |      | (local)  |          |           | (remote) |      |
#     |      +----------+          v           +----------+      |
#     |          |             +--------+             |          |
#     |          |     recv ES |        | send ES     |          |
#     |   send H |     ,-------|  open  |-------.     | recv H   |
#     |          |    /        |        |        \    |          |
#     |          v   v         +--------+         v   v          |
#     |      +----------+          |           +----------+      |
#     |      |   half   |          |           |   half   |      |
#     |      |  closed  |          | send R /  |  closed  |      |
#     |      | (remote) |          | recv R    | (local)  |      |
#     |      +----------+          |           +----------+      |
#     |           |                |                 |           |
#     |           | send ES /      |       recv ES / |           |
#     |           | send R /       v        send R / |           |
#     |           | recv R     +--------+   recv R   |           |
#     | send R /  `----------->|        |<-----------'  send R / |
#     | recv R                 | closed |               recv R   |
#     `----------------------->|        |<----------------------'
#                              +--------+
#
#        send:   endpoint sends this frame
#        recv:   endpoint receives this frame
#
#        H:  HEADERS frame (with implied CONTINUATIONs)
#        PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
#        ES: END_STREAM flag
#        R:  RST_STREAM frame
#
# For the purposes of this state machine we treat HEADERS and their
# associated CONTINUATION frames as a single jumbo frame. The protocol
# allows/requires this by preventing other frames from being interleaved in
# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
# received without a prior HEADERS frame, it *will* be passed to this state
# machine. The state machine should always reject that frame, either as an
# invalid transition or because the stream is closed.
#
# There is a confusing relationship around PUSH_PROMISE frames. The state
# machine above considers them to be frames belonging to the new stream,
# which is *somewhat* true. However, they are sent with the stream ID of
# their related stream, and are only sendable in some cases.
# For this reason, our state machine implementation below allows for
# PUSH_PROMISE frames both in the IDLE state (as in the diagram), but also
# in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
# Essentially, for hyper-h2, PUSH_PROMISE frames are effectively sent on
# two streams.
#
# The _transitions dictionary contains a mapping of tuples of
# (state, input) to tuples of (side_effect_function, end_state). This
# map contains all allowed transitions: anything not in this map is
# invalid and immediately causes a transition to ``closed``.
_transitions = {
    # State: idle
    (StreamState.IDLE, StreamInputs.SEND_HEADERS):
        (H2StreamStateMachine.request_sent, StreamState.OPEN),
    (StreamState.IDLE, StreamInputs.RECV_HEADERS):
        (H2StreamStateMachine.request_received, StreamState.OPEN),
    (StreamState.IDLE, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
    (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE):
        (H2StreamStateMachine.send_new_pushed_stream,
            StreamState.RESERVED_LOCAL),
    (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE):
        (H2StreamStateMachine.recv_new_pushed_stream,
            StreamState.RESERVED_REMOTE),
    (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (None, StreamState.IDLE),
    (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT):
        (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL),
    (StreamState.IDLE, StreamInputs.UPGRADE_SERVER):
        (H2StreamStateMachine.request_received,
            StreamState.HALF_CLOSED_REMOTE),

    # State: reserved local
    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS):
        (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
        (None, StreamState.RESERVED_LOCAL),
    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
        (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL),
    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM):
        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM):
        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL),
    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (None, StreamState.RESERVED_LOCAL),

    # State: reserved remote
    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS):
        (H2StreamStateMachine.response_received,
            StreamState.HALF_CLOSED_LOCAL),
    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
    (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
        (None, StreamState.RESERVED_REMOTE),
    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
        (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE),
    (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM):
        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM):
        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE),

    # State: open
    (StreamState.OPEN, StreamInputs.SEND_HEADERS):
        (H2StreamStateMachine.response_sent, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.RECV_HEADERS):
        (H2StreamStateMachine.response_received, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.SEND_DATA):
        (None, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.data_received, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.SEND_END_STREAM):
        (None, StreamState.HALF_CLOSED_LOCAL),
    (StreamState.OPEN, StreamInputs.RECV_END_STREAM):
        (H2StreamStateMachine.stream_half_closed,
         StreamState.HALF_CLOSED_REMOTE),
    (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE):
        (None, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE):
        (H2StreamStateMachine.window_updated, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.SEND_RST_STREAM):
        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
    (StreamState.OPEN, StreamInputs.RECV_RST_STREAM):
        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
    (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE):
        (H2StreamStateMachine.send_push_promise, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE):
        (H2StreamStateMachine.recv_push_promise, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS):
        (H2StreamStateMachine.send_informational_response, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS):
        (H2StreamStateMachine.recv_informational_response, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.send_alt_svc, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN),

    # State: half-closed remote
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS):
        (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS):
        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA):
        (None, StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM):
        (H2StreamStateMachine.send_end_stream, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
        (None, StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
        (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM):
        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM):
        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE):
        (H2StreamStateMachine.send_push_promise,
         StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE):
        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS):
        (H2StreamStateMachine.send_informational_response,
         StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE),

    # State: half-closed local
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS):
        (H2StreamStateMachine.response_received,
         StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM):
        (H2StreamStateMachine.stream_ended, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
        (None, StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
        (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM):
        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM):
        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE):
        (H2StreamStateMachine.recv_push_promise,
         StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS):
        (H2StreamStateMachine.recv_informational_response,
         StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL),

    # State: closed
    (StreamState.CLOSED, StreamInputs.RECV_END_STREAM):
        (None, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (None, StreamState.CLOSED),

    # RFC 7540 Section 5.1 defines how the end point should react when
    # receiving a frame on a closed stream with the following statements:
    #
    # > An endpoint that receives any frame other than PRIORITY after receiving
    # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED.
    # > An endpoint that receives any frames after receiving a frame with the
    # > END_STREAM flag set MUST treat that as a connection error of type
    # > STREAM_CLOSED.
    (StreamState.CLOSED, StreamInputs.RECV_HEADERS):
        (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),

    # > WINDOW_UPDATE or RST_STREAM frames can be received in this state
    # > for a short period after a DATA or HEADERS frame containing a
    # > END_STREAM flag is sent.
    (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE):
        (H2StreamStateMachine.window_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM):
        (H2StreamStateMachine.reset_on_closed_stream, StreamState.CLOSED),

    # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is
    # > neither "open" nor "half-closed (local)" as a connection error of type
    # > PROTOCOL_ERROR.
    (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE):
        (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED),

    # Also, users should be forbidden from sending on closed streams.
    (StreamState.CLOSED, StreamInputs.SEND_HEADERS):
        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE):
        (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM):
        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.SEND_DATA):
        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE):
        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.SEND_END_STREAM):
        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
}


class H2Stream(object):
    """
    A low-level HTTP/2 stream object. This handles building and receiving
    frames and maintains per-stream state.

    This wraps a HTTP/2 Stream state machine implementation, ensuring that
    frames can only be sent/received when the stream is in a valid state.
    Attempts to create frames that cannot be sent will raise a
    ``ProtocolError``.
    """
    def __init__(self,
                 stream_id,
                 config,
                 inbound_window_size,
                 outbound_window_size):
        self.state_machine = H2StreamStateMachine(stream_id)
        self.stream_id = stream_id
        self.max_outbound_frame_size = None
        self.request_method = None

        # The current value of the outbound stream flow control window
        self.outbound_flow_control_window = outbound_window_size

        # The flow control manager.
        self._inbound_window_manager = WindowManager(inbound_window_size)

        # The expected content length, if any.
        self._expected_content_length = None

        # The actual received content length. Always tracked.
        self._actual_content_length = 0

        # The authority we believe this stream belongs to.
        self._authority = None

        # The configuration for this stream.
        self.config = config

    def __repr__(self):
        return "<%s id:%d state:%r>" % (
            type(self).__name__,
            self.stream_id,
            self.state_machine.state
        )

    @property
    def inbound_flow_control_window(self):
        """
        The size of the inbound flow control window for the stream. This is
        rarely publicly useful: instead, use :meth:`remote_flow_control_window
        <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is
        largely present to provide a shortcut to this data.
        """
        return self._inbound_window_manager.current_window_size

    @property
    def open(self):
        """
        Whether the stream is 'open' in any sense: that is, whether it counts
        against the number of concurrent streams.
        """
        # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either
        # the OPEN state or either of the HALF_CLOSED states. Perplexingly,
        # this excludes the reserved states.
        # For more detail on why we're doing this in this slightly weird way,
        # see the comment on ``STREAM_OPEN`` at the top of the file.
        return STREAM_OPEN[self.state_machine.state]

    @property
    def closed(self):
        """
        Whether the stream is closed.
        """
        return self.state_machine.state == StreamState.CLOSED

    @property
    def closed_by(self):
        """
        Returns how the stream was closed, as one of StreamClosedBy.
        """
        return self.state_machine.stream_closed_by

    def upgrade(self, client_side):
        """
        Called by the connection to indicate that this stream is the initial
        request/response of an upgraded connection. Places the stream into an
        appropriate state.

        :param client_side: Whether this peer is the client of the upgraded
            connection. Selects between the UPGRADE_CLIENT and UPGRADE_SERVER
            inputs.
        """
        self.config.logger.debug("Upgrading %r", self)

        assert self.stream_id == 1
        input_ = (
            StreamInputs.UPGRADE_CLIENT if client_side
            else StreamInputs.UPGRADE_SERVER
        )

        # This may return events, we deliberately don't want them.
        self.state_machine.process_input(input_)

    def send_headers(self, headers, encoder, end_stream=False):
        """
        Returns a list of HEADERS/CONTINUATION frames to emit as either headers
        or trailers.

        :param headers: The header block to send. Either an iterable of
            two-tuples or (deprecated) a dictionary.
        :param encoder: The header encoder handed on to
            ``_build_headers_frames``.
        :param end_stream: Whether the first emitted frame should carry the
            END_STREAM flag.
        :returns: The list of frames to emit.
        :raises ProtocolError: If END_STREAM is requested on an informational
            response, if trailers are sent without END_STREAM, or if the
            stream state machine rejects the transition.
        """
        self.config.logger.debug("Send headers %s on %r", headers, self)
        # Convert headers to two-tuples.
        # FIXME: The fallback for dictionary headers is to be removed in 3.0.
        try:
            headers = headers.items()
        except AttributeError:
            # Not a dictionary: assume it is already an iterable of
            # two-tuples and use it unchanged.
            pass
        else:
            warnings.warn(
                "Implicit conversion of dictionaries to two-tuples for "
                "headers is deprecated and will be removed in 3.0.",
                DeprecationWarning
            )

        # Because encoding headers makes an irreversible change to the header
        # compression context, we make the state transition before we encode
        # them.

        # First, check if we're a client. If we are, no problem: if we aren't,
        # we need to scan the header block to see if this is an informational
        # response.
        input_ = StreamInputs.SEND_HEADERS
        if ((not self.state_machine.client) and
                is_informational_response(headers)):
            if end_stream:
                raise ProtocolError(
                    "Cannot set END_STREAM on informational responses."
                )

            input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS

        events = self.state_machine.process_input(input_)

        # The state machine has now told us whether this block is trailers.
        # Reject trailers without END_STREAM here, *before* encoding, so that
        # a rejected call does not leave the header compression context
        # modified for a block that is never sent.
        if self.state_machine.trailers_sent and not end_stream:
            raise ProtocolError("Trailers must have END_STREAM set.")

        hf = HeadersFrame(self.stream_id)
        hdr_validation_flags = self._build_hdr_validation_flags(events)
        frames = self._build_headers_frames(
            headers, encoder, hf, hdr_validation_flags
        )

        if end_stream:
            # Not a bug: the END_STREAM flag is valid on the initial HEADERS
            # frame, not the CONTINUATION frames that follow.
            self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
            frames[0].flags.add('END_STREAM')

        if self.state_machine.client and self._authority is None:
            self._authority = authority_from_headers(headers)

        # store request method for _initialize_content_length
        # NOTE(review): this assignment runs for every header block sent on
        # the stream, including responses and trailers -- confirm what
        # extract_method_header yields for blocks without a request method.
        self.request_method = extract_method_header(headers)

        return frames

    def push_stream_in_band(self, related_stream_id, headers, encoder):
        """
        Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed
        stream header. Called on the stream that has the PUSH_PROMISE frame
        sent on it.
        """
        self.config.logger.debug("Push stream %r", self)

        # Because encoding headers makes an irreversible change to the header
        # compression context, we make the state transition *first*.
943 944 events = self.state_machine.process_input( 945 StreamInputs.SEND_PUSH_PROMISE 946 ) 947 948 ppf = PushPromiseFrame(self.stream_id) 949 ppf.promised_stream_id = related_stream_id 950 hdr_validation_flags = self._build_hdr_validation_flags(events) 951 frames = self._build_headers_frames( 952 headers, encoder, ppf, hdr_validation_flags 953 ) 954 955 return frames 956 957 def locally_pushed(self): 958 """ 959 Mark this stream as one that was pushed by this peer. Must be called 960 immediately after initialization. Sends no frames, simply updates the 961 state machine. 962 """ 963 # This does not trigger any events. 964 events = self.state_machine.process_input( 965 StreamInputs.SEND_PUSH_PROMISE 966 ) 967 assert not events 968 return [] 969 970 def send_data(self, data, end_stream=False, pad_length=None): 971 """ 972 Prepare some data frames. Optionally end the stream. 973 974 .. warning:: Does not perform flow control checks. 975 """ 976 self.config.logger.debug( 977 "Send data on %r with end stream set to %s", self, end_stream 978 ) 979 980 self.state_machine.process_input(StreamInputs.SEND_DATA) 981 982 df = DataFrame(self.stream_id) 983 df.data = data 984 if end_stream: 985 self.state_machine.process_input(StreamInputs.SEND_END_STREAM) 986 df.flags.add('END_STREAM') 987 if pad_length is not None: 988 df.flags.add('PADDED') 989 df.pad_length = pad_length 990 991 # Subtract flow_controlled_length to account for possible padding 992 self.outbound_flow_control_window -= df.flow_controlled_length 993 assert self.outbound_flow_control_window >= 0 994 995 return [df] 996 997 def end_stream(self): 998 """ 999 End a stream without sending data. 
1000 """ 1001 self.config.logger.debug("End stream %r", self) 1002 1003 self.state_machine.process_input(StreamInputs.SEND_END_STREAM) 1004 df = DataFrame(self.stream_id) 1005 df.flags.add('END_STREAM') 1006 return [df] 1007 1008 def advertise_alternative_service(self, field_value): 1009 """ 1010 Advertise an RFC 7838 alternative service. The semantics of this are 1011 better documented in the ``H2Connection`` class. 1012 """ 1013 self.config.logger.debug( 1014 "Advertise alternative service of %r for %r", field_value, self 1015 ) 1016 self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE) 1017 asf = AltSvcFrame(self.stream_id) 1018 asf.field = field_value 1019 return [asf] 1020 1021 def increase_flow_control_window(self, increment): 1022 """ 1023 Increase the size of the flow control window for the remote side. 1024 """ 1025 self.config.logger.debug( 1026 "Increase flow control window for %r by %d", 1027 self, increment 1028 ) 1029 self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE) 1030 self._inbound_window_manager.window_opened(increment) 1031 1032 wuf = WindowUpdateFrame(self.stream_id) 1033 wuf.window_increment = increment 1034 return [wuf] 1035 1036 def receive_push_promise_in_band(self, 1037 promised_stream_id, 1038 headers, 1039 header_encoding): 1040 """ 1041 Receives a push promise frame sent on this stream, pushing a remote 1042 stream. This is called on the stream that has the PUSH_PROMISE sent 1043 on it. 
1044 """ 1045 self.config.logger.debug( 1046 "Receive Push Promise on %r for remote stream %d", 1047 self, promised_stream_id 1048 ) 1049 events = self.state_machine.process_input( 1050 StreamInputs.RECV_PUSH_PROMISE 1051 ) 1052 events[0].pushed_stream_id = promised_stream_id 1053 1054 if self.config.validate_inbound_headers: 1055 hdr_validation_flags = self._build_hdr_validation_flags(events) 1056 headers = validate_headers(headers, hdr_validation_flags) 1057 1058 if header_encoding: 1059 headers = list(_decode_headers(headers, header_encoding)) 1060 events[0].headers = headers 1061 return [], events 1062 1063 def remotely_pushed(self, pushed_headers): 1064 """ 1065 Mark this stream as one that was pushed by the remote peer. Must be 1066 called immediately after initialization. Sends no frames, simply 1067 updates the state machine. 1068 """ 1069 self.config.logger.debug("%r pushed by remote peer", self) 1070 events = self.state_machine.process_input( 1071 StreamInputs.RECV_PUSH_PROMISE 1072 ) 1073 self._authority = authority_from_headers(pushed_headers) 1074 return [], events 1075 1076 def receive_headers(self, headers, end_stream, header_encoding): 1077 """ 1078 Receive a set of headers (or trailers). 
1079 """ 1080 if is_informational_response(headers): 1081 if end_stream: 1082 raise ProtocolError( 1083 "Cannot set END_STREAM on informational responses" 1084 ) 1085 input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS 1086 else: 1087 input_ = StreamInputs.RECV_HEADERS 1088 1089 events = self.state_machine.process_input(input_) 1090 1091 if end_stream: 1092 es_events = self.state_machine.process_input( 1093 StreamInputs.RECV_END_STREAM 1094 ) 1095 events[0].stream_ended = es_events[0] 1096 events += es_events 1097 1098 self._initialize_content_length(headers) 1099 1100 if isinstance(events[0], TrailersReceived): 1101 if not end_stream: 1102 raise ProtocolError("Trailers must have END_STREAM set") 1103 1104 if self.config.validate_inbound_headers: 1105 hdr_validation_flags = self._build_hdr_validation_flags(events) 1106 headers = validate_headers(headers, hdr_validation_flags) 1107 1108 if header_encoding: 1109 headers = list(_decode_headers(headers, header_encoding)) 1110 1111 events[0].headers = headers 1112 return [], events 1113 1114 def receive_data(self, data, end_stream, flow_control_len): 1115 """ 1116 Receive some data. 1117 """ 1118 self.config.logger.debug( 1119 "Receive data on %r with end stream %s and flow control length " 1120 "set to %d", self, end_stream, flow_control_len 1121 ) 1122 events = self.state_machine.process_input(StreamInputs.RECV_DATA) 1123 self._inbound_window_manager.window_consumed(flow_control_len) 1124 self._track_content_length(len(data), end_stream) 1125 1126 if end_stream: 1127 es_events = self.state_machine.process_input( 1128 StreamInputs.RECV_END_STREAM 1129 ) 1130 events[0].stream_ended = es_events[0] 1131 events.extend(es_events) 1132 1133 events[0].data = data 1134 events[0].flow_controlled_length = flow_control_len 1135 return [], events 1136 1137 def receive_window_update(self, increment): 1138 """ 1139 Handle a WINDOW_UPDATE increment. 
1140 """ 1141 self.config.logger.debug( 1142 "Receive Window Update on %r for increment of %d", 1143 self, increment 1144 ) 1145 events = self.state_machine.process_input( 1146 StreamInputs.RECV_WINDOW_UPDATE 1147 ) 1148 frames = [] 1149 1150 # If we encounter a problem with incrementing the flow control window, 1151 # this should be treated as a *stream* error, not a *connection* error. 1152 # That means we need to catch the error and forcibly close the stream. 1153 if events: 1154 events[0].delta = increment 1155 try: 1156 self.outbound_flow_control_window = guard_increment_window( 1157 self.outbound_flow_control_window, 1158 increment 1159 ) 1160 except FlowControlError: 1161 # Ok, this is bad. We're going to need to perform a local 1162 # reset. 1163 event = StreamReset() 1164 event.stream_id = self.stream_id 1165 event.error_code = ErrorCodes.FLOW_CONTROL_ERROR 1166 event.remote_reset = False 1167 1168 events = [event] 1169 frames = self.reset_stream(event.error_code) 1170 1171 return frames, events 1172 1173 def receive_continuation(self): 1174 """ 1175 A naked CONTINUATION frame has been received. This is always an error, 1176 but the type of error it is depends on the state of the stream and must 1177 transition the state of the stream, so we need to handle it. 1178 """ 1179 self.config.logger.debug("Receive Continuation frame on %r", self) 1180 self.state_machine.process_input( 1181 StreamInputs.RECV_CONTINUATION 1182 ) 1183 assert False, "Should not be reachable" 1184 1185 def receive_alt_svc(self, frame): 1186 """ 1187 An Alternative Service frame was received on the stream. This frame 1188 inherits the origin associated with this stream. 1189 """ 1190 self.config.logger.debug( 1191 "Receive Alternative Service frame on stream %r", self 1192 ) 1193 1194 # If the origin is present, RFC 7838 says we have to ignore it. 
1195 if frame.origin: 1196 return [], [] 1197 1198 events = self.state_machine.process_input( 1199 StreamInputs.RECV_ALTERNATIVE_SERVICE 1200 ) 1201 1202 # There are lots of situations where we want to ignore the ALTSVC 1203 # frame. If we need to pay attention, we'll have an event and should 1204 # fill it out. 1205 if events: 1206 assert isinstance(events[0], AlternativeServiceAvailable) 1207 events[0].origin = self._authority 1208 events[0].field_value = frame.field 1209 1210 return [], events 1211 1212 def reset_stream(self, error_code=0): 1213 """ 1214 Close the stream locally. Reset the stream with an error code. 1215 """ 1216 self.config.logger.debug( 1217 "Local reset %r with error code: %d", self, error_code 1218 ) 1219 self.state_machine.process_input(StreamInputs.SEND_RST_STREAM) 1220 1221 rsf = RstStreamFrame(self.stream_id) 1222 rsf.error_code = error_code 1223 return [rsf] 1224 1225 def stream_reset(self, frame): 1226 """ 1227 Handle a stream being reset remotely. 1228 """ 1229 self.config.logger.debug( 1230 "Remote reset %r with error code: %d", self, frame.error_code 1231 ) 1232 events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM) 1233 1234 if events: 1235 # We don't fire an event if this stream is already closed. 1236 events[0].error_code = _error_code_from_int(frame.error_code) 1237 1238 return [], events 1239 1240 def acknowledge_received_data(self, acknowledged_size): 1241 """ 1242 The user has informed us that they've processed some amount of data 1243 that was received on this stream. Pass that to the window manager and 1244 potentially return some WindowUpdate frames. 
1245 """ 1246 self.config.logger.debug( 1247 "Acknowledge received data with size %d on %r", 1248 acknowledged_size, self 1249 ) 1250 increment = self._inbound_window_manager.process_bytes( 1251 acknowledged_size 1252 ) 1253 if increment: 1254 f = WindowUpdateFrame(self.stream_id) 1255 f.window_increment = increment 1256 return [f] 1257 1258 return [] 1259 1260 def _build_hdr_validation_flags(self, events): 1261 """ 1262 Constructs a set of header validation flags for use when normalizing 1263 and validating header blocks. 1264 """ 1265 is_trailer = isinstance( 1266 events[0], (_TrailersSent, TrailersReceived) 1267 ) 1268 is_response_header = isinstance( 1269 events[0], 1270 ( 1271 _ResponseSent, 1272 ResponseReceived, 1273 InformationalResponseReceived 1274 ) 1275 ) 1276 is_push_promise = isinstance( 1277 events[0], (PushedStreamReceived, _PushedRequestSent) 1278 ) 1279 1280 return HeaderValidationFlags( 1281 is_client=self.state_machine.client, 1282 is_trailer=is_trailer, 1283 is_response_header=is_response_header, 1284 is_push_promise=is_push_promise, 1285 ) 1286 1287 def _build_headers_frames(self, 1288 headers, 1289 encoder, 1290 first_frame, 1291 hdr_validation_flags): 1292 """ 1293 Helper method to build headers or push promise frames. 1294 """ 1295 # We need to lowercase the header names, and to ensure that secure 1296 # header fields are kept out of compression contexts. 1297 if self.config.normalize_outbound_headers: 1298 headers = normalize_outbound_headers( 1299 headers, hdr_validation_flags 1300 ) 1301 if self.config.validate_outbound_headers: 1302 headers = validate_outbound_headers( 1303 headers, hdr_validation_flags 1304 ) 1305 1306 encoded_headers = encoder.encode(headers) 1307 1308 # Slice into blocks of max_outbound_frame_size. Be careful with this: 1309 # it only works right because we never send padded frames or priority 1310 # information on the frames. Revisit this if we do. 
1311 header_blocks = [ 1312 encoded_headers[i:i+self.max_outbound_frame_size] 1313 for i in range( 1314 0, len(encoded_headers), self.max_outbound_frame_size 1315 ) 1316 ] 1317 1318 frames = [] 1319 first_frame.data = header_blocks[0] 1320 frames.append(first_frame) 1321 1322 for block in header_blocks[1:]: 1323 cf = ContinuationFrame(self.stream_id) 1324 cf.data = block 1325 frames.append(cf) 1326 1327 frames[-1].flags.add('END_HEADERS') 1328 return frames 1329 1330 def _initialize_content_length(self, headers): 1331 """ 1332 Checks the headers for a content-length header and initializes the 1333 _expected_content_length field from it. It's not an error for no 1334 Content-Length header to be present. 1335 """ 1336 if self.request_method == b'HEAD': 1337 self._expected_content_length = 0 1338 return 1339 1340 for n, v in headers: 1341 if n == b'content-length': 1342 try: 1343 self._expected_content_length = int(v, 10) 1344 except ValueError: 1345 raise ProtocolError( 1346 "Invalid content-length header: %s" % v 1347 ) 1348 1349 return 1350 1351 def _track_content_length(self, length, end_stream): 1352 """ 1353 Update the expected content length in response to data being received. 1354 Validates that the appropriate amount of data is sent. Always updates 1355 the received data, but only validates the length against the 1356 content-length header if one was sent. 1357 1358 :param length: The length of the body chunk received. 1359 :param end_stream: If this is the last body chunk received. 
1360 """ 1361 self._actual_content_length += length 1362 actual = self._actual_content_length 1363 expected = self._expected_content_length 1364 1365 if expected is not None: 1366 if expected < actual: 1367 raise InvalidBodyLengthError(expected, actual) 1368 1369 if end_stream and expected != actual: 1370 raise InvalidBodyLengthError(expected, actual) 1371 1372 def _inbound_flow_control_change_from_settings(self, delta): 1373 """ 1374 We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to 1375 update the target window size for flow control. For our flow control 1376 strategy, this means we need to do two things: we need to adjust the 1377 current window size, but we also need to set the target maximum window 1378 size to the new value. 1379 """ 1380 new_max_size = self._inbound_window_manager.max_window_size + delta 1381 self._inbound_window_manager.window_opened(delta) 1382 self._inbound_window_manager.max_window_size = new_max_size 1383 1384 1385def _decode_headers(headers, encoding): 1386 """ 1387 Given an iterable of header two-tuples and an encoding, decodes those 1388 headers using that encoding while preserving the type of the header tuple. 1389 This ensures that the use of ``HeaderTuple`` is preserved. 1390 """ 1391 for header in headers: 1392 # This function expects to work on decoded headers, which are always 1393 # HeaderTuple objects. 1394 assert isinstance(header, HeaderTuple) 1395 1396 name, value = header 1397 name = name.decode(encoding) 1398 value = value.decode(encoding) 1399 yield header.__class__(name, value) 1400