# -*- coding: utf-8 -*-
"""
test_flow_control
~~~~~~~~~~~~~~~~~

Tests of the flow control management in h2
"""
import pytest

from hypothesis import given
from hypothesis.strategies import integers

import h2.config
import h2.connection
import h2.errors
import h2.events
import h2.exceptions
import h2.settings


class TestFlowControl(object):
    """
    Tests of the flow control management in the connection objects.
    """
    example_request_headers = [
        (':authority', 'example.com'),
        (':path', '/'),
        (':scheme', 'https'),
        (':method', 'GET'),
    ]
    server_config = h2.config.H2Configuration(client_side=False)

    # Window size every test below expects a fresh connection/stream to have.
    DEFAULT_FLOW_WINDOW = 65535

    def test_flow_control_initializes_properly(self):
        """
        The flow control window for a stream should initially be the default
        flow control value.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        assert c.local_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW
        assert c.remote_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

    def test_flow_control_decreases_with_sent_data(self):
        """
        When data is sent on a stream, the flow control window should drop.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)
        c.send_data(1, b'some data')

        remaining_length = self.DEFAULT_FLOW_WINDOW - len(b'some data')
        assert c.local_flow_control_window(1) == remaining_length

    @pytest.mark.parametrize("pad_length", [5, 0])
    def test_flow_control_decreases_with_sent_data_with_padding(
        self, pad_length
    ):
        """
        When padded data is sent on a stream, the flow control window drops
        by the length of the padding plus 1 for the 1-byte padding length
        field.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        c.send_data(1, b'some data', pad_length=pad_length)
        remaining_length = (
            self.DEFAULT_FLOW_WINDOW - len(b'some data') - pad_length - 1
        )
        assert c.local_flow_control_window(1) == remaining_length

    def test_flow_control_decreases_with_received_data(self, frame_factory):
        """
        When data is received on a stream, the remote flow control window
        should drop.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        f1 = frame_factory.build_headers_frame(self.example_request_headers)
        f2 = frame_factory.build_data_frame(b'some data')

        c.receive_data(f1.serialize() + f2.serialize())

        remaining_length = self.DEFAULT_FLOW_WINDOW - len(b'some data')
        assert c.remote_flow_control_window(1) == remaining_length

    def test_flow_control_decreases_with_padded_data(self, frame_factory):
        """
        When padded data is received on a stream, the remote flow control
        window drops by an amount that includes the padding.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        f1 = frame_factory.build_headers_frame(self.example_request_headers)
        f2 = frame_factory.build_data_frame(b'some data', padding_len=10)

        c.receive_data(f1.serialize() + f2.serialize())

        # 10 bytes of padding plus 1 byte for the padding length field.
        remaining_length = (
            self.DEFAULT_FLOW_WINDOW - len(b'some data') - 10 - 1
        )
        assert c.remote_flow_control_window(1) == remaining_length

    def test_flow_control_is_limited_by_connection(self):
        """
        The flow control window is limited by the flow control of the
        connection.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)
        c.send_data(1, b'some data')
        c.send_headers(3, self.example_request_headers)

        # Stream 3 has sent nothing, yet its usable window reflects the data
        # already sent on stream 1.
        remaining_length = self.DEFAULT_FLOW_WINDOW - len(b'some data')
        assert c.local_flow_control_window(3) == remaining_length

    def test_remote_flow_control_is_limited_by_connection(self, frame_factory):
        """
        The remote flow control window is limited by the flow control of the
        connection.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        f1 = frame_factory.build_headers_frame(self.example_request_headers)
        f2 = frame_factory.build_data_frame(b'some data')
        f3 = frame_factory.build_headers_frame(
            self.example_request_headers,
            stream_id=3,
        )
        c.receive_data(f1.serialize() + f2.serialize() + f3.serialize())

        remaining_length = self.DEFAULT_FLOW_WINDOW - len(b'some data')
        assert c.remote_flow_control_window(3) == remaining_length

    def test_cannot_send_more_data_than_window(self):
        """
        Sending more data than the remaining flow control window raises a
        FlowControlError.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)
        c.outbound_flow_control_window = 5

        with pytest.raises(h2.exceptions.FlowControlError):
            c.send_data(1, b'some data')

    def test_increasing_connection_window_allows_sending(self, frame_factory):
        """
        Confirm that sending a WindowUpdate frame on the connection frees
        up space for further frames.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)
        c.outbound_flow_control_window = 5

        with pytest.raises(h2.exceptions.FlowControlError):
            c.send_data(1, b'some data')

        f = frame_factory.build_window_update_frame(
            stream_id=0,
            increment=5,
        )
        c.receive_data(f.serialize())

        c.clear_outbound_data_buffer()
        c.send_data(1, b'some data')
        assert c.data_to_send()

    def test_increasing_stream_window_allows_sending(self, frame_factory):
        """
        Confirm that sending a WindowUpdate frame on the stream frees up
        space for further frames.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)
        c._get_stream_by_id(1).outbound_flow_control_window = 5

        with pytest.raises(h2.exceptions.FlowControlError):
            c.send_data(1, b'some data')

        f = frame_factory.build_window_update_frame(
            stream_id=1,
            increment=5,
        )
        c.receive_data(f.serialize())

        c.clear_outbound_data_buffer()
        c.send_data(1, b'some data')
        assert c.data_to_send()

    def test_flow_control_shrinks_in_response_to_settings(self, frame_factory):
        """
        Acknowledging SETTINGS_INITIAL_WINDOW_SIZE shrinks the flow control
        window.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        assert c.local_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

        f = frame_factory.build_settings_frame(
            settings={h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 1280}
        )
        c.receive_data(f.serialize())

        assert c.local_flow_control_window(1) == 1280

    def test_flow_control_grows_in_response_to_settings(self, frame_factory):
        """
        Acknowledging SETTINGS_INITIAL_WINDOW_SIZE grows the flow control
        window.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        # Greatly increase the connection flow control window.
        f = frame_factory.build_window_update_frame(
            stream_id=0, increment=128000
        )
        c.receive_data(f.serialize())

        # The stream flow control window is the bottleneck here.
        assert c.local_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

        f = frame_factory.build_settings_frame(
            settings={h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 128000}
        )
        c.receive_data(f.serialize())

        # The stream window is still the bottleneck, but larger now.
        assert c.local_flow_control_window(1) == 128000

    def test_flow_control_settings_blocked_by_conn_window(self, frame_factory):
        """
        Changing SETTINGS_INITIAL_WINDOW_SIZE does not affect the effective
        flow control window if the connection window isn't changed.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        assert c.local_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

        f = frame_factory.build_settings_frame(
            settings={h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 128000}
        )
        c.receive_data(f.serialize())

        assert c.local_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

    def test_new_streams_have_flow_control_per_settings(self, frame_factory):
        """
        After a SETTINGS_INITIAL_WINDOW_SIZE change is received, new streams
        have appropriate new flow control windows.
        """
        c = h2.connection.H2Connection()

        f = frame_factory.build_settings_frame(
            settings={h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 128000}
        )
        c.receive_data(f.serialize())

        # Greatly increase the connection flow control window.
        f = frame_factory.build_window_update_frame(
            stream_id=0, increment=128000
        )
        c.receive_data(f.serialize())

        c.send_headers(1, self.example_request_headers)
        assert c.local_flow_control_window(1) == 128000

    def test_window_update_no_stream(self, frame_factory):
        """
        WindowUpdate frames received without streams fire an appropriate
        WindowUpdated event.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())

        f = frame_factory.build_window_update_frame(
            stream_id=0,
            increment=5
        )
        events = c.receive_data(f.serialize())

        assert len(events) == 1
        event = events[0]

        assert isinstance(event, h2.events.WindowUpdated)
        assert event.stream_id == 0
        assert event.delta == 5

    def test_window_update_with_stream(self, frame_factory):
        """
        WindowUpdate frames received with streams fire an appropriate
        WindowUpdated event.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())

        f1 = frame_factory.build_headers_frame(self.example_request_headers)
        f2 = frame_factory.build_window_update_frame(
            stream_id=1,
            increment=66
        )
        data = b''.join(f.serialize() for f in (f1, f2))
        events = c.receive_data(data)

        # The first event comes from the HEADERS frame; the window update
        # is the second one.
        assert len(events) == 2
        event = events[1]

        assert isinstance(event, h2.events.WindowUpdated)
        assert event.stream_id == 1
        assert event.delta == 66

    def test_we_can_increment_stream_flow_control(self, frame_factory):
        """
        It is possible for the user to increase the flow control window for
        streams.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers, end_stream=True)
        c.clear_outbound_data_buffer()

        expected_frame = frame_factory.build_window_update_frame(
            stream_id=1,
            increment=5
        )

        events = c.increment_flow_control_window(increment=5, stream_id=1)
        assert not events
        assert c.data_to_send() == expected_frame.serialize()

    def test_we_can_increment_connection_flow_control(self, frame_factory):
        """
        It is possible for the user to increase the flow control window for
        the entire connection.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers, end_stream=True)
        c.clear_outbound_data_buffer()

        expected_frame = frame_factory.build_window_update_frame(
            stream_id=0,
            increment=5
        )

        events = c.increment_flow_control_window(increment=5)
        assert not events
        assert c.data_to_send() == expected_frame.serialize()

    def test_we_enforce_our_flow_control_window(self, frame_factory):
        """
        The user can set a low flow control window, which leads to connection
        teardown if violated.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())

        # Change the flow control window to 80 bytes.
        c.update_settings(
            {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 80}
        )
        f = frame_factory.build_settings_frame({}, ack=True)
        c.receive_data(f.serialize())

        # Receive a new stream.
        f = frame_factory.build_headers_frame(self.example_request_headers)
        c.receive_data(f.serialize())

        # Attempt to violate the flow control window.
        c.clear_outbound_data_buffer()
        f = frame_factory.build_data_frame(b'\x01' * 100)

        with pytest.raises(h2.exceptions.FlowControlError):
            c.receive_data(f.serialize())

        # Verify we tear down appropriately.
        expected_frame = frame_factory.build_goaway_frame(
            last_stream_id=1,
            error_code=h2.errors.ErrorCodes.FLOW_CONTROL_ERROR,
        )
        assert c.data_to_send() == expected_frame.serialize()

    def test_shrink_remote_flow_control_settings(self, frame_factory):
        """
        The remote peer acknowledging our SETTINGS_INITIAL_WINDOW_SIZE shrinks
        the flow control window.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        assert c.remote_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

        c.update_settings({h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 1280})

        f = frame_factory.build_settings_frame({}, ack=True)
        c.receive_data(f.serialize())

        assert c.remote_flow_control_window(1) == 1280

    def test_grow_remote_flow_control_settings(self, frame_factory):
        """
        The remote peer acknowledging our SETTINGS_INITIAL_WINDOW_SIZE grows
        the flow control window.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        # Increase the connection flow control window greatly.
        c.increment_flow_control_window(increment=128000)

        assert c.remote_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

        c.update_settings(
            {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 128000}
        )
        f = frame_factory.build_settings_frame({}, ack=True)
        c.receive_data(f.serialize())

        assert c.remote_flow_control_window(1) == 128000

    def test_new_streams_have_remote_flow_control(self, frame_factory):
        """
        After a SETTINGS_INITIAL_WINDOW_SIZE change is acknowledged by the
        remote peer, new streams have appropriate new flow control windows.
        """
        c = h2.connection.H2Connection()

        c.update_settings(
            {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 128000}
        )
        f = frame_factory.build_settings_frame({}, ack=True)
        c.receive_data(f.serialize())

        # Increase the connection flow control window greatly.
        c.increment_flow_control_window(increment=128000)

        c.send_headers(1, self.example_request_headers)
        assert c.remote_flow_control_window(1) == 128000

    @pytest.mark.parametrize(
        'increment', [0, -15, 2**31]
    )
    def test_reject_bad_attempts_to_increment_flow_control(self, increment):
        """
        Attempting to increment a flow control increment outside the valid
        range causes a ValueError to be raised.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers, end_stream=True)
        c.clear_outbound_data_buffer()

        # Fails both on and off streams.
        with pytest.raises(ValueError):
            c.increment_flow_control_window(increment=increment, stream_id=1)

        with pytest.raises(ValueError):
            c.increment_flow_control_window(increment=increment)

    @pytest.mark.parametrize('stream_id', [0, 1])
    def test_reject_bad_remote_increments(self, frame_factory, stream_id):
        """
        Remote peers attempting to increment flow control outside the valid
        range cause connection errors of type PROTOCOL_ERROR.
        """
        # The only number that can be encoded in a WINDOW_UPDATE frame but
        # isn't valid is 0.
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers, end_stream=True)
        c.clear_outbound_data_buffer()

        f = frame_factory.build_window_update_frame(
            stream_id=stream_id, increment=0
        )

        with pytest.raises(h2.exceptions.ProtocolError):
            c.receive_data(f.serialize())

        expected_frame = frame_factory.build_goaway_frame(
            last_stream_id=0,
            error_code=h2.errors.ErrorCodes.PROTOCOL_ERROR,
        )
        assert c.data_to_send() == expected_frame.serialize()

    def test_reject_increasing_connection_window_too_far(self, frame_factory):
        """
        Attempts by the remote peer to increase the connection flow control
        window beyond 2**31 - 1 are rejected.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.clear_outbound_data_buffer()

        # Exactly enough to push the window one past the 2**31 - 1 limit.
        increment = 2**31 - c.outbound_flow_control_window

        f = frame_factory.build_window_update_frame(
            stream_id=0, increment=increment
        )

        with pytest.raises(h2.exceptions.FlowControlError):
            c.receive_data(f.serialize())

        expected_frame = frame_factory.build_goaway_frame(
            last_stream_id=0,
            error_code=h2.errors.ErrorCodes.FLOW_CONTROL_ERROR,
        )
        assert c.data_to_send() == expected_frame.serialize()

    def test_reject_increasing_stream_window_too_far(self, frame_factory):
        """
        Attempts by the remote peer to increase the stream flow control window
        beyond 2**31 - 1 are rejected.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers)
        c.clear_outbound_data_buffer()

        # Exactly enough to push the window one past the 2**31 - 1 limit.
        increment = 2**31 - c.outbound_flow_control_window

        f = frame_factory.build_window_update_frame(
            stream_id=1, increment=increment
        )

        # Unlike the connection-level case, this is reported as a stream
        # reset event rather than a raised exception.
        events = c.receive_data(f.serialize())
        assert len(events) == 1

        event = events[0]
        assert isinstance(event, h2.events.StreamReset)
        assert event.stream_id == 1
        assert event.error_code == h2.errors.ErrorCodes.FLOW_CONTROL_ERROR
        assert not event.remote_reset

        expected_frame = frame_factory.build_rst_stream_frame(
            stream_id=1,
            error_code=h2.errors.ErrorCodes.FLOW_CONTROL_ERROR,
        )
        assert c.data_to_send() == expected_frame.serialize()

    def test_reject_overlarge_conn_window_settings(self, frame_factory):
        """
        SETTINGS frames cannot change the size of the connection flow control
        window.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()

        # Go one byte smaller than the limit.
        increment = 2**31 - 1 - c.outbound_flow_control_window

        f = frame_factory.build_window_update_frame(
            stream_id=0, increment=increment
        )
        c.receive_data(f.serialize())

        # Receive an increment to the initial window size.
        f = frame_factory.build_settings_frame(
            settings={
                h2.settings.SettingCodes.INITIAL_WINDOW_SIZE:
                    self.DEFAULT_FLOW_WINDOW + 1
            }
        )
        c.clear_outbound_data_buffer()

        # No error is encountered.
        events = c.receive_data(f.serialize())
        assert len(events) == 1
        assert isinstance(events[0], h2.events.RemoteSettingsChanged)

        expected_frame = frame_factory.build_settings_frame(
            settings={},
            ack=True
        )
        assert c.data_to_send() == expected_frame.serialize()

    def test_reject_overlarge_stream_window_settings(self, frame_factory):
        """
        Remote attempts to create overlarge stream windows via SETTINGS frames
        are rejected.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers)

        # Go one byte smaller than the limit.
        increment = 2**31 - 1 - c.outbound_flow_control_window

        f = frame_factory.build_window_update_frame(
            stream_id=1, increment=increment
        )
        c.receive_data(f.serialize())

        # Receive an increment to the initial window size.
        f = frame_factory.build_settings_frame(
            settings={
                h2.settings.SettingCodes.INITIAL_WINDOW_SIZE:
                    self.DEFAULT_FLOW_WINDOW + 1
            }
        )
        c.clear_outbound_data_buffer()
        with pytest.raises(h2.exceptions.FlowControlError):
            c.receive_data(f.serialize())

        expected_frame = frame_factory.build_goaway_frame(
            last_stream_id=0,
            error_code=h2.errors.ErrorCodes.FLOW_CONTROL_ERROR,
        )
        assert c.data_to_send() == expected_frame.serialize()

    def test_reject_local_overlarge_increase_connection_window(self):
        """
        Local attempts to increase the connection window too far are rejected.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()

        increment = 2**31 - c.inbound_flow_control_window

        with pytest.raises(h2.exceptions.FlowControlError):
            c.increment_flow_control_window(increment=increment)

    def test_reject_local_overlarge_increase_stream_window(self):
        """
        Local attempts to increase the stream window too far are rejected.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers)

        increment = 2**31 - c.inbound_flow_control_window

        with pytest.raises(h2.exceptions.FlowControlError):
            c.increment_flow_control_window(increment=increment, stream_id=1)

    def test_send_update_on_closed_streams(self, frame_factory):
        """
        DATA frames received on a stream we have already reset are answered
        with RST_STREAM (STREAM_CLOSED) and produce no events. After three
        13500-byte frames a connection-level WINDOW_UPDATE for all 40500
        discarded bytes is emitted; an empty DATA frame triggers only the
        RST_STREAM.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers)
        c.reset_stream(1)

        c.clear_outbound_data_buffer()
        # These properties are read purely for their side effect: counting
        # open streams forces closed streams to be cleaned up (the same trick
        # test_acknowledging_data_on_closed_stream documents).
        c.open_outbound_streams
        c.open_inbound_streams

        f = frame_factory.build_data_frame(b'some data'*1500)
        events = c.receive_data(f.serialize()*3)
        assert not events

        rst_stream = frame_factory.build_rst_stream_frame(
            stream_id=1,
            error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
        ).serialize()
        window_update = frame_factory.build_window_update_frame(
            stream_id=0,
            increment=40500,
        ).serialize()
        # The window update is interleaved before the final RST_STREAM.
        expected = rst_stream * 2 + window_update + rst_stream
        assert c.data_to_send() == expected

        f = frame_factory.build_data_frame(b'')
        events = c.receive_data(f.serialize())
        assert not events

        assert c.data_to_send() == rst_stream


class TestAutomaticFlowControl(object):
    """
    Tests for the automatic flow control logic.
    """
    example_request_headers = [
        (':authority', 'example.com'),
        (':path', '/'),
        (':scheme', 'https'),
        (':method', 'GET'),
    ]
    server_config = h2.config.H2Configuration(client_side=False)

    # Also referenced by the @given decorators below at class-definition
    # time, so it must be defined before the test methods.
    DEFAULT_FLOW_WINDOW = 65535

    def _setup_connection_and_send_headers(self, frame_factory):
        """
        Setup a server-side H2Connection and send a headers frame, and then
        clear the outbound data buffer. Also increase the maximum frame size.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.initiate_connection()
        c.receive_data(frame_factory.preamble())

        # Raise the frame size limit so a single DATA frame can fill the
        # whole default window.
        c.update_settings(
            {h2.settings.SettingCodes.MAX_FRAME_SIZE: self.DEFAULT_FLOW_WINDOW}
        )
        settings_frame = frame_factory.build_settings_frame(
            settings={}, ack=True
        )
        c.receive_data(settings_frame.serialize())
        c.clear_outbound_data_buffer()

        headers_frame = frame_factory.build_headers_frame(
            headers=self.example_request_headers
        )
        c.receive_data(headers_frame.serialize())
        c.clear_outbound_data_buffer()
        return c

    @given(stream_id=integers(max_value=0))
    def test_must_acknowledge_for_stream(self, frame_factory, stream_id):
        """
        Flow control acknowledgements must be done on a stream ID that is
        greater than zero.
        """
        # We need to refresh the encoder because hypothesis has a problem with
        # integrating with py.test, meaning that we use the same frame factory
        # for all tests.
        # See https://github.com/HypothesisWorks/hypothesis-python/issues/377
        frame_factory.refresh_encoder()

        # Create a connection in a state that might actually accept
        # data acknowledgement.
        c = self._setup_connection_and_send_headers(frame_factory)
        data_frame = frame_factory.build_data_frame(
            b'some data', flags=['END_STREAM']
        )
        c.receive_data(data_frame.serialize())

        with pytest.raises(ValueError):
            c.acknowledge_received_data(
                acknowledged_size=5, stream_id=stream_id
            )

    @given(size=integers(max_value=-1))
    def test_cannot_acknowledge_less_than_zero(self, frame_factory, size):
        """
        The user must acknowledge at least 0 bytes.
        """
        # We need to refresh the encoder because hypothesis has a problem with
        # integrating with py.test, meaning that we use the same frame factory
        # for all tests.
        # See https://github.com/HypothesisWorks/hypothesis-python/issues/377
        frame_factory.refresh_encoder()

        # Create a connection in a state that might actually accept
        # data acknowledgement.
        c = self._setup_connection_and_send_headers(frame_factory)
        data_frame = frame_factory.build_data_frame(
            b'some data', flags=['END_STREAM']
        )
        c.receive_data(data_frame.serialize())

        with pytest.raises(ValueError):
            c.acknowledge_received_data(acknowledged_size=size, stream_id=1)

    def test_acknowledging_small_chunks_does_nothing(self, frame_factory):
        """
        When a small amount of data is received and acknowledged, no window
        update is emitted.
        """
        c = self._setup_connection_and_send_headers(frame_factory)

        data_frame = frame_factory.build_data_frame(
            b'some data', flags=['END_STREAM']
        )
        data_event = c.receive_data(data_frame.serialize())[0]

        c.acknowledge_received_data(
            data_event.flow_controlled_length, stream_id=1
        )

        assert not c.data_to_send()

    def test_acknowledging_no_data_does_nothing(self, frame_factory):
        """
        If a user accidentally acknowledges no data, nothing happens.
        """
        c = self._setup_connection_and_send_headers(frame_factory)

        # Send an empty data frame, just to give the user impetus to ack the
        # data.
        data_frame = frame_factory.build_data_frame(b'')
        c.receive_data(data_frame.serialize())

        c.acknowledge_received_data(0, stream_id=1)
        assert not c.data_to_send()

    @pytest.mark.parametrize('force_cleanup', (True, False))
    def test_acknowledging_data_on_closed_stream(
        self, frame_factory, force_cleanup
    ):
        """
        When acknowledging data on a stream that has just been closed, no
        acknowledgement is given for that stream, only for the connection.
        """
        c = self._setup_connection_and_send_headers(frame_factory)

        data_to_send = b'\x00' * self.DEFAULT_FLOW_WINDOW
        data_frame = frame_factory.build_data_frame(data_to_send)
        c.receive_data(data_frame.serialize())

        rst_frame = frame_factory.build_rst_stream_frame(
            stream_id=1
        )
        c.receive_data(rst_frame.serialize())
        c.clear_outbound_data_buffer()

        if force_cleanup:
            # Check how many streams are open to force the old one to be
            # cleaned up.
            assert c.open_outbound_streams == 0

        c.acknowledge_received_data(2048, stream_id=1)

        expected = frame_factory.build_window_update_frame(
            stream_id=0, increment=2048
        )
        assert c.data_to_send() == expected.serialize()

    def test_acknowledging_streams_we_never_saw(self, frame_factory):
        """
        If the user acknowledges a stream ID we've never seen, that raises a
        NoSuchStreamError.
        """
        c = self._setup_connection_and_send_headers(frame_factory)
        c.clear_outbound_data_buffer()

        with pytest.raises(h2.exceptions.NoSuchStreamError):
            c.acknowledge_received_data(2048, stream_id=101)

    @given(integers(min_value=1025, max_value=DEFAULT_FLOW_WINDOW))
    def test_acknowledging_1024_bytes_when_empty_increments(
        self, frame_factory, increment
    ):
        """
        If the flow control window is empty and we acknowledge 1024 bytes or
        more, we will emit a WINDOW_UPDATE frame just to move the connection
        forward.
        """
        # We need to refresh the encoder because hypothesis has a problem with
        # integrating with py.test, meaning that we use the same frame factory
        # for all tests.
        # See https://github.com/HypothesisWorks/hypothesis-python/issues/377
        frame_factory.refresh_encoder()

        c = self._setup_connection_and_send_headers(frame_factory)

        data_to_send = b'\x00' * self.DEFAULT_FLOW_WINDOW
        data_frame = frame_factory.build_data_frame(data_to_send)
        c.receive_data(data_frame.serialize())

        c.acknowledge_received_data(increment, stream_id=1)

        # The connection-level update is expected first, then the stream's.
        first_expected = frame_factory.build_window_update_frame(
            stream_id=0, increment=increment
        )
        second_expected = frame_factory.build_window_update_frame(
            stream_id=1, increment=increment
        )
        expected_data = b''.join(
            [first_expected.serialize(), second_expected.serialize()]
        )
        assert c.data_to_send() == expected_data

    # This test needs to use a lower cap, because otherwise the algo will
    # increment the stream window anyway.
    @given(integers(min_value=1025, max_value=(DEFAULT_FLOW_WINDOW // 4) - 1))
    def test_connection_only_empty(self, frame_factory, increment):
        """
        If the connection flow control window is empty, but the stream flow
        control windows aren't, and 1024 bytes or more are acknowledged by the
        user, we increment the connection window only.
        """
        # We need to refresh the encoder because hypothesis has a problem with
        # integrating with py.test, meaning that we use the same frame factory
        # for all tests.
        # See https://github.com/HypothesisWorks/hypothesis-python/issues/377
        frame_factory.refresh_encoder()

        # Here we'll use 4 streams. Set them up.
        c = self._setup_connection_and_send_headers(frame_factory)

        for stream_id in [3, 5, 7]:
            f = frame_factory.build_headers_frame(
                headers=self.example_request_headers, stream_id=stream_id
            )
            c.receive_data(f.serialize())

        # Now we send 1/4 of the connection window per stream. Annoyingly,
        # that's an odd number, so we need to round the last frame up.
        data_to_send = b'\x00' * (self.DEFAULT_FLOW_WINDOW // 4)
        for stream_id in [1, 3, 5]:
            f = frame_factory.build_data_frame(
                data_to_send, stream_id=stream_id
            )
            c.receive_data(f.serialize())

        # Stream 7 takes whatever is left of the connection window.
        data_to_send = b'\x00' * c.remote_flow_control_window(7)
        data_frame = frame_factory.build_data_frame(data_to_send, stream_id=7)
        c.receive_data(data_frame.serialize())

        # Ok, now the actual test.
        c.acknowledge_received_data(increment, stream_id=1)

        expected_data = frame_factory.build_window_update_frame(
            stream_id=0, increment=increment
        ).serialize()
        assert c.data_to_send() == expected_data

    @given(integers(min_value=1025, max_value=DEFAULT_FLOW_WINDOW))
    def test_mixing_update_forms(self, frame_factory, increment):
        """
        If the user mixes acknowledging data with manually incrementing
        windows, we still keep track of what's going on.
        """
        # We need to refresh the encoder because hypothesis has a problem with
        # integrating with py.test, meaning that we use the same frame factory
        # for all tests.
        # See https://github.com/HypothesisWorks/hypothesis-python/issues/377
        frame_factory.refresh_encoder()

        # Empty the flow control window.
        c = self._setup_connection_and_send_headers(frame_factory)
        data_to_send = b'\x00' * self.DEFAULT_FLOW_WINDOW
        data_frame = frame_factory.build_data_frame(data_to_send)
        c.receive_data(data_frame.serialize())

        # Manually increment the connection flow control window back to fully
        # open, but leave the stream window closed.
        c.increment_flow_control_window(
            stream_id=None, increment=self.DEFAULT_FLOW_WINDOW
        )
        c.clear_outbound_data_buffer()

        # Now, acknowledge the receipt of that data. This should cause the
        # stream window to be widened, but not the connection window, because
        # it is already open.
        c.acknowledge_received_data(increment, stream_id=1)

        # We expect to see one window update frame only, for the stream.
        expected_data = frame_factory.build_window_update_frame(
            stream_id=1, increment=increment
        ).serialize()
        assert c.data_to_send() == expected_data