# -*- coding: utf-8 -*-
"""
test_closed_streams
~~~~~~~~~~~~~~~~~~~

Tests that we handle closed streams correctly.
"""
import pytest

import h2.config
import h2.connection
import h2.errors
import h2.events
import h2.exceptions


class TestClosedStreams(object):
    """
    General tests for frames that arrive on, or refer to, streams that have
    already been closed.
    """
    example_request_headers = [
        (':authority', 'example.com'),
        (':path', '/'),
        (':scheme', 'https'),
        (':method', 'GET'),
    ]
    example_response_headers = [
        (':status', '200'),
        ('server', 'fake-serv/0.1.0')
    ]
    server_config = h2.config.H2Configuration(client_side=False)

    def test_can_receive_multiple_rst_stream_frames(self, frame_factory):
        """
        Multiple RST_STREAM frames can be received, either at once or well
        after one another. Only the first fires an event.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers, end_stream=True)

        f = frame_factory.build_rst_stream_frame(stream_id=1)
        events = c.receive_data(f.serialize() * 3)

        # Force an iteration over all the streams to remove them. The
        # property is read purely for that side effect; its value is unused.
        c.open_outbound_streams

        # Receive more data.
        events += c.receive_data(f.serialize() * 3)

        # Six RST_STREAM frames were delivered in total, one event fired.
        assert len(events) == 1
        event = events[0]

        assert isinstance(event, h2.events.StreamReset)

    def test_receiving_low_stream_id_causes_goaway(self, frame_factory):
        """
        The remote peer creating a stream with a lower ID than one we've seen
        causes a GOAWAY frame.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        # Open stream 3 first so that stream 1 is "too low" afterwards.
        f = frame_factory.build_headers_frame(
            self.example_request_headers,
            stream_id=3,
        )
        c.receive_data(f.serialize())
        c.clear_outbound_data_buffer()

        f = frame_factory.build_headers_frame(
            self.example_request_headers,
            stream_id=1,
        )

        with pytest.raises(h2.exceptions.StreamIDTooLowError) as e:
            c.receive_data(f.serialize())

        assert e.value.stream_id == 1
        assert e.value.max_stream_id == 3

        f = frame_factory.build_goaway_frame(
            last_stream_id=3,
            error_code=h2.errors.ErrorCodes.PROTOCOL_ERROR,
        )
        assert c.data_to_send() == f.serialize()

    def test_closed_stream_not_present_in_streams_dict(self, frame_factory):
        """
        When streams have been closed, they get removed from the streams
        dictionary the next time we count the open streams.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        f = frame_factory.build_headers_frame(self.example_request_headers)
        c.receive_data(f.serialize())
        c.push_stream(1, 2, self.example_request_headers)
        c.reset_stream(1)
        c.clear_outbound_data_buffer()

        f = frame_factory.build_rst_stream_frame(stream_id=2)
        c.receive_data(f.serialize())

        # Force a count of the streams.
        assert not c.open_outbound_streams

        # The streams dictionary should be empty.
        assert not c.streams

    def test_receive_rst_stream_on_closed_stream(self, frame_factory):
        """
        RST_STREAM frame should be ignored if stream is in a closed state.
        See RFC 7540 Section 5.1 (closed state)
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()

        # Client sends request
        c.send_headers(1, self.example_request_headers)

        # Some time passes and client sends DATA frame and closes stream,
        # so it is in a half-closed state
        c.send_data(1, b'some data', end_stream=True)

        # Server received HEADERS frame but DATA frame is still on the way.
        # Stream is in open state on the server-side. In this state server is
        # allowed to end stream and reset it - this trick helps immediately
        # close stream on the server-side.
        headers_frame = frame_factory.build_headers_frame(
            [(':status', '200')],
            flags=['END_STREAM'],
            stream_id=1,
        )
        events = c.receive_data(headers_frame.serialize())
        assert len(events) == 2
        response_received, stream_ended = events
        assert isinstance(response_received, h2.events.ResponseReceived)
        assert isinstance(stream_ended, h2.events.StreamEnded)

        # The stream is now closed on our side: the late RST_STREAM must
        # produce no events.
        rst_stream_frame = frame_factory.build_rst_stream_frame(stream_id=1)
        events = c.receive_data(rst_stream_frame.serialize())
        assert not events

    def test_receive_window_update_on_closed_stream(self, frame_factory):
        """
        WINDOW_UPDATE frame should be ignored if stream is in a closed state.
        See RFC 7540 Section 5.1 (closed state)
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()

        # Client sends request
        c.send_headers(1, self.example_request_headers)

        # Some time passes and client sends DATA frame and closes stream,
        # so it is in a half-closed state
        c.send_data(1, b'some data', end_stream=True)

        # Server received HEADERS frame but DATA frame is still on the way.
        # Stream is in open state on the server-side. In this state server is
        # allowed to end stream and after that acknowledge received data by
        # sending WINDOW_UPDATE frames.
        headers_frame = frame_factory.build_headers_frame(
            [(':status', '200')],
            flags=['END_STREAM'],
            stream_id=1,
        )
        events = c.receive_data(headers_frame.serialize())
        assert len(events) == 2
        response_received, stream_ended = events
        assert isinstance(response_received, h2.events.ResponseReceived)
        assert isinstance(stream_ended, h2.events.StreamEnded)

        # The stream is now closed on our side: the late WINDOW_UPDATE must
        # produce no events.
        window_update_frame = frame_factory.build_window_update_frame(
            stream_id=1,
            increment=1,
        )
        events = c.receive_data(window_update_frame.serialize())
        assert not events


class TestStreamsClosedByEndStream(object):
    """
    Tests for frames received on streams that were closed by an exchange of
    END_STREAM flags.
    """
    example_request_headers = [
        (':authority', 'example.com'),
        (':path', '/'),
        (':scheme', 'https'),
        (':method', 'GET'),
    ]
    example_response_headers = [
        (':status', '200'),
        ('server', 'fake-serv/0.1.0')
    ]
    server_config = h2.config.H2Configuration(client_side=False)

    @pytest.mark.parametrize(
        "frame",
        [
            lambda self, ff: ff.build_headers_frame(
                self.example_request_headers, flags=['END_STREAM']),
            lambda self, ff: ff.build_headers_frame(
                self.example_request_headers),
        ]
    )
    @pytest.mark.parametrize("clear_streams", [True, False])
    def test_frames_after_recv_end_will_error(self,
                                              frame_factory,
                                              frame,
                                              clear_streams):
        """
        A stream that is closed by receiving END_STREAM raises
        ProtocolError when it receives an unexpected frame.

        ``frame`` is a callable ``(self, frame_factory) -> frame`` that
        builds the unexpected frame; ``clear_streams`` selects whether the
        closed stream is purged from the connection before that frame
        arrives.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        f = frame_factory.build_headers_frame(
            self.example_request_headers, flags=['END_STREAM']
        )
        c.receive_data(f.serialize())
        c.send_headers(
            stream_id=1,
            headers=self.example_response_headers,
            end_stream=True
        )

        if clear_streams:
            # Call open_inbound_streams to force the connection to clean
            # closed streams. The property value itself is not needed.
            c.open_inbound_streams

        c.clear_outbound_data_buffer()

        f = frame(self, frame_factory)
        with pytest.raises(h2.exceptions.ProtocolError):
            c.receive_data(f.serialize())

        f = frame_factory.build_goaway_frame(
            last_stream_id=1,
            error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
        )
        assert c.data_to_send() == f.serialize()

    @pytest.mark.parametrize(
        "frame",
        [
            lambda self, ff: ff.build_headers_frame(
                self.example_response_headers, flags=['END_STREAM']),
            lambda self, ff: ff.build_headers_frame(
                self.example_response_headers),
        ]
    )
    @pytest.mark.parametrize("clear_streams", [True, False])
    def test_frames_after_send_end_will_error(self,
                                              frame_factory,
                                              frame,
                                              clear_streams):
        """
        A stream that is closed by sending END_STREAM raises
        ProtocolError when it receives an unexpected frame.

        ``frame`` is a callable ``(self, frame_factory) -> frame`` that
        builds the unexpected frame; ``clear_streams`` selects whether the
        closed stream is purged from the connection before that frame
        arrives.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(stream_id=1, headers=self.example_request_headers,
                       end_stream=True)

        f = frame_factory.build_headers_frame(
            self.example_response_headers, flags=['END_STREAM']
        )
        c.receive_data(f.serialize())

        if clear_streams:
            # Call open_outbound_streams to force the connection to clean
            # closed streams. The property value itself is not needed.
            c.open_outbound_streams

        c.clear_outbound_data_buffer()

        f = frame(self, frame_factory)
        with pytest.raises(h2.exceptions.ProtocolError):
            c.receive_data(f.serialize())

        f = frame_factory.build_goaway_frame(
            last_stream_id=0,
            error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
        )
        assert c.data_to_send() == f.serialize()

    @pytest.mark.parametrize(
        "frame",
        [
            lambda self, ff: ff.build_window_update_frame(1, 1),
            lambda self, ff: ff.build_rst_stream_frame(1)
        ]
    )
    def test_frames_after_send_end_will_be_ignored(self,
                                                   frame_factory,
                                                   frame):
        """
        A stream that is closed by sending END_STREAM ignores
        WINDOW_UPDATE and RST_STREAM frames received afterwards: no
        exception is raised and no events are fired.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        f = frame_factory.build_headers_frame(
            self.example_request_headers, flags=['END_STREAM']
        )
        c.receive_data(f.serialize())
        c.send_headers(
            stream_id=1,
            headers=self.example_response_headers,
            end_stream=True
        )

        c.clear_outbound_data_buffer()

        f = frame(self, frame_factory)
        events = c.receive_data(f.serialize())

        assert not events


class TestStreamsClosedByRstStream(object):
    """
    Tests for frames received on streams that were closed by RST_STREAM,
    whether the reset was received from the peer or sent locally.
    """
    example_request_headers = [
        (':authority', 'example.com'),
        (':path', '/'),
        (':scheme', 'https'),
        (':method', 'GET'),
    ]
    example_response_headers = [
        (':status', '200'),
        ('server', 'fake-serv/0.1.0')
    ]
    server_config = h2.config.H2Configuration(client_side=False)

    @pytest.mark.parametrize(
        "frame",
        [
            lambda self, ff: ff.build_headers_frame(
                self.example_request_headers),
            lambda self, ff: ff.build_headers_frame(
                self.example_request_headers, flags=['END_STREAM']),
        ]
    )
    def test_resets_further_frames_after_recv_reset(self,
                                                    frame_factory,
                                                    frame):
        """
        A stream that is closed by receiving RST_STREAM can receive further
        frames: the connection simply answers each of them with RST_STREAM
        and fires no events.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        header_frame = frame_factory.build_headers_frame(
            self.example_request_headers, flags=['END_STREAM']
        )
        c.receive_data(header_frame.serialize())

        c.send_headers(
            stream_id=1,
            headers=self.example_response_headers,
            end_stream=False
        )

        # The peer resets the stream.
        rst_frame = frame_factory.build_rst_stream_frame(
            1, h2.errors.ErrorCodes.STREAM_CLOSED
        )
        c.receive_data(rst_frame.serialize())
        c.clear_outbound_data_buffer()

        f = frame(self, frame_factory)
        events = c.receive_data(f.serialize())

        # The RST_STREAM we expect to be sent in reply to each late frame.
        rst_frame = frame_factory.build_rst_stream_frame(
            1, h2.errors.ErrorCodes.STREAM_CLOSED
        )
        assert not events
        assert c.data_to_send() == rst_frame.serialize()

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == rst_frame.serialize() * 3

        # Iterate over the streams to make sure it's gone, then confirm the
        # behaviour is unchanged.
        c.open_outbound_streams

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == rst_frame.serialize() * 3

    def test_resets_further_data_frames_after_recv_reset(self,
                                                         frame_factory):
        """
        A stream that is closed by receiving RST_STREAM can receive further
        DATA frames: each one is answered with RST_STREAM for the stream and
        fires no events.

        For the small payloads used here the test asserts that RST_STREAM is
        the only frame emitted.
        NOTE(review): presumably a connection-level WINDOW_UPDATE would only
        appear once enough flow-controlled bytes had been consumed - confirm
        against the connection's flow-control handling.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        header_frame = frame_factory.build_headers_frame(
            self.example_request_headers, flags=['END_STREAM']
        )
        c.receive_data(header_frame.serialize())

        c.send_headers(
            stream_id=1,
            headers=self.example_response_headers,
            end_stream=False
        )

        # The peer resets the stream.
        rst_frame = frame_factory.build_rst_stream_frame(
            1, h2.errors.ErrorCodes.STREAM_CLOSED
        )
        c.receive_data(rst_frame.serialize())
        c.clear_outbound_data_buffer()

        f = frame_factory.build_data_frame(
            data=b'some data'
        )

        events = c.receive_data(f.serialize())
        assert not events

        expected = frame_factory.build_rst_stream_frame(
            stream_id=1,
            error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
        ).serialize()
        assert c.data_to_send() == expected

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == expected * 3

        # Iterate over the streams to make sure it's gone, then confirm the
        # behaviour is unchanged.
        c.open_outbound_streams

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == expected * 3

    @pytest.mark.parametrize(
        "frame",
        [
            lambda self, ff: ff.build_headers_frame(
                self.example_request_headers),
            lambda self, ff: ff.build_headers_frame(
                self.example_request_headers, flags=['END_STREAM']),
        ]
    )
    def test_resets_further_frames_after_send_reset(self,
                                                    frame_factory,
                                                    frame):
        """
        A stream that is closed by sending RST_STREAM can receive further
        frames: the connection simply answers each of them with RST_STREAM
        and fires no events.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        header_frame = frame_factory.build_headers_frame(
            self.example_request_headers, flags=['END_STREAM']
        )
        c.receive_data(header_frame.serialize())

        c.send_headers(
            stream_id=1,
            headers=self.example_response_headers,
            end_stream=False
        )

        # We reset the stream ourselves, then discard everything queued so
        # far so that only replies to the late frames are inspected below.
        c.reset_stream(1, h2.errors.ErrorCodes.INTERNAL_ERROR)
        c.clear_outbound_data_buffer()

        f = frame(self, frame_factory)
        events = c.receive_data(f.serialize())

        # The RST_STREAM we expect to be sent in reply to each late frame.
        rst_frame = frame_factory.build_rst_stream_frame(
            1, h2.errors.ErrorCodes.STREAM_CLOSED
        )
        assert not events
        assert c.data_to_send() == rst_frame.serialize()

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == rst_frame.serialize() * 3

        # Iterate over the streams to make sure it's gone, then confirm the
        # behaviour is unchanged.
        c.open_outbound_streams

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == rst_frame.serialize() * 3

    def test_resets_further_data_frames_after_send_reset(self,
                                                         frame_factory):
        """
        A stream that is closed by sending RST_STREAM can receive further
        DATA frames: each one is answered with RST_STREAM for the stream and
        fires no events.

        For the small payloads used here the test asserts that RST_STREAM is
        the only frame emitted.
        NOTE(review): presumably a connection-level WINDOW_UPDATE would only
        appear once enough flow-controlled bytes had been consumed - confirm
        against the connection's flow-control handling.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        header_frame = frame_factory.build_headers_frame(
            self.example_request_headers, flags=['END_STREAM']
        )
        c.receive_data(header_frame.serialize())

        c.send_headers(
            stream_id=1,
            headers=self.example_response_headers,
            end_stream=False
        )

        # We reset the stream ourselves.
        c.reset_stream(1, h2.errors.ErrorCodes.INTERNAL_ERROR)

        c.clear_outbound_data_buffer()

        f = frame_factory.build_data_frame(
            data=b'some data'
        )
        events = c.receive_data(f.serialize())
        assert not events
        expected = frame_factory.build_rst_stream_frame(
            stream_id=1,
            error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
        ).serialize()
        assert c.data_to_send() == expected

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == expected * 3

        # Iterate over the streams to make sure it's gone, then confirm the
        # behaviour is unchanged.
        c.open_outbound_streams

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == expected * 3