1# -*- coding: utf-8 -*-
2"""
3test_flow_control
4~~~~~~~~~~~~~~~~~
5
6Tests of the flow control management in h2
7"""
8import pytest
9
10from hypothesis import given
11from hypothesis.strategies import integers
12
13import h2.config
14import h2.connection
15import h2.errors
16import h2.events
17import h2.exceptions
18import h2.settings
19
20
class TestFlowControl(object):
    """
    Tests of the flow control management in the connection objects.

    Throughout these tests the "local" window is the one that shrinks when
    we send data, and the "remote" window is the one that shrinks when we
    receive data from the peer.
    """
    # Pseudo-headers used to open a stream in every test.
    example_request_headers = [
        (':authority', 'example.com'),
        (':path', '/'),
        (':scheme', 'https'),
        (':method', 'GET'),
    ]
    # Configuration used for the tests that drive a server-side connection.
    server_config = h2.config.H2Configuration(client_side=False)

    # Window size the tests expect before any SETTINGS or WINDOW_UPDATE
    # frame has changed it.
    DEFAULT_FLOW_WINDOW = 65535

    def test_flow_control_initializes_properly(self):
        """
        The flow control window for a stream should initially be the default
        flow control value.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        assert c.local_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW
        assert c.remote_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

    def test_flow_control_decreases_with_sent_data(self):
        """
        When data is sent on a stream, the flow control window should drop.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)
        c.send_data(1, b'some data')

        remaining_length = self.DEFAULT_FLOW_WINDOW - len(b'some data')
        assert c.local_flow_control_window(1) == remaining_length

    @pytest.mark.parametrize("pad_length", [5, 0])
    def test_flow_control_decreases_with_sent_data_with_padding(self,
                                                                pad_length):
        """
        When padded data is sent on a stream, the flow control window drops
        by the length of the padding plus 1 for the 1-byte padding length
        field.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        c.send_data(1, b'some data', pad_length=pad_length)
        remaining_length = (
            self.DEFAULT_FLOW_WINDOW - len(b'some data') - pad_length - 1
        )
        assert c.local_flow_control_window(1) == remaining_length

    def test_flow_control_decreases_with_received_data(self, frame_factory):
        """
        When data is received on a stream, the remote flow control window
        should drop.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        f1 = frame_factory.build_headers_frame(self.example_request_headers)
        f2 = frame_factory.build_data_frame(b'some data')

        c.receive_data(f1.serialize() + f2.serialize())

        remaining_length = self.DEFAULT_FLOW_WINDOW - len(b'some data')
        assert c.remote_flow_control_window(1) == remaining_length

    def test_flow_control_decreases_with_padded_data(self, frame_factory):
        """
        When padded data is received on a stream, the remote flow control
        window drops by an amount that includes the padding.
        """
        padding_len = 10

        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        f1 = frame_factory.build_headers_frame(self.example_request_headers)
        f2 = frame_factory.build_data_frame(
            b'some data', padding_len=padding_len
        )

        c.receive_data(f1.serialize() + f2.serialize())

        # The extra 1 accounts for the 1-byte padding length field.
        remaining_length = (
            self.DEFAULT_FLOW_WINDOW - len(b'some data') - padding_len - 1
        )
        assert c.remote_flow_control_window(1) == remaining_length

    def test_flow_control_is_limited_by_connection(self):
        """
        The flow control window is limited by the flow control of the
        connection.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)
        c.send_data(1, b'some data')
        c.send_headers(3, self.example_request_headers)

        # Nothing was sent on stream 3, yet its usable window has shrunk by
        # what was sent on stream 1.
        remaining_length = self.DEFAULT_FLOW_WINDOW - len(b'some data')
        assert c.local_flow_control_window(3) == remaining_length

    def test_remote_flow_control_is_limited_by_connection(self, frame_factory):
        """
        The remote flow control window is limited by the flow control of the
        connection.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        f1 = frame_factory.build_headers_frame(self.example_request_headers)
        f2 = frame_factory.build_data_frame(b'some data')
        f3 = frame_factory.build_headers_frame(
            self.example_request_headers,
            stream_id=3,
        )
        c.receive_data(f1.serialize() + f2.serialize() + f3.serialize())

        # No data arrived on stream 3, yet its usable window has shrunk by
        # what was received on stream 1.
        remaining_length = self.DEFAULT_FLOW_WINDOW - len(b'some data')
        assert c.remote_flow_control_window(3) == remaining_length

    def test_cannot_send_more_data_than_window(self):
        """
        Sending more data than the remaining flow control window raises a
        FlowControlError.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)
        # Shrink the connection window below the 9-byte payload.
        c.outbound_flow_control_window = 5

        with pytest.raises(h2.exceptions.FlowControlError):
            c.send_data(1, b'some data')

    def test_increasing_connection_window_allows_sending(self, frame_factory):
        """
        Confirm that receiving a WindowUpdate frame for the connection frees
        up space for further frames.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)
        # Shrink the connection window below the 9-byte payload.
        c.outbound_flow_control_window = 5

        with pytest.raises(h2.exceptions.FlowControlError):
            c.send_data(1, b'some data')

        # 5 + 5 bytes is enough room for the 9-byte payload.
        f = frame_factory.build_window_update_frame(
            stream_id=0,
            increment=5,
        )
        c.receive_data(f.serialize())

        c.clear_outbound_data_buffer()
        c.send_data(1, b'some data')
        assert c.data_to_send()

    def test_increasing_stream_window_allows_sending(self, frame_factory):
        """
        Confirm that receiving a WindowUpdate frame for the stream frees up
        space for further frames.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)
        # Shrink the stream window (not the connection window) below the
        # 9-byte payload.
        c._get_stream_by_id(1).outbound_flow_control_window = 5

        with pytest.raises(h2.exceptions.FlowControlError):
            c.send_data(1, b'some data')

        # 5 + 5 bytes is enough room for the 9-byte payload.
        f = frame_factory.build_window_update_frame(
            stream_id=1,
            increment=5,
        )
        c.receive_data(f.serialize())

        c.clear_outbound_data_buffer()
        c.send_data(1, b'some data')
        assert c.data_to_send()

    def test_flow_control_shrinks_in_response_to_settings(self, frame_factory):
        """
        Acknowledging SETTINGS_INITIAL_WINDOW_SIZE shrinks the flow control
        window.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        assert c.local_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

        f = frame_factory.build_settings_frame(
            settings={h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 1280}
        )
        c.receive_data(f.serialize())

        assert c.local_flow_control_window(1) == 1280

    def test_flow_control_grows_in_response_to_settings(self, frame_factory):
        """
        Acknowledging SETTINGS_INITIAL_WINDOW_SIZE grows the flow control
        window.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        # Greatly increase the connection flow control window.
        f = frame_factory.build_window_update_frame(
            stream_id=0, increment=128000
        )
        c.receive_data(f.serialize())

        # The stream flow control window is the bottleneck here.
        assert c.local_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

        f = frame_factory.build_settings_frame(
            settings={h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 128000}
        )
        c.receive_data(f.serialize())

        # The stream window is still the bottleneck, but larger now.
        assert c.local_flow_control_window(1) == 128000

    def test_flow_control_settings_blocked_by_conn_window(self, frame_factory):
        """
        Changing SETTINGS_INITIAL_WINDOW_SIZE does not affect the effective
        flow control window if the connection window isn't changed.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        assert c.local_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

        f = frame_factory.build_settings_frame(
            settings={h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 128000}
        )
        c.receive_data(f.serialize())

        # The connection window was never enlarged, so it still caps the
        # effective window at its original size.
        assert c.local_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

    def test_new_streams_have_flow_control_per_settings(self, frame_factory):
        """
        After a SETTINGS_INITIAL_WINDOW_SIZE change is received, new streams
        have appropriate new flow control windows.
        """
        c = h2.connection.H2Connection()

        f = frame_factory.build_settings_frame(
            settings={h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 128000}
        )
        c.receive_data(f.serialize())

        # Greatly increase the connection flow control window.
        f = frame_factory.build_window_update_frame(
            stream_id=0, increment=128000
        )
        c.receive_data(f.serialize())

        # The stream is only opened after the new setting arrived.
        c.send_headers(1, self.example_request_headers)
        assert c.local_flow_control_window(1) == 128000

    def test_window_update_no_stream(self, frame_factory):
        """
        WindowUpdate frames received without streams fire an appropriate
        WindowUpdated event.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())

        f = frame_factory.build_window_update_frame(
            stream_id=0,
            increment=5
        )
        events = c.receive_data(f.serialize())

        assert len(events) == 1
        event = events[0]

        assert isinstance(event, h2.events.WindowUpdated)
        assert event.stream_id == 0
        assert event.delta == 5

    def test_window_update_with_stream(self, frame_factory):
        """
        WindowUpdate frames received with streams fire an appropriate
        WindowUpdated event.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())

        f1 = frame_factory.build_headers_frame(self.example_request_headers)
        f2 = frame_factory.build_window_update_frame(
            stream_id=1,
            increment=66
        )
        events = c.receive_data(f1.serialize() + f2.serialize())

        # The first event belongs to the HEADERS frame; the window update
        # is the second one.
        assert len(events) == 2
        event = events[1]

        assert isinstance(event, h2.events.WindowUpdated)
        assert event.stream_id == 1
        assert event.delta == 66

    def test_we_can_increment_stream_flow_control(self, frame_factory):
        """
        It is possible for the user to increase the flow control window for
        streams.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers, end_stream=True)
        c.clear_outbound_data_buffer()

        expected_frame = frame_factory.build_window_update_frame(
            stream_id=1,
            increment=5
        )

        events = c.increment_flow_control_window(increment=5, stream_id=1)
        assert not events
        assert c.data_to_send() == expected_frame.serialize()

    def test_we_can_increment_connection_flow_control(self, frame_factory):
        """
        It is possible for the user to increase the flow control window for
        the entire connection.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers, end_stream=True)
        c.clear_outbound_data_buffer()

        expected_frame = frame_factory.build_window_update_frame(
            stream_id=0,
            increment=5
        )

        # Omitting the stream ID targets the connection window (stream 0).
        events = c.increment_flow_control_window(increment=5)
        assert not events
        assert c.data_to_send() == expected_frame.serialize()

    def test_we_enforce_our_flow_control_window(self, frame_factory):
        """
        The user can set a low flow control window, which leads to connection
        teardown if violated.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())

        # Change the flow control window to 80 bytes.
        c.update_settings(
            {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 80}
        )
        f = frame_factory.build_settings_frame({}, ack=True)
        c.receive_data(f.serialize())

        # Receive a new stream.
        f = frame_factory.build_headers_frame(self.example_request_headers)
        c.receive_data(f.serialize())

        # Attempt to violate the flow control window.
        c.clear_outbound_data_buffer()
        f = frame_factory.build_data_frame(b'\x01' * 100)

        with pytest.raises(h2.exceptions.FlowControlError):
            c.receive_data(f.serialize())

        # Verify we tear down appropriately.
        expected_frame = frame_factory.build_goaway_frame(
            last_stream_id=1,
            error_code=h2.errors.ErrorCodes.FLOW_CONTROL_ERROR,
        )
        assert c.data_to_send() == expected_frame.serialize()

    def test_shrink_remote_flow_control_settings(self, frame_factory):
        """
        The remote peer acknowledging our SETTINGS_INITIAL_WINDOW_SIZE shrinks
        the flow control window.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        assert c.remote_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

        c.update_settings({h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 1280})

        f = frame_factory.build_settings_frame({}, ack=True)
        c.receive_data(f.serialize())

        assert c.remote_flow_control_window(1) == 1280

    def test_grow_remote_flow_control_settings(self, frame_factory):
        """
        The remote peer acknowledging our SETTINGS_INITIAL_WINDOW_SIZE grows
        the flow control window.
        """
        c = h2.connection.H2Connection()
        c.send_headers(1, self.example_request_headers)

        # Increase the connection flow control window greatly.
        c.increment_flow_control_window(increment=128000)

        # The stream window is the bottleneck until the setting is acked.
        assert c.remote_flow_control_window(1) == self.DEFAULT_FLOW_WINDOW

        c.update_settings(
            {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 128000}
        )
        f = frame_factory.build_settings_frame({}, ack=True)
        c.receive_data(f.serialize())

        assert c.remote_flow_control_window(1) == 128000

    def test_new_streams_have_remote_flow_control(self, frame_factory):
        """
        After a SETTINGS_INITIAL_WINDOW_SIZE change is acknowledged by the
        remote peer, new streams have appropriate new flow control windows.
        """
        c = h2.connection.H2Connection()

        c.update_settings(
            {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 128000}
        )
        f = frame_factory.build_settings_frame({}, ack=True)
        c.receive_data(f.serialize())

        # Increase the connection flow control window greatly.
        c.increment_flow_control_window(increment=128000)

        # The stream is only opened after the setting was acknowledged.
        c.send_headers(1, self.example_request_headers)
        assert c.remote_flow_control_window(1) == 128000

    @pytest.mark.parametrize(
        'increment', [0, -15, 2**31]
    )
    def test_reject_bad_attempts_to_increment_flow_control(self, increment):
        """
        Attempting to increment a flow control increment outside the valid
        range causes a ValueError to be raised.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers, end_stream=True)
        c.clear_outbound_data_buffer()

        # Fails both on and off streams.
        with pytest.raises(ValueError):
            c.increment_flow_control_window(increment=increment, stream_id=1)

        with pytest.raises(ValueError):
            c.increment_flow_control_window(increment=increment)

    @pytest.mark.parametrize('stream_id', [0, 1])
    def test_reject_bad_remote_increments(self, frame_factory, stream_id):
        """
        Remote peers attempting to increment flow control outside the valid
        range cause connection errors of type PROTOCOL_ERROR.
        """
        # The only number that can be encoded in a WINDOW_UPDATE frame but
        # isn't valid is 0.
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers, end_stream=True)
        c.clear_outbound_data_buffer()

        f = frame_factory.build_window_update_frame(
            stream_id=stream_id, increment=0
        )

        with pytest.raises(h2.exceptions.ProtocolError):
            c.receive_data(f.serialize())

        # The connection is torn down regardless of which stream ID carried
        # the bad increment.
        expected_frame = frame_factory.build_goaway_frame(
            last_stream_id=0,
            error_code=h2.errors.ErrorCodes.PROTOCOL_ERROR,
        )
        assert c.data_to_send() == expected_frame.serialize()

    def test_reject_increasing_connection_window_too_far(self, frame_factory):
        """
        Attempts by the remote peer to increase the connection flow control
        window beyond 2**31 - 1 are rejected.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.clear_outbound_data_buffer()

        # Smallest increment that pushes the window to 2**31, one past the
        # limit.
        increment = 2**31 - c.outbound_flow_control_window

        f = frame_factory.build_window_update_frame(
            stream_id=0, increment=increment
        )

        with pytest.raises(h2.exceptions.FlowControlError):
            c.receive_data(f.serialize())

        expected_frame = frame_factory.build_goaway_frame(
            last_stream_id=0,
            error_code=h2.errors.ErrorCodes.FLOW_CONTROL_ERROR,
        )
        assert c.data_to_send() == expected_frame.serialize()

    def test_reject_increasing_stream_window_too_far(self, frame_factory):
        """
        Attempts by the remote peer to increase the stream flow control window
        beyond 2**31 - 1 are rejected.

        Unlike the connection-level case, this resets only the stream: a
        StreamReset event is returned and a RST_STREAM frame is emitted.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers)
        c.clear_outbound_data_buffer()

        # NOTE(review): the increment is derived from the connection-level
        # outbound window; this assumes the stream's outbound window has the
        # same size at this point -- confirm.
        increment = 2**31 - c.outbound_flow_control_window

        f = frame_factory.build_window_update_frame(
            stream_id=1, increment=increment
        )

        events = c.receive_data(f.serialize())
        assert len(events) == 1

        event = events[0]
        assert isinstance(event, h2.events.StreamReset)
        assert event.stream_id == 1
        assert event.error_code == h2.errors.ErrorCodes.FLOW_CONTROL_ERROR
        assert not event.remote_reset

        expected_frame = frame_factory.build_rst_stream_frame(
            stream_id=1,
            error_code=h2.errors.ErrorCodes.FLOW_CONTROL_ERROR,
        )
        assert c.data_to_send() == expected_frame.serialize()

    def test_reject_overlarge_conn_window_settings(self, frame_factory):
        """
        SETTINGS frames cannot change the size of the connection flow control
        window.

        Even with the connection window at its maximum, an increase of
        SETTINGS_INITIAL_WINDOW_SIZE is accepted and acknowledged.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()

        # Go one byte smaller than the limit.
        increment = 2**31 - 1 - c.outbound_flow_control_window

        f = frame_factory.build_window_update_frame(
            stream_id=0, increment=increment
        )
        c.receive_data(f.serialize())

        # Receive an increment to the initial window size.
        f = frame_factory.build_settings_frame(
            settings={
                h2.settings.SettingCodes.INITIAL_WINDOW_SIZE:
                    self.DEFAULT_FLOW_WINDOW + 1
            }
        )
        c.clear_outbound_data_buffer()

        # No error is encountered.
        events = c.receive_data(f.serialize())
        assert len(events) == 1
        assert isinstance(events[0], h2.events.RemoteSettingsChanged)

        # The only thing sent in response is the SETTINGS acknowledgement.
        expected_frame = frame_factory.build_settings_frame(
            settings={},
            ack=True
        )
        assert c.data_to_send() == expected_frame.serialize()

    def test_reject_overlarge_stream_window_settings(self, frame_factory):
        """
        Remote attempts to create overlarge stream windows via SETTINGS frames
        are rejected.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers)

        # Go one byte smaller than the limit.
        # NOTE(review): the increment is derived from the connection-level
        # outbound window; this assumes the stream's outbound window has the
        # same size at this point -- confirm.
        increment = 2**31 - 1 - c.outbound_flow_control_window

        f = frame_factory.build_window_update_frame(
            stream_id=1, increment=increment
        )
        c.receive_data(f.serialize())

        # Receive an increment to the initial window size.
        f = frame_factory.build_settings_frame(
            settings={
                h2.settings.SettingCodes.INITIAL_WINDOW_SIZE:
                    self.DEFAULT_FLOW_WINDOW + 1
            }
        )
        c.clear_outbound_data_buffer()
        with pytest.raises(h2.exceptions.FlowControlError):
            c.receive_data(f.serialize())

        expected_frame = frame_factory.build_goaway_frame(
            last_stream_id=0,
            error_code=h2.errors.ErrorCodes.FLOW_CONTROL_ERROR,
        )
        assert c.data_to_send() == expected_frame.serialize()

    def test_reject_local_overlarge_increase_connection_window(self):
        """
        Local attempts to increase the connection window too far are rejected.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()

        # Smallest increment that pushes the window to 2**31, one past the
        # limit.
        increment = 2**31 - c.inbound_flow_control_window

        with pytest.raises(h2.exceptions.FlowControlError):
            c.increment_flow_control_window(increment=increment)

    def test_reject_local_overlarge_increase_stream_window(self):
        """
        Local attempts to increase the stream window too far are rejected.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers)

        # NOTE(review): the increment is derived from the connection-level
        # inbound window; this assumes the stream's inbound window has the
        # same size at this point -- confirm.
        increment = 2**31 - c.inbound_flow_control_window

        with pytest.raises(h2.exceptions.FlowControlError):
            c.increment_flow_control_window(increment=increment, stream_id=1)

    def test_send_update_on_closed_streams(self, frame_factory):
        """
        DATA frames received on a stream that we have already reset produce
        no events. Each frame is answered with a RST_STREAM frame carrying
        STREAM_CLOSED, and a WINDOW_UPDATE for the connection (stream 0) is
        still emitted for the received data.
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        c.send_headers(1, self.example_request_headers)
        c.reset_stream(1)

        c.clear_outbound_data_buffer()
        # These properties are read only for their side effect.
        # NOTE(review): this appears intended to make the connection clean
        # up the stream that was just reset -- confirm.
        c.open_outbound_streams
        c.open_inbound_streams

        f = frame_factory.build_data_frame(b'some data'*1500)
        events = c.receive_data(f.serialize()*3)
        assert not events

        rst_stream = frame_factory.build_rst_stream_frame(
            stream_id=1,
            error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
        ).serialize()
        # 40500 is the combined payload of the three DATA frames
        # (3 * 9 * 1500 bytes).
        window_update = frame_factory.build_window_update_frame(
            stream_id=0,
            increment=40500,
        ).serialize()

        # The WINDOW_UPDATE is expected between the second and third reset.
        expected = rst_stream * 2 + window_update + rst_stream
        assert c.data_to_send() == expected

        # An empty DATA frame is answered with a reset only.
        f = frame_factory.build_data_frame(b'')
        events = c.receive_data(f.serialize())
        assert not events

        assert c.data_to_send() == rst_stream
676
677
class TestAutomaticFlowControl(object):
    """
    Tests for the automatic flow control logic.
    """
    example_request_headers = [
        (':authority', 'example.com'),
        (':path', '/'),
        (':scheme', 'https'),
        (':method', 'GET'),
    ]
    server_config = h2.config.H2Configuration(client_side=False)

    DEFAULT_FLOW_WINDOW = 65535

    def _setup_connection_and_send_headers(self, frame_factory):
        """
        Build a server-side H2Connection, raise its maximum frame size, feed
        it a request HEADERS frame and return it with an empty outbound data
        buffer.
        """
        conn = h2.connection.H2Connection(config=self.server_config)
        conn.initiate_connection()
        conn.receive_data(frame_factory.preamble())

        # Raise the maximum frame size and have the peer acknowledge it.
        new_settings = {
            h2.settings.SettingCodes.MAX_FRAME_SIZE: self.DEFAULT_FLOW_WINDOW
        }
        conn.update_settings(new_settings)
        settings_ack = frame_factory.build_settings_frame(
            settings={}, ack=True
        )
        conn.receive_data(settings_ack.serialize())
        conn.clear_outbound_data_buffer()

        # Open stream 1 from the peer's side.
        request = frame_factory.build_headers_frame(
            headers=self.example_request_headers
        )
        conn.receive_data(request.serialize())
        conn.clear_outbound_data_buffer()
        return conn

    @given(stream_id=integers(max_value=0))
    def test_must_acknowledge_for_stream(self, frame_factory, stream_id):
        """
        Flow control acknowledgements must be done on a stream ID that is
        greater than zero.
        """
        # The frame factory is shared between all hypothesis examples
        # because of how hypothesis integrates with py.test, so the encoder
        # has to be reset by hand for each example.
        # See https://github.com/HypothesisWorks/hypothesis-python/issues/377
        frame_factory.refresh_encoder()

        # Put the connection into a state that might actually accept a data
        # acknowledgement.
        conn = self._setup_connection_and_send_headers(frame_factory)
        final_data = frame_factory.build_data_frame(
            b'some data', flags=['END_STREAM']
        )
        conn.receive_data(final_data.serialize())

        with pytest.raises(ValueError):
            conn.acknowledge_received_data(
                acknowledged_size=5, stream_id=stream_id
            )

    @given(size=integers(max_value=-1))
    def test_cannot_acknowledge_less_than_zero(self, frame_factory, size):
        """
        The user must acknowledge at least 0 bytes.
        """
        # The frame factory is shared between all hypothesis examples
        # because of how hypothesis integrates with py.test, so the encoder
        # has to be reset by hand for each example.
        # See https://github.com/HypothesisWorks/hypothesis-python/issues/377
        frame_factory.refresh_encoder()

        # Put the connection into a state that might actually accept a data
        # acknowledgement.
        conn = self._setup_connection_and_send_headers(frame_factory)
        final_data = frame_factory.build_data_frame(
            b'some data', flags=['END_STREAM']
        )
        conn.receive_data(final_data.serialize())

        with pytest.raises(ValueError):
            conn.acknowledge_received_data(acknowledged_size=size, stream_id=1)

    def test_acknowledging_small_chunks_does_nothing(self, frame_factory):
        """
        When a small amount of data is received and acknowledged, no window
        update is emitted.
        """
        conn = self._setup_connection_and_send_headers(frame_factory)

        final_data = frame_factory.build_data_frame(
            b'some data', flags=['END_STREAM']
        )
        received = conn.receive_data(final_data.serialize())
        data_event = received[0]

        # Acknowledge exactly what the data event reported.
        conn.acknowledge_received_data(
            data_event.flow_controlled_length, stream_id=1
        )

        assert not conn.data_to_send()

    def test_acknowledging_no_data_does_nothing(self, frame_factory):
        """
        If a user accidentally acknowledges no data, nothing happens.
        """
        conn = self._setup_connection_and_send_headers(frame_factory)

        # An empty data frame gives the user a reason to acknowledge
        # something at all.
        empty_data = frame_factory.build_data_frame(b'')
        conn.receive_data(empty_data.serialize())

        conn.acknowledge_received_data(0, stream_id=1)
        assert not conn.data_to_send()

    @pytest.mark.parametrize('force_cleanup', (True, False))
    def test_acknowledging_data_on_closed_stream(self,
                                                 frame_factory,
                                                 force_cleanup):
        """
        When acknowledging data on a stream that has just been closed, no
        acknowledgement is given for that stream, only for the connection.
        """
        conn = self._setup_connection_and_send_headers(frame_factory)

        # Receive a full window's worth of data on the stream.
        payload = b'\x00' * self.DEFAULT_FLOW_WINDOW
        full_window = frame_factory.build_data_frame(payload)
        conn.receive_data(full_window.serialize())

        # Then have the peer reset the stream.
        reset = frame_factory.build_rst_stream_frame(
            stream_id=1
        )
        conn.receive_data(reset.serialize())
        conn.clear_outbound_data_buffer()

        if force_cleanup:
            # Asking how many streams are open forces the closed one to be
            # cleaned up.
            assert conn.open_outbound_streams == 0

        conn.acknowledge_received_data(2048, stream_id=1)

        # Only the connection-level window update is expected.
        connection_update = frame_factory.build_window_update_frame(
            stream_id=0, increment=2048
        )
        assert conn.data_to_send() == connection_update.serialize()

    def test_acknowledging_streams_we_never_saw(self, frame_factory):
        """
        If the user acknowledges a stream ID we've never seen, that raises a
        NoSuchStreamError.
        """
        conn = self._setup_connection_and_send_headers(frame_factory)
        conn.clear_outbound_data_buffer()

        with pytest.raises(h2.exceptions.NoSuchStreamError):
            conn.acknowledge_received_data(2048, stream_id=101)

    @given(integers(min_value=1025, max_value=DEFAULT_FLOW_WINDOW))
    def test_acknowledging_1024_bytes_when_empty_increments(self,
                                                            frame_factory,
                                                            increment):
        """
        If the flow control window is empty and we acknowledge 1024 bytes or
        more, we will emit a WINDOW_UPDATE frame just to move the connection
        forward.
        """
        # The frame factory is shared between all hypothesis examples
        # because of how hypothesis integrates with py.test, so the encoder
        # has to be reset by hand for each example.
        # See https://github.com/HypothesisWorks/hypothesis-python/issues/377
        frame_factory.refresh_encoder()

        conn = self._setup_connection_and_send_headers(frame_factory)

        # Receive a full window's worth of data on the stream.
        payload = b'\x00' * self.DEFAULT_FLOW_WINDOW
        full_window = frame_factory.build_data_frame(payload)
        conn.receive_data(full_window.serialize())

        conn.acknowledge_received_data(increment, stream_id=1)

        # The connection-level update is expected first, then the stream's.
        connection_update = frame_factory.build_window_update_frame(
            stream_id=0, increment=increment
        )
        stream_update = frame_factory.build_window_update_frame(
            stream_id=1, increment=increment
        )
        expected_data = (
            connection_update.serialize() + stream_update.serialize()
        )
        assert conn.data_to_send() == expected_data
872
873    # This test needs to use a lower cap, because otherwise the algo will
874    # increment the stream window anyway.
875    @given(integers(min_value=1025, max_value=(DEFAULT_FLOW_WINDOW // 4) - 1))
876    def test_connection_only_empty(self, frame_factory, increment):
877        """
878        If the connection flow control window is empty, but the stream flow
879        control windows aren't, and 1024 bytes or more are acknowledged by the
880        user, we increment the connection window only.
881        """
882        # We need to refresh the encoder because hypothesis has a problem with
883        # integrating with py.test, meaning that we use the same frame factory
884        # for all tests.
885        # See https://github.com/HypothesisWorks/hypothesis-python/issues/377
886        frame_factory.refresh_encoder()
887
888        # Here we'll use 4 streams. Set them up.
889        c = self._setup_connection_and_send_headers(frame_factory)
890
891        for stream_id in [3, 5, 7]:
892            f = frame_factory.build_headers_frame(
893                headers=self.example_request_headers, stream_id=stream_id
894            )
895            c.receive_data(f.serialize())
896
897        # Now we send 1/4 of the connection window per stream. Annoyingly,
898        # that's an odd number, so we need to round the last frame up.
899        data_to_send = b'\x00' * (self.DEFAULT_FLOW_WINDOW // 4)
900        for stream_id in [1, 3, 5]:
901            f = frame_factory.build_data_frame(
902                data_to_send, stream_id=stream_id
903            )
904            c.receive_data(f.serialize())
905
906        data_to_send = b'\x00' * c.remote_flow_control_window(7)
907        data_frame = frame_factory.build_data_frame(data_to_send, stream_id=7)
908        c.receive_data(data_frame.serialize())
909
910        # Ok, now the actual test.
911        c.acknowledge_received_data(increment, stream_id=1)
912
913        expected_data = frame_factory.build_window_update_frame(
914            stream_id=0, increment=increment
915        ).serialize()
916        assert c.data_to_send() == expected_data
917
918    @given(integers(min_value=1025, max_value=DEFAULT_FLOW_WINDOW))
919    def test_mixing_update_forms(self, frame_factory, increment):
920        """
921        If the user mixes ackowledging data with manually incrementing windows,
922        we still keep track of what's going on.
923        """
924        # We need to refresh the encoder because hypothesis has a problem with
925        # integrating with py.test, meaning that we use the same frame factory
926        # for all tests.
927        # See https://github.com/HypothesisWorks/hypothesis-python/issues/377
928        frame_factory.refresh_encoder()
929
930        # Empty the flow control window.
931        c = self._setup_connection_and_send_headers(frame_factory)
932        data_to_send = b'\x00' * self.DEFAULT_FLOW_WINDOW
933        data_frame = frame_factory.build_data_frame(data_to_send)
934        c.receive_data(data_frame.serialize())
935
936        # Manually increment the connection flow control window back to fully
937        # open, but leave the stream window closed.
938        c.increment_flow_control_window(
939            stream_id=None, increment=self.DEFAULT_FLOW_WINDOW
940        )
941        c.clear_outbound_data_buffer()
942
943        # Now, acknowledge the receipt of that data. This should cause the
944        # stream window to be widened, but not the connection window, because
945        # it is already open.
946        c.acknowledge_received_data(increment, stream_id=1)
947
948        # We expect to see one window update frame only, for the stream.
949        expected_data = frame_factory.build_window_update_frame(
950            stream_id=1, increment=increment
951        ).serialize()
952        assert c.data_to_send() == expected_data
953