1# -*- coding: utf-8 -*-
2"""
3test_closed_streams
4~~~~~~~~~~~~~~~~~~~
5
6Tests that we handle closed streams correctly.
7"""
8import pytest
9
10import h2.config
11import h2.connection
12import h2.errors
13import h2.events
14import h2.exceptions
15
16
class TestClosedStreams(object):
    """
    Checks how a connection reacts to frames that target streams which are
    already closed.
    """
    example_request_headers = [
        (':authority', 'example.com'),
        (':path', '/'),
        (':scheme', 'https'),
        (':method', 'GET'),
    ]
    example_response_headers = [
        (':status', '200'),
        ('server', 'fake-serv/0.1.0')
    ]
    server_config = h2.config.H2Configuration(client_side=False)

    def test_can_receive_multiple_rst_stream_frames(self, frame_factory):
        """
        Repeated RST_STREAM frames for one stream are tolerated, whether they
        arrive together or some time apart, and only the first of them
        produces an event.
        """
        conn = h2.connection.H2Connection()
        conn.initiate_connection()
        conn.send_headers(1, self.example_request_headers, end_stream=True)

        reset = frame_factory.build_rst_stream_frame(stream_id=1)
        first_batch = conn.receive_data(reset.serialize() * 3)

        # Reading this property iterates over every stream, which is what
        # removes the closed ones.
        conn.open_outbound_streams

        # The same frames show up again after the clean-up.
        second_batch = conn.receive_data(reset.serialize() * 3)

        events = first_batch + second_batch
        assert len(events) == 1
        assert isinstance(events[0], h2.events.StreamReset)

    def test_receiving_low_stream_id_causes_goaway(self, frame_factory):
        """
        If the remote peer opens a stream whose ID is lower than one we have
        already seen, an error is raised and a GOAWAY frame is emitted.
        """
        conn = h2.connection.H2Connection(config=self.server_config)
        conn.receive_data(frame_factory.preamble())
        conn.initiate_connection()

        # Stream 3 is opened first ...
        higher = frame_factory.build_headers_frame(
            self.example_request_headers,
            stream_id=3,
        )
        conn.receive_data(higher.serialize())
        conn.clear_outbound_data_buffer()

        # ... so a later attempt to open stream 1 is invalid.
        lower = frame_factory.build_headers_frame(
            self.example_request_headers,
            stream_id=1,
        )

        with pytest.raises(h2.exceptions.StreamIDTooLowError) as exc_info:
            conn.receive_data(lower.serialize())

        error = exc_info.value
        assert error.stream_id == 1
        assert error.max_stream_id == 3

        goaway = frame_factory.build_goaway_frame(
            last_stream_id=3,
            error_code=h2.errors.ErrorCodes.PROTOCOL_ERROR,
        )
        assert conn.data_to_send() == goaway.serialize()

    def test_closed_stream_not_present_in_streams_dict(self, frame_factory):
        """
        Closed streams are dropped from the streams dictionary the next time
        the open streams are counted.
        """
        conn = h2.connection.H2Connection(config=self.server_config)
        conn.receive_data(frame_factory.preamble())
        conn.initiate_connection()

        request = frame_factory.build_headers_frame(
            self.example_request_headers
        )
        conn.receive_data(request.serialize())

        # Push stream 2 off the back of stream 1, then reset stream 1
        # ourselves.
        conn.push_stream(1, 2, self.example_request_headers)
        conn.reset_stream(1)
        conn.clear_outbound_data_buffer()

        # The peer resets the pushed stream.
        pushed_reset = frame_factory.build_rst_stream_frame(stream_id=2)
        conn.receive_data(pushed_reset.serialize())

        # Counting the open streams is what triggers the clean-up.
        assert not conn.open_outbound_streams

        # Nothing should be left in the streams dictionary afterwards.
        assert not conn.streams

    def test_receive_rst_stream_on_closed_stream(self, frame_factory):
        """
        A RST_STREAM frame that arrives for a stream in the closed state is
        ignored.
        See RFC 7540 Section 5.1 (closed state)
        """
        conn = h2.connection.H2Connection()
        conn.initiate_connection()

        # The client issues its request ...
        conn.send_headers(1, self.example_request_headers)

        # ... and, a little later, a DATA frame that ends the stream from
        # its side, leaving it half-closed.
        conn.send_data(1, b'some data', end_stream=True)

        # The server has seen the HEADERS frame while the DATA frame is still
        # in flight, so from its point of view the stream is open. In that
        # state it may end the stream and then reset it, which closes the
        # stream on the server side straight away.
        response = frame_factory.build_headers_frame(
            [(':status', '200')],
            flags=['END_STREAM'],
            stream_id=1,
        )
        events = conn.receive_data(response.serialize())
        assert len(events) == 2
        assert isinstance(events[0], h2.events.ResponseReceived)
        assert isinstance(events[1], h2.events.StreamEnded)

        late_reset = frame_factory.build_rst_stream_frame(stream_id=1)
        assert not conn.receive_data(late_reset.serialize())

    def test_receive_window_update_on_closed_stream(self, frame_factory):
        """
        A WINDOW_UPDATE frame that arrives for a stream in the closed state
        is ignored.
        See RFC 7540 Section 5.1 (closed state)
        """
        conn = h2.connection.H2Connection()
        conn.initiate_connection()

        # The client issues its request ...
        conn.send_headers(1, self.example_request_headers)

        # ... and, a little later, a DATA frame that ends the stream from
        # its side, leaving it half-closed.
        conn.send_data(1, b'some data', end_stream=True)

        # The server has seen the HEADERS frame while the DATA frame is still
        # in flight, so from its point of view the stream is open. In that
        # state it may end the stream and afterwards acknowledge the data it
        # receives with WINDOW_UPDATE frames.
        response = frame_factory.build_headers_frame(
            [(':status', '200')],
            flags=['END_STREAM'],
            stream_id=1,
        )
        events = conn.receive_data(response.serialize())
        assert len(events) == 2
        assert isinstance(events[0], h2.events.ResponseReceived)
        assert isinstance(events[1], h2.events.StreamEnded)

        late_update = frame_factory.build_window_update_frame(
            stream_id=1,
            increment=1,
        )
        assert not conn.receive_data(late_update.serialize())
180
181
class TestStreamsClosedByEndStream(object):
    """
    Tests for frames that arrive on a stream after both sides have finished
    it with END_STREAM (as opposed to RST_STREAM).

    The ``frame`` parameter of each test is a callable taking the test
    instance and the frame factory and returning the late frame to deliver.
    """
    example_request_headers = [
        (':authority', 'example.com'),
        (':path', '/'),
        (':scheme', 'https'),
        (':method', 'GET'),
    ]
    example_response_headers = [
        (':status', '200'),
        ('server', 'fake-serv/0.1.0')
    ]
    server_config = h2.config.H2Configuration(client_side=False)

    @pytest.mark.parametrize(
        "frame",
        [
            lambda self, ff: ff.build_headers_frame(
                self.example_request_headers, flags=['END_STREAM']),
            lambda self, ff: ff.build_headers_frame(
                self.example_request_headers),
        ]
    )
    @pytest.mark.parametrize("clear_streams", [True, False])
    def test_frames_after_recv_end_will_error(self,
                                              frame_factory,
                                              frame,
                                              clear_streams):
        """
        A stream that is closed by receiving END_STREAM raises
        ProtocolError when it receives an unexpected frame, and the
        connection emits a GOAWAY frame with the STREAM_CLOSED error code.

        The check is run both with and without first forcing the connection
        to clean up its closed streams (``clear_streams``).
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        # The peer sends a complete request on stream 1 ...
        f = frame_factory.build_headers_frame(
            self.example_request_headers, flags=['END_STREAM']
        )
        c.receive_data(f.serialize())
        # ... and we answer with a complete response, ending our side too.
        c.send_headers(
            stream_id=1,
            headers=self.example_response_headers,
            end_stream=True
        )

        if clear_streams:
            # Call open_inbound_streams to force the connection to clean
            # closed streams.
            c.open_inbound_streams

        c.clear_outbound_data_buffer()

        # A further HEADERS frame is not acceptable at this point.
        f = frame(self, frame_factory)
        with pytest.raises(h2.exceptions.ProtocolError):
            c.receive_data(f.serialize())

        f = frame_factory.build_goaway_frame(
            last_stream_id=1,
            error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
        )
        assert c.data_to_send() == f.serialize()

    @pytest.mark.parametrize(
        "frame",
        [
            lambda self, ff: ff.build_headers_frame(
                self.example_response_headers, flags=['END_STREAM']),
            lambda self, ff: ff.build_headers_frame(
                self.example_response_headers),
        ]
    )
    @pytest.mark.parametrize("clear_streams", [True, False])
    def test_frames_after_send_end_will_error(self,
                                              frame_factory,
                                              frame,
                                              clear_streams):
        """
        A stream that is closed by sending END_STREAM raises
        ProtocolError when it receives an unexpected frame, and the
        connection emits a GOAWAY frame with the STREAM_CLOSED error code.

        The check is run both with and without first forcing the connection
        to clean up its closed streams (``clear_streams``).
        """
        c = h2.connection.H2Connection()
        c.initiate_connection()
        # We send a complete request on stream 1 ...
        c.send_headers(stream_id=1, headers=self.example_request_headers,
                       end_stream=True)

        # ... and the peer answers with a complete response.
        f = frame_factory.build_headers_frame(
            self.example_response_headers, flags=['END_STREAM']
        )
        c.receive_data(f.serialize())

        if clear_streams:
            # Call open_outbound_streams to force the connection to clean
            # closed streams.
            c.open_outbound_streams

        c.clear_outbound_data_buffer()

        # A further HEADERS frame is not acceptable at this point.
        f = frame(self, frame_factory)
        with pytest.raises(h2.exceptions.ProtocolError):
            c.receive_data(f.serialize())

        f = frame_factory.build_goaway_frame(
            last_stream_id=0,
            error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
        )
        assert c.data_to_send() == f.serialize()

    @pytest.mark.parametrize(
        "frame",
        [
            lambda self, ff: ff.build_window_update_frame(1, 1),
            lambda self, ff: ff.build_rst_stream_frame(1)
        ]
    )
    def test_frames_after_send_end_will_be_ignored(self,
                                                   frame_factory,
                                                   frame):
        """
        WINDOW_UPDATE and RST_STREAM frames received on a stream that was
        closed via END_STREAM are ignored: no exception is raised and no
        events are produced.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        # The peer sends a complete request on stream 1 ...
        f = frame_factory.build_headers_frame(
            self.example_request_headers, flags=['END_STREAM']
        )
        c.receive_data(f.serialize())
        # ... and we answer with a complete response, ending our side too.
        c.send_headers(
            stream_id=1,
            headers=self.example_response_headers,
            end_stream=True
        )

        c.clear_outbound_data_buffer()

        # Unlike HEADERS, these frame types must not cause an error here.
        f = frame(self, frame_factory)
        events = c.receive_data(f.serialize())

        assert not events
323
324
class TestStreamsClosedByRstStream(object):
    """
    Tests for frames that arrive on a stream after it was closed by a
    RST_STREAM frame, whether that frame was received or sent.

    Every test checks the behaviour three times: for a single late frame,
    for a burst of three, and again for a burst of three after the
    connection has been made to iterate over (and so discard) its closed
    streams.
    """
    example_request_headers = [
        (':authority', 'example.com'),
        (':path', '/'),
        (':scheme', 'https'),
        (':method', 'GET'),
    ]
    example_response_headers = [
        (':status', '200'),
        ('server', 'fake-serv/0.1.0')
    ]
    server_config = h2.config.H2Configuration(client_side=False)

    @pytest.mark.parametrize(
        "frame",
        [
            lambda self, ff: ff.build_headers_frame(
                self.example_request_headers),
            lambda self, ff: ff.build_headers_frame(
                self.example_request_headers, flags=['END_STREAM']),
        ]
    )
    def test_resets_further_frames_after_recv_reset(self,
                                                    frame_factory,
                                                    frame):
        """
        A stream that was closed by a received RST_STREAM can still receive
        HEADERS frames: each one is answered with RST_STREAM (STREAM_CLOSED)
        and produces no events. DATA frames are covered by
        ``test_resets_further_data_frames_after_recv_reset``.

        ``frame`` is a callable taking the test instance and the frame
        factory and returning the late frame to deliver.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        # The peer opens stream 1 and ends its side immediately.
        header_frame = frame_factory.build_headers_frame(
            self.example_request_headers, flags=['END_STREAM']
        )
        c.receive_data(header_frame.serialize())

        # We respond without END_STREAM, so our side is still open ...
        c.send_headers(
            stream_id=1,
            headers=self.example_response_headers,
            end_stream=False
        )

        # ... until the peer resets the stream.
        incoming_rst = frame_factory.build_rst_stream_frame(
            1, h2.errors.ErrorCodes.STREAM_CLOSED
        )
        c.receive_data(incoming_rst.serialize())
        c.clear_outbound_data_buffer()

        f = frame(self, frame_factory)
        events = c.receive_data(f.serialize())

        # What we expect to send back for every late frame.
        expected = frame_factory.build_rst_stream_frame(
            1, h2.errors.ErrorCodes.STREAM_CLOSED
        ).serialize()
        assert not events
        assert c.data_to_send() == expected

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == expected * 3

        # Iterate over the streams to make sure it's gone, then confirm the
        # behaviour is unchanged.
        c.open_outbound_streams

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == expected * 3

    def test_resets_further_data_frames_after_recv_reset(self,
                                                         frame_factory):
        """
        A stream that was closed by a received RST_STREAM can still receive
        DATA frames: each one is answered with RST_STREAM (STREAM_CLOSED)
        and produces no events. For the small payloads used here the
        expected output contains no WINDOW_UPDATE frame.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        # The peer opens stream 1 and ends its side immediately.
        header_frame = frame_factory.build_headers_frame(
            self.example_request_headers, flags=['END_STREAM']
        )
        c.receive_data(header_frame.serialize())

        # We respond without END_STREAM, so our side is still open ...
        c.send_headers(
            stream_id=1,
            headers=self.example_response_headers,
            end_stream=False
        )

        # ... until the peer resets the stream.
        rst_frame = frame_factory.build_rst_stream_frame(
            1, h2.errors.ErrorCodes.STREAM_CLOSED
        )
        c.receive_data(rst_frame.serialize())
        c.clear_outbound_data_buffer()

        f = frame_factory.build_data_frame(
            data=b'some data'
        )

        events = c.receive_data(f.serialize())
        assert not events

        # What we expect to send back for every late frame.
        expected = frame_factory.build_rst_stream_frame(
            stream_id=1,
            error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
        ).serialize()
        assert c.data_to_send() == expected

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == expected * 3

        # Iterate over the streams to make sure it's gone, then confirm the
        # behaviour is unchanged.
        c.open_outbound_streams

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == expected * 3

    @pytest.mark.parametrize(
        "frame",
        [
            lambda self, ff: ff.build_headers_frame(
                self.example_request_headers),
            lambda self, ff: ff.build_headers_frame(
                self.example_request_headers, flags=['END_STREAM']),
        ]
    )
    def test_resets_further_frames_after_send_reset(self,
                                                    frame_factory,
                                                    frame):
        """
        A stream that was closed by a sent RST_STREAM can still receive
        HEADERS frames: each one is answered with RST_STREAM (STREAM_CLOSED)
        and produces no events.

        ``frame`` is a callable taking the test instance and the frame
        factory and returning the late frame to deliver.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        # The peer opens stream 1 and ends its side immediately.
        header_frame = frame_factory.build_headers_frame(
            self.example_request_headers, flags=['END_STREAM']
        )
        c.receive_data(header_frame.serialize())

        # We respond without END_STREAM, so our side is still open ...
        c.send_headers(
            stream_id=1,
            headers=self.example_response_headers,
            end_stream=False
        )

        # ... until we reset the stream ourselves.
        c.reset_stream(1, h2.errors.ErrorCodes.INTERNAL_ERROR)
        c.clear_outbound_data_buffer()

        f = frame(self, frame_factory)
        events = c.receive_data(f.serialize())

        # The reply to a late frame carries STREAM_CLOSED, not the
        # INTERNAL_ERROR code used for the original reset.
        rst_frame = frame_factory.build_rst_stream_frame(
            1, h2.errors.ErrorCodes.STREAM_CLOSED
        )
        assert not events
        assert c.data_to_send() == rst_frame.serialize()

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == rst_frame.serialize() * 3

        # Iterate over the streams to make sure it's gone, then confirm the
        # behaviour is unchanged.
        c.open_outbound_streams

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == rst_frame.serialize() * 3

    def test_resets_further_data_frames_after_send_reset(self,
                                                         frame_factory):
        """
        A stream that was closed by a sent RST_STREAM can still receive
        DATA frames: each one is answered with RST_STREAM (STREAM_CLOSED)
        and produces no events. For the small payloads used here the
        expected output contains no WINDOW_UPDATE frame.
        """
        c = h2.connection.H2Connection(config=self.server_config)
        c.receive_data(frame_factory.preamble())
        c.initiate_connection()

        # The peer opens stream 1 and ends its side immediately.
        header_frame = frame_factory.build_headers_frame(
            self.example_request_headers, flags=['END_STREAM']
        )
        c.receive_data(header_frame.serialize())

        # We respond without END_STREAM, so our side is still open ...
        c.send_headers(
            stream_id=1,
            headers=self.example_response_headers,
            end_stream=False
        )

        # ... until we reset the stream ourselves.
        c.reset_stream(1, h2.errors.ErrorCodes.INTERNAL_ERROR)

        c.clear_outbound_data_buffer()

        f = frame_factory.build_data_frame(
            data=b'some data'
        )
        events = c.receive_data(f.serialize())
        assert not events

        # The reply to a late frame carries STREAM_CLOSED, not the
        # INTERNAL_ERROR code used for the original reset.
        expected = frame_factory.build_rst_stream_frame(
            stream_id=1,
            error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
        ).serialize()
        assert c.data_to_send() == expected

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == expected * 3

        # Iterate over the streams to make sure it's gone, then confirm the
        # behaviour is unchanged.
        c.open_outbound_streams

        events = c.receive_data(f.serialize() * 3)
        assert not events
        assert c.data_to_send() == expected * 3
556