1import asyncio
2import binascii
3import contextlib
4import io
5import time
6from unittest import TestCase
7
8from aioquic import tls
9from aioquic.buffer import UINT_VAR_MAX, Buffer, encode_uint_var
10from aioquic.quic import events
11from aioquic.quic.configuration import QuicConfiguration
12from aioquic.quic.connection import (
13    QuicConnection,
14    QuicConnectionError,
15    QuicNetworkPath,
16    QuicReceiveContext,
17)
18from aioquic.quic.crypto import CryptoPair
19from aioquic.quic.logger import QuicLogger
20from aioquic.quic.packet import (
21    PACKET_TYPE_INITIAL,
22    QuicErrorCode,
23    QuicFrameType,
24    encode_quic_retry,
25    encode_quic_version_negotiation,
26)
27from aioquic.quic.packet_builder import QuicDeliveryState, QuicPacketBuilder
28from aioquic.quic.recovery import QuicPacketPacer
29
30from .utils import (
31    SERVER_CACERTFILE,
32    SERVER_CERTFILE,
33    SERVER_CERTFILE_WITH_CHAIN,
34    SERVER_KEYFILE,
35)
36
37CLIENT_ADDR = ("1.2.3.4", 1234)
38
39SERVER_ADDR = ("2.3.4.5", 4433)
40
41
42class SessionTicketStore:
43    def __init__(self):
44        self.tickets = {}
45
46    def add(self, ticket):
47        self.tickets[ticket.ticket] = ticket
48
49    def pop(self, label):
50        return self.tickets.pop(label, None)
51
52
53def client_receive_context(client, epoch=tls.Epoch.ONE_RTT):
54    return QuicReceiveContext(
55        epoch=epoch,
56        host_cid=client.host_cid,
57        network_path=client._network_paths[0],
58        quic_logger_frames=[],
59        time=asyncio.get_event_loop().time(),
60    )
61
62
63def consume_events(connection):
64    while True:
65        event = connection.next_event()
66        if event is None:
67            break
68
69
70def create_standalone_client(self, **client_options):
71    client = QuicConnection(
72        configuration=QuicConfiguration(
73            is_client=True, quic_logger=QuicLogger(), **client_options
74        )
75    )
76    client._ack_delay = 0
77
78    # kick-off handshake
79    client.connect(SERVER_ADDR, now=time.time())
80    self.assertEqual(drop(client), 1)
81
82    return client
83
84
85@contextlib.contextmanager
86def client_and_server(
87    client_kwargs={},
88    client_options={},
89    client_patch=lambda x: None,
90    handshake=True,
91    server_kwargs={},
92    server_certfile=SERVER_CERTFILE,
93    server_keyfile=SERVER_KEYFILE,
94    server_options={},
95    server_patch=lambda x: None,
96):
97    client_configuration = QuicConfiguration(
98        is_client=True, quic_logger=QuicLogger(), **client_options
99    )
100    client_configuration.load_verify_locations(cafile=SERVER_CACERTFILE)
101
102    client = QuicConnection(configuration=client_configuration, **client_kwargs)
103    client._ack_delay = 0
104    disable_packet_pacing(client)
105    client_patch(client)
106
107    server_configuration = QuicConfiguration(
108        is_client=False, quic_logger=QuicLogger(), **server_options
109    )
110    server_configuration.load_cert_chain(server_certfile, server_keyfile)
111
112    server = QuicConnection(configuration=server_configuration, **server_kwargs)
113    server._ack_delay = 0
114    disable_packet_pacing(server)
115    server_patch(server)
116
117    # perform handshake
118    if handshake:
119        client.connect(SERVER_ADDR, now=time.time())
120        for i in range(3):
121            roundtrip(client, server)
122
123    yield client, server
124
125    # close
126    client.close()
127    server.close()
128
129
130def disable_packet_pacing(connection):
131    class DummyPacketPacer(QuicPacketPacer):
132        def next_send_time(self, now):
133            return None
134
135    connection._loss._pacer = DummyPacketPacer()
136
137
138def sequence_numbers(connection_ids):
139    return list(map(lambda x: x.sequence_number, connection_ids))
140
141
142def drop(sender):
143    """
144    Drop datagrams from `sender`.
145    """
146    return len(sender.datagrams_to_send(now=time.time()))
147
148
149def roundtrip(sender, receiver):
150    """
151    Send datagrams from `sender` to `receiver` and back.
152    """
153    return (transfer(sender, receiver), transfer(receiver, sender))
154
155
156def transfer(sender, receiver):
157    """
158    Send datagrams from `sender` to `receiver`.
159    """
160    datagrams = 0
161    from_addr = CLIENT_ADDR if sender._is_client else SERVER_ADDR
162    for data, addr in sender.datagrams_to_send(now=time.time()):
163        datagrams += 1
164        receiver.receive_datagram(data, from_addr, now=time.time())
165    return datagrams
166
167
168class QuicConnectionTest(TestCase):
169    def check_handshake(self, client, server, alpn_protocol=None):
170        """
171        Check handshake completed.
172        """
173        event = client.next_event()
174        self.assertEqual(type(event), events.ProtocolNegotiated)
175        self.assertEqual(event.alpn_protocol, alpn_protocol)
176        event = client.next_event()
177        self.assertEqual(type(event), events.HandshakeCompleted)
178        self.assertEqual(event.alpn_protocol, alpn_protocol)
179        self.assertEqual(event.early_data_accepted, False)
180        self.assertEqual(event.session_resumed, False)
181        for i in range(7):
182            self.assertEqual(type(client.next_event()), events.ConnectionIdIssued)
183        self.assertIsNone(client.next_event())
184
185        event = server.next_event()
186        self.assertEqual(type(event), events.ProtocolNegotiated)
187        self.assertEqual(event.alpn_protocol, alpn_protocol)
188        event = server.next_event()
189        self.assertEqual(type(event), events.HandshakeCompleted)
190        self.assertEqual(event.alpn_protocol, alpn_protocol)
191        for i in range(7):
192            self.assertEqual(type(server.next_event()), events.ConnectionIdIssued)
193        self.assertIsNone(server.next_event())
194
195    def test_connect(self):
196        with client_and_server() as (client, server):
197            # check handshake completed
198            self.check_handshake(client=client, server=server)
199
200            # check each endpoint has available connection IDs for the peer
201            self.assertEqual(
202                sequence_numbers(client._peer_cid_available), [1, 2, 3, 4, 5, 6, 7]
203            )
204            self.assertEqual(
205                sequence_numbers(server._peer_cid_available), [1, 2, 3, 4, 5, 6, 7]
206            )
207
208            # client closes the connection
209            client.close()
210            self.assertEqual(transfer(client, server), 1)
211
212            # check connection closes on the client side
213            client.handle_timer(client.get_timer())
214            event = client.next_event()
215            self.assertEqual(type(event), events.ConnectionTerminated)
216            self.assertEqual(event.error_code, QuicErrorCode.NO_ERROR)
217            self.assertEqual(event.frame_type, None)
218            self.assertEqual(event.reason_phrase, "")
219            self.assertIsNone(client.next_event())
220
221            # check connection closes on the server side
222            server.handle_timer(server.get_timer())
223            event = server.next_event()
224            self.assertEqual(type(event), events.ConnectionTerminated)
225            self.assertEqual(event.error_code, QuicErrorCode.NO_ERROR)
226            self.assertEqual(event.frame_type, None)
227            self.assertEqual(event.reason_phrase, "")
228            self.assertIsNone(server.next_event())
229
230            # check client log
231            client_log = client.configuration.quic_logger.to_dict()
232            self.assertGreater(len(client_log["traces"][0]["events"]), 20)
233
234            # check server log
235            server_log = server.configuration.quic_logger.to_dict()
236            self.assertGreater(len(server_log["traces"][0]["events"]), 20)
237
238    def test_connect_with_alpn(self):
239        with client_and_server(
240            client_options={"alpn_protocols": ["h3-25", "hq-25"]},
241            server_options={"alpn_protocols": ["hq-25"]},
242        ) as (client, server):
243            # check handshake completed
244            self.check_handshake(client=client, server=server, alpn_protocol="hq-25")
245
246    def test_connect_with_secrets_log(self):
247        client_log_file = io.StringIO()
248        server_log_file = io.StringIO()
249        with client_and_server(
250            client_options={"secrets_log_file": client_log_file},
251            server_options={"secrets_log_file": server_log_file},
252        ) as (client, server):
253            # check handshake completed
254            self.check_handshake(client=client, server=server)
255
256            # check secrets were logged
257            client_log = client_log_file.getvalue()
258            server_log = server_log_file.getvalue()
259            self.assertEqual(client_log, server_log)
260            labels = []
261            for line in client_log.splitlines():
262                labels.append(line.split()[0])
263            self.assertEqual(
264                labels,
265                [
266                    "QUIC_SERVER_HANDSHAKE_TRAFFIC_SECRET",
267                    "QUIC_CLIENT_HANDSHAKE_TRAFFIC_SECRET",
268                    "QUIC_SERVER_TRAFFIC_SECRET_0",
269                    "QUIC_CLIENT_TRAFFIC_SECRET_0",
270                ],
271            )
272
273    def test_connect_with_cert_chain(self):
274        with client_and_server(server_certfile=SERVER_CERTFILE_WITH_CHAIN) as (
275            client,
276            server,
277        ):
278            # check handshake completed
279            self.check_handshake(client=client, server=server)
280
281    def test_connect_with_loss_1(self):
282        """
283        Check connection is established even in the client's INITIAL is lost.
284        """
285
286        def datagram_sizes(items):
287            return [len(x[0]) for x in items]
288
289        client_configuration = QuicConfiguration(is_client=True)
290        client_configuration.load_verify_locations(cafile=SERVER_CACERTFILE)
291
292        client = QuicConnection(configuration=client_configuration)
293        client._ack_delay = 0
294
295        server_configuration = QuicConfiguration(is_client=False)
296        server_configuration.load_cert_chain(SERVER_CERTFILE, SERVER_KEYFILE)
297
298        server = QuicConnection(configuration=server_configuration)
299        server._ack_delay = 0
300
301        # client sends INITIAL
302        now = 0.0
303        client.connect(SERVER_ADDR, now=now)
304        items = client.datagrams_to_send(now=now)
305        self.assertEqual(datagram_sizes(items), [1280])
306        self.assertEqual(client.get_timer(), 1.0)
307
308        # INITIAL is lost
309        now = 1.0
310        client.handle_timer(now=now)
311        items = client.datagrams_to_send(now=now)
312        self.assertEqual(datagram_sizes(items), [1280])
313        self.assertEqual(client.get_timer(), 3.0)
314
315        # server receives INITIAL, sends INITIAL + HANDSHAKE
316        now = 1.1
317        server.receive_datagram(items[0][0], CLIENT_ADDR, now=now)
318        items = server.datagrams_to_send(now=now)
319        self.assertEqual(datagram_sizes(items), [1280, 1030])
320        self.assertEqual(server.get_timer(), 2.1)
321        self.assertEqual(len(server._loss.spaces[0].sent_packets), 1)
322        self.assertEqual(len(server._loss.spaces[1].sent_packets), 2)
323        self.assertEqual(type(server.next_event()), events.ProtocolNegotiated)
324        self.assertIsNone(server.next_event())
325
326        # handshake continues normally
327        now = 1.2
328        client.receive_datagram(items[0][0], SERVER_ADDR, now=now)
329        client.receive_datagram(items[1][0], SERVER_ADDR, now=now)
330        items = client.datagrams_to_send(now=now)
331        self.assertEqual(datagram_sizes(items), [376])
332        self.assertAlmostEqual(client.get_timer(), 1.825)
333        self.assertEqual(type(client.next_event()), events.ProtocolNegotiated)
334        self.assertEqual(type(client.next_event()), events.HandshakeCompleted)
335        self.assertEqual(type(client.next_event()), events.ConnectionIdIssued)
336
337        now = 1.3
338        server.receive_datagram(items[0][0], CLIENT_ADDR, now=now)
339        items = server.datagrams_to_send(now=now)
340        self.assertEqual(datagram_sizes(items), [229])
341        self.assertAlmostEqual(server.get_timer(), 1.825)
342        self.assertEqual(len(server._loss.spaces[0].sent_packets), 0)
343        self.assertEqual(len(server._loss.spaces[1].sent_packets), 0)
344        self.assertEqual(type(server.next_event()), events.HandshakeCompleted)
345        self.assertEqual(type(server.next_event()), events.ConnectionIdIssued)
346
347        now = 1.4
348        client.receive_datagram(items[0][0], SERVER_ADDR, now=now)
349        items = client.datagrams_to_send(now=now)
350        self.assertEqual(datagram_sizes(items), [32])
351        self.assertAlmostEqual(client.get_timer(), 61.4)  # idle timeout
352
353    def test_connect_with_loss_2(self):
354        def datagram_sizes(items):
355            return [len(x[0]) for x in items]
356
357        client_configuration = QuicConfiguration(is_client=True)
358        client_configuration.load_verify_locations(cafile=SERVER_CACERTFILE)
359
360        client = QuicConnection(configuration=client_configuration)
361        client._ack_delay = 0
362
363        server_configuration = QuicConfiguration(is_client=False)
364        server_configuration.load_cert_chain(SERVER_CERTFILE, SERVER_KEYFILE)
365
366        server = QuicConnection(configuration=server_configuration)
367        server._ack_delay = 0
368
369        # client sends INITIAL
370        now = 0.0
371        client.connect(SERVER_ADDR, now=now)
372        items = client.datagrams_to_send(now=now)
373        self.assertEqual(datagram_sizes(items), [1280])
374        self.assertEqual(client.get_timer(), 1.0)
375
376        # server receives INITIAL, sends INITIAL + HANDSHAKE but second datagram is lost
377        now = 0.1
378        server.receive_datagram(items[0][0], CLIENT_ADDR, now=now)
379        items = server.datagrams_to_send(now=now)
380        self.assertEqual(datagram_sizes(items), [1280, 1030])
381        self.assertEqual(server.get_timer(), 1.1)
382        self.assertEqual(len(server._loss.spaces[0].sent_packets), 1)
383        self.assertEqual(len(server._loss.spaces[1].sent_packets), 2)
384
385        # client only receives first datagram and sends ACKS
386        now = 0.2
387        client.receive_datagram(items[0][0], SERVER_ADDR, now=now)
388        items = client.datagrams_to_send(now=now)
389        self.assertEqual(datagram_sizes(items), [97])
390        self.assertAlmostEqual(client.get_timer(), 0.625)
391        self.assertEqual(type(client.next_event()), events.ProtocolNegotiated)
392        self.assertIsNone(client.next_event())
393
394        # client PTO - HANDSHAKE PING
395        now = client.get_timer()  # ~0.625
396        client.handle_timer(now=now)
397        items = client.datagrams_to_send(now=now)
398        self.assertEqual(datagram_sizes(items), [44])
399        self.assertAlmostEqual(client.get_timer(), 1.875)
400
401        # server receives PING, discards INITIAL and sends ACK
402        now = 0.725
403        server.receive_datagram(items[0][0], CLIENT_ADDR, now=now)
404        items = server.datagrams_to_send(now=now)
405        self.assertEqual(datagram_sizes(items), [48])
406        self.assertAlmostEqual(server.get_timer(), 1.1)
407        self.assertEqual(len(server._loss.spaces[0].sent_packets), 0)
408        self.assertEqual(len(server._loss.spaces[1].sent_packets), 3)
409        self.assertEqual(type(server.next_event()), events.ProtocolNegotiated)
410        self.assertIsNone(server.next_event())
411
412        # ACKs are lost, server retransmits HANDSHAKE
413        now = server.get_timer()
414        server.handle_timer(now=now)
415        items = server.datagrams_to_send(now=now)
416        self.assertEqual(datagram_sizes(items), [1280, 854])
417        self.assertAlmostEqual(server.get_timer(), 3.1)
418        self.assertEqual(len(server._loss.spaces[0].sent_packets), 0)
419        self.assertEqual(len(server._loss.spaces[1].sent_packets), 3)
420        self.assertIsNone(server.next_event())
421
422        # handshake continues normally
423        now = 1.2
424        client.receive_datagram(items[0][0], SERVER_ADDR, now=now)
425        client.receive_datagram(items[1][0], SERVER_ADDR, now=now)
426        items = client.datagrams_to_send(now=now)
427        self.assertEqual(datagram_sizes(items), [329])
428        self.assertAlmostEqual(client.get_timer(), 2.45)
429        self.assertEqual(type(client.next_event()), events.HandshakeCompleted)
430        self.assertEqual(type(client.next_event()), events.ConnectionIdIssued)
431
432        now = 1.3
433        server.receive_datagram(items[0][0], CLIENT_ADDR, now=now)
434        items = server.datagrams_to_send(now=now)
435        self.assertEqual(datagram_sizes(items), [229])
436        self.assertAlmostEqual(server.get_timer(), 1.925)
437        self.assertEqual(type(server.next_event()), events.HandshakeCompleted)
438        self.assertEqual(type(server.next_event()), events.ConnectionIdIssued)
439
440        now = 1.4
441        client.receive_datagram(items[0][0], SERVER_ADDR, now=now)
442        items = client.datagrams_to_send(now=now)
443        self.assertEqual(datagram_sizes(items), [32])
444        self.assertAlmostEqual(client.get_timer(), 61.4)  # idle timeout
445
446    def test_connect_with_loss_3(self):
447        def datagram_sizes(items):
448            return [len(x[0]) for x in items]
449
450        client_configuration = QuicConfiguration(is_client=True)
451        client_configuration.load_verify_locations(cafile=SERVER_CACERTFILE)
452
453        client = QuicConnection(configuration=client_configuration)
454        client._ack_delay = 0
455
456        server_configuration = QuicConfiguration(is_client=False)
457        server_configuration.load_cert_chain(SERVER_CERTFILE, SERVER_KEYFILE)
458
459        server = QuicConnection(configuration=server_configuration)
460        server._ack_delay = 0
461
462        # client sends INITIAL
463        now = 0.0
464        client.connect(SERVER_ADDR, now=now)
465        items = client.datagrams_to_send(now=now)
466        self.assertEqual(datagram_sizes(items), [1280])
467        self.assertEqual(client.get_timer(), 1.0)
468
469        # server receives INITIAL, sends INITIAL + HANDSHAKE
470        now = 0.1
471        server.receive_datagram(items[0][0], CLIENT_ADDR, now=now)
472        items = server.datagrams_to_send(now=now)
473        self.assertEqual(datagram_sizes(items), [1280, 1030])
474        self.assertEqual(server.get_timer(), 1.1)
475        self.assertEqual(len(server._loss.spaces[0].sent_packets), 1)
476        self.assertEqual(len(server._loss.spaces[1].sent_packets), 2)
477
478        # client receives INITIAL + HANDSHAKE
479        now = 0.2
480        client.receive_datagram(items[0][0], SERVER_ADDR, now=now)
481        client.receive_datagram(items[1][0], SERVER_ADDR, now=now)
482        items = client.datagrams_to_send(now=now)
483        self.assertEqual(datagram_sizes(items), [376])
484        self.assertAlmostEqual(client.get_timer(), 0.825)
485        self.assertEqual(type(client.next_event()), events.ProtocolNegotiated)
486        self.assertEqual(type(client.next_event()), events.HandshakeCompleted)
487        self.assertEqual(type(client.next_event()), events.ConnectionIdIssued)
488
489        # server completes handshake
490        now = 0.3
491        server.receive_datagram(items[0][0], CLIENT_ADDR, now=now)
492        items = server.datagrams_to_send(now=now)
493        self.assertEqual(datagram_sizes(items), [229])
494        self.assertAlmostEqual(server.get_timer(), 0.825)
495        self.assertEqual(len(server._loss.spaces[0].sent_packets), 0)
496        self.assertEqual(len(server._loss.spaces[1].sent_packets), 0)
497        self.assertEqual(type(server.next_event()), events.ProtocolNegotiated)
498        self.assertEqual(type(server.next_event()), events.HandshakeCompleted)
499        self.assertEqual(type(server.next_event()), events.ConnectionIdIssued)
500
501        # server PTO - 1-RTT PING
502        now = 0.825
503        server.handle_timer(now=now)
504        items = server.datagrams_to_send(now=now)
505        self.assertEqual(datagram_sizes(items), [29])
506        self.assertAlmostEqual(server.get_timer(), 1.875)
507
508        # client receives PING, sends ACK
509        now = 0.9
510        client.receive_datagram(items[0][0], SERVER_ADDR, now=now)
511        items = client.datagrams_to_send(now=now)
512        self.assertEqual(datagram_sizes(items), [32])
513        self.assertAlmostEqual(client.get_timer(), 0.825)
514
515        # server receives ACK, retransmits HANDSHAKE_DONE
516        now = 1.0
517        self.assertFalse(server._handshake_done_pending)
518        server.receive_datagram(items[0][0], CLIENT_ADDR, now=now)
519        self.assertTrue(server._handshake_done_pending)
520        items = server.datagrams_to_send(now=now)
521        self.assertFalse(server._handshake_done_pending)
522        self.assertEqual(datagram_sizes(items), [224])
523
524    def test_connect_with_quantum_readiness(self):
525        with client_and_server(client_options={"quantum_readiness_test": True},) as (
526            client,
527            server,
528        ):
529            stream_id = client.get_next_available_stream_id()
530            client.send_stream_data(stream_id, b"hello")
531
532            self.assertEqual(roundtrip(client, server), (1, 1))
533
534            received = None
535            while True:
536                event = server.next_event()
537                if isinstance(event, events.StreamDataReceived):
538                    received = event.data
539                elif event is None:
540                    break
541
542            self.assertEqual(received, b"hello")
543
544    def test_connect_with_0rtt(self):
545        client_ticket = None
546        ticket_store = SessionTicketStore()
547
548        def save_session_ticket(ticket):
549            nonlocal client_ticket
550            client_ticket = ticket
551
552        with client_and_server(
553            client_kwargs={"session_ticket_handler": save_session_ticket},
554            server_kwargs={"session_ticket_handler": ticket_store.add},
555        ) as (client, server):
556            pass
557
558        with client_and_server(
559            client_options={"session_ticket": client_ticket},
560            server_kwargs={"session_ticket_fetcher": ticket_store.pop},
561            handshake=False,
562        ) as (client, server):
563            client.connect(SERVER_ADDR, now=time.time())
564            stream_id = client.get_next_available_stream_id()
565            client.send_stream_data(stream_id, b"hello")
566
567            self.assertEqual(roundtrip(client, server), (2, 1))
568
569            event = server.next_event()
570            self.assertEqual(type(event), events.ProtocolNegotiated)
571
572            event = server.next_event()
573            self.assertEqual(type(event), events.StreamDataReceived)
574            self.assertEqual(event.data, b"hello")
575
576    def test_connect_with_0rtt_bad_max_early_data(self):
577        client_ticket = None
578        ticket_store = SessionTicketStore()
579
580        def patch(server):
581            """
582            Patch server's TLS initialization to set an invalid
583            max_early_data value.
584            """
585            real_initialize = server._initialize
586
587            def patched_initialize(peer_cid: bytes):
588                real_initialize(peer_cid)
589                server.tls._max_early_data = 12345
590
591            server._initialize = patched_initialize
592
593        def save_session_ticket(ticket):
594            nonlocal client_ticket
595            client_ticket = ticket
596
597        with client_and_server(
598            client_kwargs={"session_ticket_handler": save_session_ticket},
599            server_kwargs={"session_ticket_handler": ticket_store.add},
600            server_patch=patch,
601        ) as (client, server):
602            # check handshake failed
603            event = client.next_event()
604            self.assertIsNone(event)
605
606    def test_change_connection_id(self):
607        with client_and_server() as (client, server):
608            self.assertEqual(
609                sequence_numbers(client._peer_cid_available), [1, 2, 3, 4, 5, 6, 7]
610            )
611
612            # the client changes connection ID
613            client.change_connection_id()
614            self.assertEqual(transfer(client, server), 1)
615            self.assertEqual(
616                sequence_numbers(client._peer_cid_available), [2, 3, 4, 5, 6, 7]
617            )
618
619            # the server provides a new connection ID
620            self.assertEqual(transfer(server, client), 1)
621            self.assertEqual(
622                sequence_numbers(client._peer_cid_available), [2, 3, 4, 5, 6, 7, 8]
623            )
624
625    def test_change_connection_id_retransmit_new_connection_id(self):
626        with client_and_server() as (client, server):
627            self.assertEqual(
628                sequence_numbers(client._peer_cid_available), [1, 2, 3, 4, 5, 6, 7]
629            )
630
631            # the client changes connection ID
632            client.change_connection_id()
633            self.assertEqual(transfer(client, server), 1)
634            self.assertEqual(
635                sequence_numbers(client._peer_cid_available), [2, 3, 4, 5, 6, 7]
636            )
637
638            # the server provides a new connection ID, NEW_CONNECTION_ID is lost
639            self.assertEqual(drop(server), 1)
640            self.assertEqual(
641                sequence_numbers(client._peer_cid_available), [2, 3, 4, 5, 6, 7]
642            )
643
644            # NEW_CONNECTION_ID is retransmitted
645            server._on_new_connection_id_delivery(
646                QuicDeliveryState.LOST, server._host_cids[-1]
647            )
648            self.assertEqual(transfer(server, client), 1)
649            self.assertEqual(
650                sequence_numbers(client._peer_cid_available), [2, 3, 4, 5, 6, 7, 8]
651            )
652
653    def test_change_connection_id_retransmit_retire_connection_id(self):
654        with client_and_server() as (client, server):
655            self.assertEqual(
656                sequence_numbers(client._peer_cid_available), [1, 2, 3, 4, 5, 6, 7]
657            )
658
659            # the client changes connection ID, RETIRE_CONNECTION_ID is lost
660            client.change_connection_id()
661            self.assertEqual(drop(client), 1)
662            self.assertEqual(
663                sequence_numbers(client._peer_cid_available), [2, 3, 4, 5, 6, 7]
664            )
665
666            # RETIRE_CONNECTION_ID is retransmitted
667            client._on_retire_connection_id_delivery(QuicDeliveryState.LOST, 0)
668            self.assertEqual(transfer(client, server), 1)
669
670            # the server provides a new connection ID
671            self.assertEqual(transfer(server, client), 1)
672            self.assertEqual(
673                sequence_numbers(client._peer_cid_available), [2, 3, 4, 5, 6, 7, 8]
674            )
675
676    def test_get_next_available_stream_id(self):
677        with client_and_server() as (client, server):
678            # client
679            stream_id = client.get_next_available_stream_id()
680            self.assertEqual(stream_id, 0)
681            client.send_stream_data(stream_id, b"hello")
682
683            stream_id = client.get_next_available_stream_id()
684            self.assertEqual(stream_id, 4)
685            client.send_stream_data(stream_id, b"hello")
686
687            stream_id = client.get_next_available_stream_id(is_unidirectional=True)
688            self.assertEqual(stream_id, 2)
689            client.send_stream_data(stream_id, b"hello")
690
691            stream_id = client.get_next_available_stream_id(is_unidirectional=True)
692            self.assertEqual(stream_id, 6)
693            client.send_stream_data(stream_id, b"hello")
694
695            # server
696            stream_id = server.get_next_available_stream_id()
697            self.assertEqual(stream_id, 1)
698            server.send_stream_data(stream_id, b"hello")
699
700            stream_id = server.get_next_available_stream_id()
701            self.assertEqual(stream_id, 5)
702            server.send_stream_data(stream_id, b"hello")
703
704            stream_id = server.get_next_available_stream_id(is_unidirectional=True)
705            self.assertEqual(stream_id, 3)
706            server.send_stream_data(stream_id, b"hello")
707
708            stream_id = server.get_next_available_stream_id(is_unidirectional=True)
709            self.assertEqual(stream_id, 7)
710            server.send_stream_data(stream_id, b"hello")
711
712    def test_datagram_frame(self):
713        with client_and_server(
714            client_options={"max_datagram_frame_size": 65536},
715            server_options={"max_datagram_frame_size": 65536},
716        ) as (client, server):
717            # check handshake completed
718            self.check_handshake(client=client, server=server, alpn_protocol=None)
719
720            # send datagram
721            client.send_datagram_frame(b"hello")
722            self.assertEqual(transfer(client, server), 1)
723
724            event = server.next_event()
725            self.assertEqual(type(event), events.DatagramFrameReceived)
726            self.assertEqual(event.data, b"hello")
727
728    def test_datagram_frame_2(self):
729        # payload which exactly fills an entire packet
730        payload = b"Z" * 1250
731
732        with client_and_server(
733            client_options={"max_datagram_frame_size": 65536},
734            server_options={"max_datagram_frame_size": 65536},
735        ) as (client, server):
736            # check handshake completed
737            self.check_handshake(client=client, server=server, alpn_protocol=None)
738
739            # queue 20 datagrams
740            for i in range(20):
741                client.send_datagram_frame(payload)
742
743            # client can only 11 datagrams are sent due to congestion control
744            self.assertEqual(transfer(client, server), 11)
745            for i in range(11):
746                event = server.next_event()
747                self.assertEqual(type(event), events.DatagramFrameReceived)
748                self.assertEqual(event.data, payload)
749
750            # server sends ACK
751            self.assertEqual(transfer(server, client), 1)
752
753            # client sends remaining datagrams
754            self.assertEqual(transfer(client, server), 9)
755            for i in range(9):
756                event = server.next_event()
757                self.assertEqual(type(event), events.DatagramFrameReceived)
758                self.assertEqual(event.data, payload)
759
760    def test_decryption_error(self):
761        with client_and_server() as (client, server):
762            # mess with encryption key
763            server._cryptos[tls.Epoch.ONE_RTT].send.setup(
764                cipher_suite=tls.CipherSuite.AES_128_GCM_SHA256,
765                secret=bytes(48),
766                version=server._version,
767            )
768
769            # server sends close
770            server.close(error_code=QuicErrorCode.NO_ERROR)
771            for data, addr in server.datagrams_to_send(now=time.time()):
772                client.receive_datagram(data, SERVER_ADDR, now=time.time())
773
774    def test_tls_error(self):
775        def patch(client):
776            real_initialize = client._initialize
777
778            def patched_initialize(peer_cid: bytes):
779                real_initialize(peer_cid)
780                client.tls._supported_versions = [tls.TLS_VERSION_1_3_DRAFT_28]
781
782            client._initialize = patched_initialize
783
784        # handshake fails
785        with client_and_server(client_patch=patch) as (client, server):
786            timer_at = server.get_timer()
787            server.handle_timer(timer_at)
788
789            event = server.next_event()
790            self.assertEqual(type(event), events.ConnectionTerminated)
791            self.assertEqual(event.error_code, 326)
792            self.assertEqual(event.frame_type, QuicFrameType.CRYPTO)
793            self.assertEqual(event.reason_phrase, "No supported protocol version")
794
795    def test_receive_datagram_garbage(self):
796        client = create_standalone_client(self)
797
798        datagram = binascii.unhexlify("c00000000080")
799        client.receive_datagram(datagram, SERVER_ADDR, now=time.time())
800
801    def test_receive_datagram_reserved_bits_non_zero(self):
802        client = create_standalone_client(self)
803
804        builder = QuicPacketBuilder(
805            host_cid=client._peer_cid,
806            is_client=False,
807            peer_cid=client.host_cid,
808            version=client._version,
809        )
810        crypto = CryptoPair()
811        crypto.setup_initial(client._peer_cid, is_client=False, version=client._version)
812        crypto.encrypt_packet_real = crypto.encrypt_packet
813
814        def encrypt_packet(plain_header, plain_payload, packet_number):
815            # mess with reserved bits
816            plain_header = bytes([plain_header[0] | 0x0C]) + plain_header[1:]
817            return crypto.encrypt_packet_real(
818                plain_header, plain_payload, packet_number
819            )
820
821        crypto.encrypt_packet = encrypt_packet
822
823        builder.start_packet(PACKET_TYPE_INITIAL, crypto)
824        buf = builder.start_frame(QuicFrameType.PADDING)
825        buf.push_bytes(bytes(builder.remaining_flight_space))
826
827        for datagram in builder.flush()[0]:
828            client.receive_datagram(datagram, SERVER_ADDR, now=time.time())
829        self.assertEqual(drop(client), 1)
830        self.assertEqual(
831            client._close_event,
832            events.ConnectionTerminated(
833                error_code=QuicErrorCode.PROTOCOL_VIOLATION,
834                frame_type=None,
835                reason_phrase="Reserved bits must be zero",
836            ),
837        )
838
839    def test_receive_datagram_wrong_version(self):
840        client = create_standalone_client(self)
841
842        builder = QuicPacketBuilder(
843            host_cid=client._peer_cid,
844            is_client=False,
845            peer_cid=client.host_cid,
846            version=0xFF000011,  # DRAFT_16
847        )
848        crypto = CryptoPair()
849        crypto.setup_initial(client._peer_cid, is_client=False, version=client._version)
850        builder.start_packet(PACKET_TYPE_INITIAL, crypto)
851        buf = builder.start_frame(QuicFrameType.PADDING)
852        buf.push_bytes(bytes(builder.remaining_flight_space))
853
854        for datagram in builder.flush()[0]:
855            client.receive_datagram(datagram, SERVER_ADDR, now=time.time())
856        self.assertEqual(drop(client), 0)
857
858    def test_receive_datagram_retry(self):
859        client = create_standalone_client(self)
860
861        client.receive_datagram(
862            encode_quic_retry(
863                version=client._version,
864                source_cid=binascii.unhexlify("85abb547bf28be97"),
865                destination_cid=client.host_cid,
866                original_destination_cid=client._peer_cid,
867                retry_token=bytes(16),
868            ),
869            SERVER_ADDR,
870            now=time.time(),
871        )
872        self.assertEqual(drop(client), 1)
873
874    def test_receive_datagram_retry_wrong_destination_cid(self):
875        client = create_standalone_client(self)
876
877        client.receive_datagram(
878            encode_quic_retry(
879                version=client._version,
880                source_cid=binascii.unhexlify("85abb547bf28be97"),
881                destination_cid=binascii.unhexlify("c98343fe8f5f0ff4"),
882                original_destination_cid=client._peer_cid,
883                retry_token=bytes(16),
884            ),
885            SERVER_ADDR,
886            now=time.time(),
887        )
888        self.assertEqual(drop(client), 0)
889
890    def test_handle_ack_frame_ecn(self):
891        client = create_standalone_client(self)
892
893        client._handle_ack_frame(
894            client_receive_context(client),
895            QuicFrameType.ACK_ECN,
896            Buffer(data=b"\x00\x02\x00\x00\x00\x00\x00"),
897        )
898
899    def test_handle_connection_close_frame(self):
900        with client_and_server() as (client, server):
901            server.close(
902                error_code=QuicErrorCode.PROTOCOL_VIOLATION,
903                frame_type=QuicFrameType.ACK,
904                reason_phrase="illegal ACK frame",
905            )
906            self.assertEqual(roundtrip(server, client), (1, 0))
907
908            self.assertEqual(
909                client._close_event,
910                events.ConnectionTerminated(
911                    error_code=QuicErrorCode.PROTOCOL_VIOLATION,
912                    frame_type=QuicFrameType.ACK,
913                    reason_phrase="illegal ACK frame",
914                ),
915            )
916
917    def test_handle_connection_close_frame_app(self):
918        with client_and_server() as (client, server):
919            server.close(error_code=QuicErrorCode.NO_ERROR, reason_phrase="goodbye")
920            self.assertEqual(roundtrip(server, client), (1, 0))
921
922            self.assertEqual(
923                client._close_event,
924                events.ConnectionTerminated(
925                    error_code=QuicErrorCode.NO_ERROR,
926                    frame_type=None,
927                    reason_phrase="goodbye",
928                ),
929            )
930
931    def test_handle_connection_close_frame_app_not_utf8(self):
932        client = create_standalone_client(self)
933
934        client._handle_connection_close_frame(
935            client_receive_context(client),
936            QuicFrameType.APPLICATION_CLOSE,
937            Buffer(data=binascii.unhexlify("0008676f6f6462798200")),
938        )
939
940        self.assertEqual(
941            client._close_event,
942            events.ConnectionTerminated(
943                error_code=QuicErrorCode.NO_ERROR, frame_type=None, reason_phrase="",
944            ),
945        )
946
947    def test_handle_crypto_frame_over_largest_offset(self):
948        with client_and_server() as (client, server):
949            # client receives offset + length > 2^62 - 1
950            with self.assertRaises(QuicConnectionError) as cm:
951                client._handle_crypto_frame(
952                    client_receive_context(client),
953                    QuicFrameType.CRYPTO,
954                    Buffer(data=encode_uint_var(UINT_VAR_MAX) + encode_uint_var(1)),
955                )
956            self.assertEqual(
957                cm.exception.error_code, QuicErrorCode.FRAME_ENCODING_ERROR
958            )
959            self.assertEqual(cm.exception.frame_type, QuicFrameType.CRYPTO)
960            self.assertEqual(
961                cm.exception.reason_phrase, "offset + length cannot exceed 2^62 - 1"
962            )
963
964    def test_handle_data_blocked_frame(self):
965        with client_and_server() as (client, server):
966            # client receives DATA_BLOCKED: 12345
967            client._handle_data_blocked_frame(
968                client_receive_context(client),
969                QuicFrameType.DATA_BLOCKED,
970                Buffer(data=encode_uint_var(12345)),
971            )
972
973    def test_handle_datagram_frame(self):
974        client = create_standalone_client(self, max_datagram_frame_size=6)
975
976        client._handle_datagram_frame(
977            client_receive_context(client),
978            QuicFrameType.DATAGRAM,
979            Buffer(data=b"hello"),
980        )
981
982        self.assertEqual(
983            client.next_event(), events.DatagramFrameReceived(data=b"hello")
984        )
985
986    def test_handle_datagram_frame_not_allowed(self):
987        client = create_standalone_client(self, max_datagram_frame_size=None)
988
989        with self.assertRaises(QuicConnectionError) as cm:
990            client._handle_datagram_frame(
991                client_receive_context(client),
992                QuicFrameType.DATAGRAM,
993                Buffer(data=b"hello"),
994            )
995        self.assertEqual(cm.exception.error_code, QuicErrorCode.PROTOCOL_VIOLATION)
996        self.assertEqual(cm.exception.frame_type, QuicFrameType.DATAGRAM)
997        self.assertEqual(cm.exception.reason_phrase, "Unexpected DATAGRAM frame")
998
999    def test_handle_datagram_frame_too_large(self):
1000        client = create_standalone_client(self, max_datagram_frame_size=5)
1001
1002        with self.assertRaises(QuicConnectionError) as cm:
1003            client._handle_datagram_frame(
1004                client_receive_context(client),
1005                QuicFrameType.DATAGRAM,
1006                Buffer(data=b"hello"),
1007            )
1008        self.assertEqual(cm.exception.error_code, QuicErrorCode.PROTOCOL_VIOLATION)
1009        self.assertEqual(cm.exception.frame_type, QuicFrameType.DATAGRAM)
1010        self.assertEqual(cm.exception.reason_phrase, "Unexpected DATAGRAM frame")
1011
1012    def test_handle_datagram_frame_with_length(self):
1013        client = create_standalone_client(self, max_datagram_frame_size=7)
1014
1015        client._handle_datagram_frame(
1016            client_receive_context(client),
1017            QuicFrameType.DATAGRAM_WITH_LENGTH,
1018            Buffer(data=b"\x05hellojunk"),
1019        )
1020
1021        self.assertEqual(
1022            client.next_event(), events.DatagramFrameReceived(data=b"hello")
1023        )
1024
1025    def test_handle_datagram_frame_with_length_not_allowed(self):
1026        client = create_standalone_client(self, max_datagram_frame_size=None)
1027
1028        with self.assertRaises(QuicConnectionError) as cm:
1029            client._handle_datagram_frame(
1030                client_receive_context(client),
1031                QuicFrameType.DATAGRAM_WITH_LENGTH,
1032                Buffer(data=b"\x05hellojunk"),
1033            )
1034        self.assertEqual(cm.exception.error_code, QuicErrorCode.PROTOCOL_VIOLATION)
1035        self.assertEqual(cm.exception.frame_type, QuicFrameType.DATAGRAM_WITH_LENGTH)
1036        self.assertEqual(cm.exception.reason_phrase, "Unexpected DATAGRAM frame")
1037
1038    def test_handle_datagram_frame_with_length_too_large(self):
1039        client = create_standalone_client(self, max_datagram_frame_size=6)
1040
1041        with self.assertRaises(QuicConnectionError) as cm:
1042            client._handle_datagram_frame(
1043                client_receive_context(client),
1044                QuicFrameType.DATAGRAM_WITH_LENGTH,
1045                Buffer(data=b"\x05hellojunk"),
1046            )
1047        self.assertEqual(cm.exception.error_code, QuicErrorCode.PROTOCOL_VIOLATION)
1048        self.assertEqual(cm.exception.frame_type, QuicFrameType.DATAGRAM_WITH_LENGTH)
1049        self.assertEqual(cm.exception.reason_phrase, "Unexpected DATAGRAM frame")
1050
1051    def test_handle_handshake_done_not_allowed(self):
1052        with client_and_server() as (client, server):
1053            # server receives HANDSHAKE_DONE frame
1054            with self.assertRaises(QuicConnectionError) as cm:
1055                server._handle_handshake_done_frame(
1056                    client_receive_context(server),
1057                    QuicFrameType.HANDSHAKE_DONE,
1058                    Buffer(data=b""),
1059                )
1060            self.assertEqual(cm.exception.error_code, QuicErrorCode.PROTOCOL_VIOLATION)
1061            self.assertEqual(cm.exception.frame_type, QuicFrameType.HANDSHAKE_DONE)
1062            self.assertEqual(
1063                cm.exception.reason_phrase,
1064                "Clients must not send HANDSHAKE_DONE frames",
1065            )
1066
1067    def test_handle_max_data_frame(self):
1068        with client_and_server() as (client, server):
1069            self.assertEqual(client._remote_max_data, 1048576)
1070
1071            # client receives MAX_DATA raising limit
1072            client._handle_max_data_frame(
1073                client_receive_context(client),
1074                QuicFrameType.MAX_DATA,
1075                Buffer(data=encode_uint_var(1048577)),
1076            )
1077            self.assertEqual(client._remote_max_data, 1048577)
1078
1079    def test_handle_max_stream_data_frame(self):
1080        with client_and_server() as (client, server):
1081            # client creates bidirectional stream 0
1082            stream = client._create_stream(stream_id=0)
1083            self.assertEqual(stream.max_stream_data_remote, 1048576)
1084
1085            # client receives MAX_STREAM_DATA raising limit
1086            client._handle_max_stream_data_frame(
1087                client_receive_context(client),
1088                QuicFrameType.MAX_STREAM_DATA,
1089                Buffer(data=b"\x00" + encode_uint_var(1048577)),
1090            )
1091            self.assertEqual(stream.max_stream_data_remote, 1048577)
1092
1093            # client receives MAX_STREAM_DATA lowering limit
1094            client._handle_max_stream_data_frame(
1095                client_receive_context(client),
1096                QuicFrameType.MAX_STREAM_DATA,
1097                Buffer(data=b"\x00" + encode_uint_var(1048575)),
1098            )
1099            self.assertEqual(stream.max_stream_data_remote, 1048577)
1100
1101    def test_handle_max_stream_data_frame_receive_only(self):
1102        with client_and_server() as (client, server):
1103            # server creates unidirectional stream 3
1104            server.send_stream_data(stream_id=3, data=b"hello")
1105
1106            # client receives MAX_STREAM_DATA: 3, 1
1107            with self.assertRaises(QuicConnectionError) as cm:
1108                client._handle_max_stream_data_frame(
1109                    client_receive_context(client),
1110                    QuicFrameType.MAX_STREAM_DATA,
1111                    Buffer(data=b"\x03\x01"),
1112                )
1113            self.assertEqual(cm.exception.error_code, QuicErrorCode.STREAM_STATE_ERROR)
1114            self.assertEqual(cm.exception.frame_type, QuicFrameType.MAX_STREAM_DATA)
1115            self.assertEqual(cm.exception.reason_phrase, "Stream is receive-only")
1116
1117    def test_handle_max_streams_bidi_frame(self):
1118        with client_and_server() as (client, server):
1119            self.assertEqual(client._remote_max_streams_bidi, 128)
1120
1121            # client receives MAX_STREAMS_BIDI raising limit
1122            client._handle_max_streams_bidi_frame(
1123                client_receive_context(client),
1124                QuicFrameType.MAX_STREAMS_BIDI,
1125                Buffer(data=encode_uint_var(129)),
1126            )
1127            self.assertEqual(client._remote_max_streams_bidi, 129)
1128
1129            # client receives MAX_STREAMS_BIDI lowering limit
1130            client._handle_max_streams_bidi_frame(
1131                client_receive_context(client),
1132                QuicFrameType.MAX_STREAMS_BIDI,
1133                Buffer(data=encode_uint_var(127)),
1134            )
1135            self.assertEqual(client._remote_max_streams_bidi, 129)
1136
1137    def test_handle_max_streams_uni_frame(self):
1138        with client_and_server() as (client, server):
1139            self.assertEqual(client._remote_max_streams_uni, 128)
1140
1141            # client receives MAX_STREAMS_UNI raising limit
1142            client._handle_max_streams_uni_frame(
1143                client_receive_context(client),
1144                QuicFrameType.MAX_STREAMS_UNI,
1145                Buffer(data=encode_uint_var(129)),
1146            )
1147            self.assertEqual(client._remote_max_streams_uni, 129)
1148
1149            # client receives MAX_STREAMS_UNI raising limit
1150            client._handle_max_streams_uni_frame(
1151                client_receive_context(client),
1152                QuicFrameType.MAX_STREAMS_UNI,
1153                Buffer(data=encode_uint_var(127)),
1154            )
1155            self.assertEqual(client._remote_max_streams_uni, 129)
1156
1157    def test_handle_new_token_frame(self):
1158        with client_and_server() as (client, server):
1159            # client receives NEW_TOKEN
1160            client._handle_new_token_frame(
1161                client_receive_context(client),
1162                QuicFrameType.NEW_TOKEN,
1163                Buffer(data=binascii.unhexlify("080102030405060708")),
1164            )
1165
1166    def test_handle_new_token_frame_from_client(self):
1167        with client_and_server() as (client, server):
1168            # server receives NEW_TOKEN
1169            with self.assertRaises(QuicConnectionError) as cm:
1170                server._handle_new_token_frame(
1171                    client_receive_context(client),
1172                    QuicFrameType.NEW_TOKEN,
1173                    Buffer(data=binascii.unhexlify("080102030405060708")),
1174                )
1175            self.assertEqual(cm.exception.error_code, QuicErrorCode.PROTOCOL_VIOLATION)
1176            self.assertEqual(cm.exception.frame_type, QuicFrameType.NEW_TOKEN)
1177            self.assertEqual(
1178                cm.exception.reason_phrase, "Clients must not send NEW_TOKEN frames"
1179            )
1180
1181    def test_handle_path_challenge_frame(self):
1182        with client_and_server() as (client, server):
1183            # client changes address and sends some data
1184            client.send_stream_data(0, b"01234567")
1185            for data, addr in client.datagrams_to_send(now=time.time()):
1186                server.receive_datagram(data, ("1.2.3.4", 2345), now=time.time())
1187
1188            # check paths
1189            self.assertEqual(len(server._network_paths), 2)
1190            self.assertEqual(server._network_paths[0].addr, ("1.2.3.4", 2345))
1191            self.assertFalse(server._network_paths[0].is_validated)
1192            self.assertEqual(server._network_paths[1].addr, ("1.2.3.4", 1234))
1193            self.assertTrue(server._network_paths[1].is_validated)
1194
1195            # server sends PATH_CHALLENGE and receives PATH_RESPONSE
1196            for data, addr in server.datagrams_to_send(now=time.time()):
1197                client.receive_datagram(data, SERVER_ADDR, now=time.time())
1198            for data, addr in client.datagrams_to_send(now=time.time()):
1199                server.receive_datagram(data, ("1.2.3.4", 2345), now=time.time())
1200
1201            # check paths
1202            self.assertEqual(server._network_paths[0].addr, ("1.2.3.4", 2345))
1203            self.assertTrue(server._network_paths[0].is_validated)
1204            self.assertEqual(server._network_paths[1].addr, ("1.2.3.4", 1234))
1205            self.assertTrue(server._network_paths[1].is_validated)
1206
1207    def test_handle_path_response_frame_bad(self):
1208        with client_and_server() as (client, server):
1209            # server receives unsollicited PATH_RESPONSE
1210            with self.assertRaises(QuicConnectionError) as cm:
1211                server._handle_path_response_frame(
1212                    client_receive_context(client),
1213                    QuicFrameType.PATH_RESPONSE,
1214                    Buffer(data=b"\x11\x22\x33\x44\x55\x66\x77\x88"),
1215                )
1216            self.assertEqual(cm.exception.error_code, QuicErrorCode.PROTOCOL_VIOLATION)
1217            self.assertEqual(cm.exception.frame_type, QuicFrameType.PATH_RESPONSE)
1218
1219    def test_handle_padding_frame(self):
1220        client = create_standalone_client(self)
1221
1222        # no more padding
1223        buf = Buffer(data=b"")
1224        client._handle_padding_frame(
1225            client_receive_context(client), QuicFrameType.PADDING, buf
1226        )
1227        self.assertEqual(buf.tell(), 0)
1228
1229        # padding until end
1230        buf = Buffer(data=bytes(10))
1231        client._handle_padding_frame(
1232            client_receive_context(client), QuicFrameType.PADDING, buf
1233        )
1234        self.assertEqual(buf.tell(), 10)
1235
1236        # padding then something else
1237        buf = Buffer(data=bytes(10) + b"\x01")
1238        client._handle_padding_frame(
1239            client_receive_context(client), QuicFrameType.PADDING, buf
1240        )
1241        self.assertEqual(buf.tell(), 10)
1242
1243    def test_handle_reset_stream_frame(self):
1244        with client_and_server() as (client, server):
1245            # client creates bidirectional stream 0
1246            client.send_stream_data(stream_id=0, data=b"hello")
1247            consume_events(client)
1248
1249            # client receives RESET_STREAM
1250            client._handle_reset_stream_frame(
1251                client_receive_context(client),
1252                QuicFrameType.RESET_STREAM,
1253                Buffer(data=binascii.unhexlify("000100")),
1254            )
1255
1256            event = client.next_event()
1257            self.assertEqual(type(event), events.StreamReset)
1258            self.assertEqual(event.error_code, QuicErrorCode.INTERNAL_ERROR)
1259            self.assertEqual(event.stream_id, 0)
1260
1261    def test_handle_reset_stream_frame_send_only(self):
1262        with client_and_server() as (client, server):
1263            # client creates unidirectional stream 2
1264            client.send_stream_data(stream_id=2, data=b"hello")
1265
1266            # client receives RESET_STREAM
1267            with self.assertRaises(QuicConnectionError) as cm:
1268                client._handle_reset_stream_frame(
1269                    client_receive_context(client),
1270                    QuicFrameType.RESET_STREAM,
1271                    Buffer(data=binascii.unhexlify("021100")),
1272                )
1273            self.assertEqual(cm.exception.error_code, QuicErrorCode.STREAM_STATE_ERROR)
1274            self.assertEqual(cm.exception.frame_type, QuicFrameType.RESET_STREAM)
1275            self.assertEqual(cm.exception.reason_phrase, "Stream is send-only")
1276
1277    def test_handle_retire_connection_id_frame(self):
1278        with client_and_server() as (client, server):
1279            self.assertEqual(
1280                sequence_numbers(client._host_cids), [0, 1, 2, 3, 4, 5, 6, 7]
1281            )
1282
1283            # client receives RETIRE_CONNECTION_ID
1284            client._handle_retire_connection_id_frame(
1285                client_receive_context(client),
1286                QuicFrameType.RETIRE_CONNECTION_ID,
1287                Buffer(data=b"\x02"),
1288            )
1289            self.assertEqual(
1290                sequence_numbers(client._host_cids), [0, 1, 3, 4, 5, 6, 7, 8]
1291            )
1292
1293    def test_handle_retire_connection_id_frame_current_cid(self):
1294        with client_and_server() as (client, server):
1295            self.assertEqual(
1296                sequence_numbers(client._host_cids), [0, 1, 2, 3, 4, 5, 6, 7]
1297            )
1298
1299            # client receives RETIRE_CONNECTION_ID for the current CID
1300            with self.assertRaises(QuicConnectionError) as cm:
1301                client._handle_retire_connection_id_frame(
1302                    client_receive_context(client),
1303                    QuicFrameType.RETIRE_CONNECTION_ID,
1304                    Buffer(data=b"\x00"),
1305                )
1306            self.assertEqual(cm.exception.error_code, QuicErrorCode.PROTOCOL_VIOLATION)
1307            self.assertEqual(
1308                cm.exception.frame_type, QuicFrameType.RETIRE_CONNECTION_ID
1309            )
1310            self.assertEqual(
1311                cm.exception.reason_phrase, "Cannot retire current connection ID"
1312            )
1313            self.assertEqual(
1314                sequence_numbers(client._host_cids), [0, 1, 2, 3, 4, 5, 6, 7]
1315            )
1316
1317    def test_handle_stop_sending_frame(self):
1318        with client_and_server() as (client, server):
1319            # client creates bidirectional stream 0
1320            client.send_stream_data(stream_id=0, data=b"hello")
1321
1322            # client receives STOP_SENDING
1323            client._handle_stop_sending_frame(
1324                client_receive_context(client),
1325                QuicFrameType.STOP_SENDING,
1326                Buffer(data=b"\x00\x11"),
1327            )
1328
1329    def test_handle_stop_sending_frame_receive_only(self):
1330        with client_and_server() as (client, server):
1331            # server creates unidirectional stream 3
1332            server.send_stream_data(stream_id=3, data=b"hello")
1333
1334            # client receives STOP_SENDING
1335            with self.assertRaises(QuicConnectionError) as cm:
1336                client._handle_stop_sending_frame(
1337                    client_receive_context(client),
1338                    QuicFrameType.STOP_SENDING,
1339                    Buffer(data=b"\x03\x11"),
1340                )
1341            self.assertEqual(cm.exception.error_code, QuicErrorCode.STREAM_STATE_ERROR)
1342            self.assertEqual(cm.exception.frame_type, QuicFrameType.STOP_SENDING)
1343            self.assertEqual(cm.exception.reason_phrase, "Stream is receive-only")
1344
1345    def test_handle_stream_frame_over_largest_offset(self):
1346        with client_and_server() as (client, server):
1347            # client receives offset + length > 2^62 - 1
1348            frame_type = QuicFrameType.STREAM_BASE | 6
1349            stream_id = 1
1350            with self.assertRaises(QuicConnectionError) as cm:
1351                client._handle_stream_frame(
1352                    client_receive_context(client),
1353                    frame_type,
1354                    Buffer(
1355                        data=encode_uint_var(stream_id)
1356                        + encode_uint_var(UINT_VAR_MAX)
1357                        + encode_uint_var(1)
1358                    ),
1359                )
1360            self.assertEqual(
1361                cm.exception.error_code, QuicErrorCode.FRAME_ENCODING_ERROR
1362            )
1363            self.assertEqual(cm.exception.frame_type, frame_type)
1364            self.assertEqual(
1365                cm.exception.reason_phrase, "offset + length cannot exceed 2^62 - 1"
1366            )
1367
1368    def test_handle_stream_frame_over_max_data(self):
1369        with client_and_server() as (client, server):
1370            # artificially raise received data counter
1371            client._local_max_data_used = client._local_max_data
1372
1373            # client receives STREAM frame
1374            frame_type = QuicFrameType.STREAM_BASE | 4
1375            stream_id = 1
1376            with self.assertRaises(QuicConnectionError) as cm:
1377                client._handle_stream_frame(
1378                    client_receive_context(client),
1379                    frame_type,
1380                    Buffer(data=encode_uint_var(stream_id) + encode_uint_var(1)),
1381                )
1382            self.assertEqual(cm.exception.error_code, QuicErrorCode.FLOW_CONTROL_ERROR)
1383            self.assertEqual(cm.exception.frame_type, frame_type)
1384            self.assertEqual(cm.exception.reason_phrase, "Over connection data limit")
1385
1386    def test_handle_stream_frame_over_max_stream_data(self):
1387        with client_and_server() as (client, server):
1388            # client receives STREAM frame
1389            frame_type = QuicFrameType.STREAM_BASE | 4
1390            stream_id = 1
1391            with self.assertRaises(QuicConnectionError) as cm:
1392                client._handle_stream_frame(
1393                    client_receive_context(client),
1394                    frame_type,
1395                    Buffer(
1396                        data=encode_uint_var(stream_id)
1397                        + encode_uint_var(client._local_max_stream_data_bidi_remote + 1)
1398                    ),
1399                )
1400            self.assertEqual(cm.exception.error_code, QuicErrorCode.FLOW_CONTROL_ERROR)
1401            self.assertEqual(cm.exception.frame_type, frame_type)
1402            self.assertEqual(cm.exception.reason_phrase, "Over stream data limit")
1403
1404    def test_handle_stream_frame_over_max_streams(self):
1405        with client_and_server() as (client, server):
1406            # client receives STREAM frame
1407            with self.assertRaises(QuicConnectionError) as cm:
1408                client._handle_stream_frame(
1409                    client_receive_context(client),
1410                    QuicFrameType.STREAM_BASE,
1411                    Buffer(
1412                        data=encode_uint_var(client._local_max_stream_data_uni * 4 + 3)
1413                    ),
1414                )
1415            self.assertEqual(cm.exception.error_code, QuicErrorCode.STREAM_LIMIT_ERROR)
1416            self.assertEqual(cm.exception.frame_type, QuicFrameType.STREAM_BASE)
1417            self.assertEqual(cm.exception.reason_phrase, "Too many streams open")
1418
1419    def test_handle_stream_frame_send_only(self):
1420        with client_and_server() as (client, server):
1421            # client creates unidirectional stream 2
1422            client.send_stream_data(stream_id=2, data=b"hello")
1423
1424            # client receives STREAM frame
1425            with self.assertRaises(QuicConnectionError) as cm:
1426                client._handle_stream_frame(
1427                    client_receive_context(client),
1428                    QuicFrameType.STREAM_BASE,
1429                    Buffer(data=b"\x02"),
1430                )
1431            self.assertEqual(cm.exception.error_code, QuicErrorCode.STREAM_STATE_ERROR)
1432            self.assertEqual(cm.exception.frame_type, QuicFrameType.STREAM_BASE)
1433            self.assertEqual(cm.exception.reason_phrase, "Stream is send-only")
1434
1435    def test_handle_stream_frame_wrong_initiator(self):
1436        with client_and_server() as (client, server):
1437            # client receives STREAM frame
1438            with self.assertRaises(QuicConnectionError) as cm:
1439                client._handle_stream_frame(
1440                    client_receive_context(client),
1441                    QuicFrameType.STREAM_BASE,
1442                    Buffer(data=b"\x00"),
1443                )
1444            self.assertEqual(cm.exception.error_code, QuicErrorCode.STREAM_STATE_ERROR)
1445            self.assertEqual(cm.exception.frame_type, QuicFrameType.STREAM_BASE)
1446            self.assertEqual(cm.exception.reason_phrase, "Wrong stream initiator")
1447
1448    def test_handle_stream_data_blocked_frame(self):
1449        with client_and_server() as (client, server):
1450            # client creates bidirectional stream 0
1451            client.send_stream_data(stream_id=0, data=b"hello")
1452
1453            # client receives STREAM_DATA_BLOCKED
1454            client._handle_stream_data_blocked_frame(
1455                client_receive_context(client),
1456                QuicFrameType.STREAM_DATA_BLOCKED,
1457                Buffer(data=b"\x00\x01"),
1458            )
1459
1460    def test_handle_stream_data_blocked_frame_send_only(self):
1461        with client_and_server() as (client, server):
1462            # client creates unidirectional stream 2
1463            client.send_stream_data(stream_id=2, data=b"hello")
1464
1465            # client receives STREAM_DATA_BLOCKED
1466            with self.assertRaises(QuicConnectionError) as cm:
1467                client._handle_stream_data_blocked_frame(
1468                    client_receive_context(client),
1469                    QuicFrameType.STREAM_DATA_BLOCKED,
1470                    Buffer(data=b"\x02\x01"),
1471                )
1472            self.assertEqual(cm.exception.error_code, QuicErrorCode.STREAM_STATE_ERROR)
1473            self.assertEqual(cm.exception.frame_type, QuicFrameType.STREAM_DATA_BLOCKED)
1474            self.assertEqual(cm.exception.reason_phrase, "Stream is send-only")
1475
1476    def test_handle_streams_blocked_uni_frame(self):
1477        with client_and_server() as (client, server):
1478            # client receives STREAMS_BLOCKED_UNI: 0
1479            client._handle_streams_blocked_frame(
1480                client_receive_context(client),
1481                QuicFrameType.STREAMS_BLOCKED_UNI,
1482                Buffer(data=b"\x00"),
1483            )
1484
1485    def test_payload_received_padding_only(self):
1486        with client_and_server() as (client, server):
1487            # client receives padding only
1488            is_ack_eliciting, is_probing = client._payload_received(
1489                client_receive_context(client), b"\x00" * 1200
1490            )
1491            self.assertFalse(is_ack_eliciting)
1492            self.assertTrue(is_probing)
1493
1494    def test_payload_received_unknown_frame(self):
1495        with client_and_server() as (client, server):
1496            # client receives unknown frame
1497            with self.assertRaises(QuicConnectionError) as cm:
1498                client._payload_received(client_receive_context(client), b"\x1f")
1499            self.assertEqual(cm.exception.error_code, QuicErrorCode.PROTOCOL_VIOLATION)
1500            self.assertEqual(cm.exception.frame_type, 0x1F)
1501            self.assertEqual(cm.exception.reason_phrase, "Unknown frame type")
1502
1503    def test_payload_received_unexpected_frame(self):
1504        with client_and_server() as (client, server):
1505            # client receives CRYPTO frame in 0-RTT
1506            with self.assertRaises(QuicConnectionError) as cm:
1507                client._payload_received(
1508                    client_receive_context(client, epoch=tls.Epoch.ZERO_RTT), b"\x06"
1509                )
1510            self.assertEqual(cm.exception.error_code, QuicErrorCode.PROTOCOL_VIOLATION)
1511            self.assertEqual(cm.exception.frame_type, QuicFrameType.CRYPTO)
1512            self.assertEqual(cm.exception.reason_phrase, "Unexpected frame type")
1513
1514    def test_payload_received_malformed_frame(self):
1515        with client_and_server() as (client, server):
1516            # client receives malformed TRANSPORT_CLOSE frame
1517            with self.assertRaises(QuicConnectionError) as cm:
1518                client._payload_received(
1519                    client_receive_context(client), b"\x1c\x00\x01"
1520                )
1521            self.assertEqual(
1522                cm.exception.error_code, QuicErrorCode.FRAME_ENCODING_ERROR
1523            )
1524            self.assertEqual(cm.exception.frame_type, 0x1C)
1525            self.assertEqual(cm.exception.reason_phrase, "Failed to parse frame")
1526
1527    def test_send_max_data_blocked_by_cc(self):
1528        with client_and_server() as (client, server):
1529            # check congestion control
1530            self.assertEqual(client._loss.bytes_in_flight, 0)
1531            self.assertEqual(client._loss.congestion_window, 14303)
1532
1533            # artificially raise received data counter
1534            client._local_max_data_used = client._local_max_data
1535            self.assertEqual(server._remote_max_data, 1048576)
1536
1537            # artificially raise bytes in flight
1538            client._loss._cc.bytes_in_flight = 14303
1539
1540            # MAX_DATA is not sent due to congestion control
1541            self.assertEqual(drop(client), 0)
1542
1543    def test_send_max_data_retransmit(self):
1544        with client_and_server() as (client, server):
1545            # artificially raise received data counter
1546            client._local_max_data_used = client._local_max_data
1547            self.assertEqual(server._remote_max_data, 1048576)
1548
1549            # MAX_DATA is sent and lost
1550            self.assertEqual(drop(client), 1)
1551            self.assertEqual(client._local_max_data_sent, 2097152)
1552            self.assertEqual(server._remote_max_data, 1048576)
1553
1554            # MAX_DATA is retransmitted and acked
1555            client._on_max_data_delivery(QuicDeliveryState.LOST)
1556            self.assertEqual(client._local_max_data_sent, 0)
1557            self.assertEqual(roundtrip(client, server), (1, 1))
1558            self.assertEqual(server._remote_max_data, 2097152)
1559
1560    def test_send_max_stream_data_retransmit(self):
1561        with client_and_server() as (client, server):
1562            # client creates bidirectional stream 0
1563            stream = client._create_stream(stream_id=0)
1564            client.send_stream_data(0, b"hello")
1565            self.assertEqual(stream.max_stream_data_local, 1048576)
1566            self.assertEqual(stream.max_stream_data_local_sent, 1048576)
1567            self.assertEqual(roundtrip(client, server), (1, 1))
1568
1569            # server sends data, just before raising MAX_STREAM_DATA
1570            server.send_stream_data(0, b"Z" * 524288)  # 1048576 // 2
1571            for i in range(10):
1572                roundtrip(server, client)
1573            self.assertEqual(stream.max_stream_data_local, 1048576)
1574            self.assertEqual(stream.max_stream_data_local_sent, 1048576)
1575
1576            # server sends one more byte
1577            server.send_stream_data(0, b"Z")
1578            self.assertEqual(transfer(server, client), 1)
1579
1580            # MAX_STREAM_DATA is sent and lost
1581            self.assertEqual(drop(client), 1)
1582            self.assertEqual(stream.max_stream_data_local, 2097152)
1583            self.assertEqual(stream.max_stream_data_local_sent, 2097152)
1584            client._on_max_stream_data_delivery(QuicDeliveryState.LOST, stream)
1585            self.assertEqual(stream.max_stream_data_local, 2097152)
1586            self.assertEqual(stream.max_stream_data_local_sent, 0)
1587
1588            # MAX_DATA is retransmitted and acked
1589            self.assertEqual(roundtrip(client, server), (1, 1))
1590            self.assertEqual(stream.max_stream_data_local, 2097152)
1591            self.assertEqual(stream.max_stream_data_local_sent, 2097152)
1592
1593    def test_send_ping(self):
1594        with client_and_server() as (client, server):
1595            consume_events(client)
1596
1597            # client sends ping, server ACKs it
1598            client.send_ping(uid=12345)
1599            self.assertEqual(roundtrip(client, server), (1, 1))
1600
1601            # check event
1602            event = client.next_event()
1603            self.assertEqual(type(event), events.PingAcknowledged)
1604            self.assertEqual(event.uid, 12345)
1605
1606    def test_send_ping_retransmit(self):
1607        with client_and_server() as (client, server):
1608            consume_events(client)
1609
1610            # client sends another ping, PING is lost
1611            client.send_ping(uid=12345)
1612            self.assertEqual(drop(client), 1)
1613
1614            # PING is retransmitted and acked
1615            client._on_ping_delivery(QuicDeliveryState.LOST, (12345,))
1616            self.assertEqual(roundtrip(client, server), (1, 1))
1617
1618            # check event
1619            event = client.next_event()
1620            self.assertEqual(type(event), events.PingAcknowledged)
1621            self.assertEqual(event.uid, 12345)
1622
1623    def test_send_stream_data_over_max_streams_bidi(self):
1624        with client_and_server() as (client, server):
1625            # create streams
1626            for i in range(128):
1627                stream_id = i * 4
1628                client.send_stream_data(stream_id, b"")
1629                self.assertFalse(client._streams[stream_id].is_blocked)
1630            self.assertEqual(len(client._streams_blocked_bidi), 0)
1631            self.assertEqual(len(client._streams_blocked_uni), 0)
1632            self.assertEqual(roundtrip(client, server), (0, 0))
1633
1634            # create one too many -> STREAMS_BLOCKED
1635            stream_id = 128 * 4
1636            client.send_stream_data(stream_id, b"")
1637            self.assertTrue(client._streams[stream_id].is_blocked)
1638            self.assertEqual(len(client._streams_blocked_bidi), 1)
1639            self.assertEqual(len(client._streams_blocked_uni), 0)
1640            self.assertEqual(roundtrip(client, server), (1, 1))
1641
1642            # peer raises max streams
1643            client._handle_max_streams_bidi_frame(
1644                client_receive_context(client),
1645                QuicFrameType.MAX_STREAMS_BIDI,
1646                Buffer(data=encode_uint_var(129)),
1647            )
1648            self.assertFalse(client._streams[stream_id].is_blocked)
1649
1650    def test_send_stream_data_over_max_streams_uni(self):
1651        with client_and_server() as (client, server):
1652            # create streams
1653            for i in range(128):
1654                stream_id = i * 4 + 2
1655                client.send_stream_data(stream_id, b"")
1656                self.assertFalse(client._streams[stream_id].is_blocked)
1657            self.assertEqual(len(client._streams_blocked_bidi), 0)
1658            self.assertEqual(len(client._streams_blocked_uni), 0)
1659            self.assertEqual(roundtrip(client, server), (0, 0))
1660
1661            # create one too many -> STREAMS_BLOCKED
1662            stream_id = 128 * 4 + 2
1663            client.send_stream_data(stream_id, b"")
1664            self.assertTrue(client._streams[stream_id].is_blocked)
1665            self.assertEqual(len(client._streams_blocked_bidi), 0)
1666            self.assertEqual(len(client._streams_blocked_uni), 1)
1667            self.assertEqual(roundtrip(client, server), (1, 1))
1668
1669            # peer raises max streams
1670            client._handle_max_streams_uni_frame(
1671                client_receive_context(client),
1672                QuicFrameType.MAX_STREAMS_UNI,
1673                Buffer(data=encode_uint_var(129)),
1674            )
1675            self.assertFalse(client._streams[stream_id].is_blocked)
1676
1677    def test_send_stream_data_peer_initiated(self):
1678        with client_and_server() as (client, server):
1679            # server creates bidirectional stream
1680            server.send_stream_data(1, b"hello")
1681            self.assertEqual(roundtrip(server, client), (1, 1))
1682
1683            # server creates unidirectional stream
1684            server.send_stream_data(3, b"hello")
1685            self.assertEqual(roundtrip(server, client), (1, 1))
1686
1687            # client creates bidirectional stream
1688            client.send_stream_data(0, b"hello")
1689            self.assertEqual(roundtrip(client, server), (1, 1))
1690
1691            # client sends data on server-initiated bidirectional stream
1692            client.send_stream_data(1, b"hello")
1693            self.assertEqual(roundtrip(client, server), (1, 1))
1694
1695            # client create unidirectional stream
1696            client.send_stream_data(2, b"hello")
1697            self.assertEqual(roundtrip(client, server), (1, 1))
1698
1699            # client tries to send data on server-initial unidirectional stream
1700            with self.assertRaises(ValueError) as cm:
1701                client.send_stream_data(3, b"hello")
1702            self.assertEqual(
1703                str(cm.exception),
1704                "Cannot send data on peer-initiated unidirectional stream",
1705            )
1706
1707            # client tries to send data on unknown server-initiated bidirectional stream
1708            with self.assertRaises(ValueError) as cm:
1709                client.send_stream_data(5, b"hello")
1710            self.assertEqual(
1711                str(cm.exception), "Cannot send data on unknown peer-initiated stream"
1712            )
1713
1714    def test_stream_direction(self):
1715        with client_and_server() as (client, server):
1716            for off in [0, 4, 8]:
1717                # Client-Initiated, Bidirectional
1718                self.assertTrue(client._stream_can_receive(off))
1719                self.assertTrue(client._stream_can_send(off))
1720                self.assertTrue(server._stream_can_receive(off))
1721                self.assertTrue(server._stream_can_send(off))
1722
1723                # Server-Initiated, Bidirectional
1724                self.assertTrue(client._stream_can_receive(off + 1))
1725                self.assertTrue(client._stream_can_send(off + 1))
1726                self.assertTrue(server._stream_can_receive(off + 1))
1727                self.assertTrue(server._stream_can_send(off + 1))
1728
1729                # Client-Initiated, Unidirectional
1730                self.assertFalse(client._stream_can_receive(off + 2))
1731                self.assertTrue(client._stream_can_send(off + 2))
1732                self.assertTrue(server._stream_can_receive(off + 2))
1733                self.assertFalse(server._stream_can_send(off + 2))
1734
1735                # Server-Initiated, Unidirectional
1736                self.assertTrue(client._stream_can_receive(off + 3))
1737                self.assertFalse(client._stream_can_send(off + 3))
1738                self.assertFalse(server._stream_can_receive(off + 3))
1739                self.assertTrue(server._stream_can_send(off + 3))
1740
1741    def test_version_negotiation_fail(self):
1742        client = create_standalone_client(self)
1743
1744        # no common version, no retry
1745        client.receive_datagram(
1746            encode_quic_version_negotiation(
1747                source_cid=client._peer_cid,
1748                destination_cid=client.host_cid,
1749                supported_versions=[0xFF000011],  # DRAFT_16
1750            ),
1751            SERVER_ADDR,
1752            now=time.time(),
1753        )
1754        self.assertEqual(drop(client), 0)
1755
1756        event = client.next_event()
1757        self.assertEqual(type(event), events.ConnectionTerminated)
1758        self.assertEqual(event.error_code, QuicErrorCode.INTERNAL_ERROR)
1759        self.assertEqual(event.frame_type, None)
1760        self.assertEqual(
1761            event.reason_phrase, "Could not find a common protocol version"
1762        )
1763
1764    def test_version_negotiation_ok(self):
1765        client = create_standalone_client(self)
1766
1767        # found a common version, retry
1768        client.receive_datagram(
1769            encode_quic_version_negotiation(
1770                source_cid=client._peer_cid,
1771                destination_cid=client.host_cid,
1772                supported_versions=[client._version],
1773            ),
1774            SERVER_ADDR,
1775            now=time.time(),
1776        )
1777        self.assertEqual(drop(client), 1)
1778
1779
1780class QuicNetworkPathTest(TestCase):
1781    def test_can_send(self):
1782        path = QuicNetworkPath(("1.2.3.4", 1234))
1783        self.assertFalse(path.is_validated)
1784
1785        # initially, cannot send any data
1786        self.assertTrue(path.can_send(0))
1787        self.assertFalse(path.can_send(1))
1788
1789        # receive some data
1790        path.bytes_received += 1
1791        self.assertTrue(path.can_send(0))
1792        self.assertTrue(path.can_send(1))
1793        self.assertTrue(path.can_send(2))
1794        self.assertTrue(path.can_send(3))
1795        self.assertFalse(path.can_send(4))
1796
1797        # send some data
1798        path.bytes_sent += 3
1799        self.assertTrue(path.can_send(0))
1800        self.assertFalse(path.can_send(1))
1801