1from unittest import TestCase
2
3from aiortc import rtp
4from aiortc.rtcrtpparameters import RTCRtpHeaderExtensionParameters, RTCRtpParameters
5from aiortc.rtp import (
6    RtcpByePacket,
7    RtcpPacket,
8    RtcpPsfbPacket,
9    RtcpRrPacket,
10    RtcpRtpfbPacket,
11    RtcpSdesPacket,
12    RtcpSrPacket,
13    RtpPacket,
14    clamp_packets_lost,
15    pack_header_extensions,
16    pack_packets_lost,
17    pack_remb_fci,
18    unpack_header_extensions,
19    unpack_packets_lost,
20    unpack_remb_fci,
21    unwrap_rtx,
22    wrap_rtx,
23)
24
25from .utils import load
26
27
28class RtcpPacketTest(TestCase):
29    def test_bye(self):
30        data = load("rtcp_bye.bin")
31        packets = RtcpPacket.parse(data)
32        self.assertEqual(len(packets), 1)
33
34        packet = packets[0]
35        self.assertTrue(isinstance(packet, RtcpByePacket))
36        self.assertEqual(packet.sources, [2924645187])
37        self.assertEqual(bytes(packet), data)
38
39        self.assertEqual(repr(packet), "RtcpByePacket(sources=[2924645187])")
40
41    def test_bye_invalid(self):
42        data = load("rtcp_bye_invalid.bin")
43
44        with self.assertRaises(ValueError) as cm:
45            RtcpPacket.parse(data)
46        self.assertEqual(str(cm.exception), "RTCP bye length is invalid")
47
48    def test_bye_no_sources(self):
49        data = load("rtcp_bye_no_sources.bin")
50        packets = RtcpPacket.parse(data)
51        self.assertEqual(len(packets), 1)
52
53        packet = packets[0]
54        self.assertTrue(isinstance(packet, RtcpByePacket))
55        self.assertEqual(packet.sources, [])
56        self.assertEqual(bytes(packet), data)
57
58        self.assertEqual(repr(packet), "RtcpByePacket(sources=[])")
59
60    def test_bye_only_padding(self):
61        data = load("rtcp_bye_padding.bin")
62        packets = RtcpPacket.parse(data)
63        self.assertEqual(len(packets), 1)
64
65        packet = packets[0]
66        self.assertTrue(isinstance(packet, RtcpByePacket))
67        self.assertEqual(packet.sources, [])
68        self.assertEqual(bytes(packet), b"\x80\xcb\x00\x00")
69
70        self.assertEqual(repr(packet), "RtcpByePacket(sources=[])")
71
72    def test_bye_only_padding_zero(self):
73        data = load("rtcp_bye_padding.bin")[0:4] + b"\x00\x00\x00\x00"
74
75        with self.assertRaises(ValueError) as cm:
76            RtcpPacket.parse(data)
77        self.assertEqual(str(cm.exception), "RTCP packet padding length is invalid")
78
79    def test_psfb_invalid(self):
80        data = load("rtcp_psfb_invalid.bin")
81
82        with self.assertRaises(ValueError) as cm:
83            RtcpPacket.parse(data)
84        self.assertEqual(
85            str(cm.exception), "RTCP payload-specific feedback length is invalid"
86        )
87
88    def test_psfb_pli(self):
89        data = load("rtcp_psfb_pli.bin")
90        packets = RtcpPacket.parse(data)
91        self.assertEqual(len(packets), 1)
92
93        packet = packets[0]
94        self.assertTrue(isinstance(packet, RtcpPsfbPacket))
95        self.assertEqual(packet.fmt, 1)
96        self.assertEqual(packet.ssrc, 1414554213)
97        self.assertEqual(packet.media_ssrc, 587284409)
98        self.assertEqual(packet.fci, b"")
99        self.assertEqual(bytes(packet), data)
100
101    def test_rr(self):
102        data = load("rtcp_rr.bin")
103        packets = RtcpPacket.parse(data)
104        self.assertEqual(len(packets), 1)
105
106        packet = packets[0]
107        self.assertTrue(isinstance(packet, RtcpRrPacket))
108        self.assertEqual(packet.ssrc, 817267719)
109        self.assertEqual(packet.reports[0].ssrc, 1200895919)
110        self.assertEqual(packet.reports[0].fraction_lost, 0)
111        self.assertEqual(packet.reports[0].packets_lost, 0)
112        self.assertEqual(packet.reports[0].highest_sequence, 630)
113        self.assertEqual(packet.reports[0].jitter, 1906)
114        self.assertEqual(packet.reports[0].lsr, 0)
115        self.assertEqual(packet.reports[0].dlsr, 0)
116        self.assertEqual(bytes(packet), data)
117
118    def test_rr_invalid(self):
119        data = load("rtcp_rr_invalid.bin")
120
121        with self.assertRaises(ValueError) as cm:
122            RtcpPacket.parse(data)
123        self.assertEqual(str(cm.exception), "RTCP receiver report length is invalid")
124
125    def test_rr_truncated(self):
126        data = load("rtcp_rr.bin")
127
128        for length in range(1, 4):
129            with self.assertRaises(ValueError) as cm:
130                RtcpPacket.parse(data[0:length])
131            self.assertEqual(
132                str(cm.exception), "RTCP packet length is less than 4 bytes"
133            )
134
135        for length in range(4, 32):
136            with self.assertRaises(ValueError) as cm:
137                RtcpPacket.parse(data[0:length])
138            self.assertEqual(str(cm.exception), "RTCP packet is truncated")
139
140    def test_sdes(self):
141        data = load("rtcp_sdes.bin")
142        packets = RtcpPacket.parse(data)
143        self.assertEqual(len(packets), 1)
144
145        packet = packets[0]
146        self.assertTrue(isinstance(packet, RtcpSdesPacket))
147        self.assertEqual(packet.chunks[0].ssrc, 1831097322)
148        self.assertEqual(
149            packet.chunks[0].items, [(1, b"{63f459ea-41fe-4474-9d33-9707c9ee79d1}")]
150        )
151        self.assertEqual(bytes(packet), data)
152
153    def test_sdes_item_truncated(self):
154        data = load("rtcp_sdes_item_truncated.bin")
155
156        with self.assertRaises(ValueError) as cm:
157            RtcpPacket.parse(data)
158        self.assertEqual(str(cm.exception), "RTCP SDES item is truncated")
159
160    def test_sdes_source_truncated(self):
161        data = load("rtcp_sdes_source_truncated.bin")
162
163        with self.assertRaises(ValueError) as cm:
164            RtcpPacket.parse(data)
165        self.assertEqual(str(cm.exception), "RTCP SDES source is truncated")
166
167    def test_sr(self):
168        data = load("rtcp_sr.bin")
169        packets = RtcpPacket.parse(data)
170        self.assertEqual(len(packets), 1)
171
172        packet = packets[0]
173        self.assertTrue(isinstance(packet, RtcpSrPacket))
174        self.assertEqual(packet.ssrc, 1831097322)
175        self.assertEqual(packet.sender_info.ntp_timestamp, 16016567581311369308)
176        self.assertEqual(packet.sender_info.rtp_timestamp, 1722342718)
177        self.assertEqual(packet.sender_info.packet_count, 269)
178        self.assertEqual(packet.sender_info.octet_count, 13557)
179        self.assertEqual(len(packet.reports), 1)
180        self.assertEqual(packet.reports[0].ssrc, 2398654957)
181        self.assertEqual(packet.reports[0].fraction_lost, 0)
182        self.assertEqual(packet.reports[0].packets_lost, 0)
183        self.assertEqual(packet.reports[0].highest_sequence, 246)
184        self.assertEqual(packet.reports[0].jitter, 127)
185        self.assertEqual(packet.reports[0].lsr, 0)
186        self.assertEqual(packet.reports[0].dlsr, 0)
187        self.assertEqual(bytes(packet), data)
188
189    def test_sr_invalid(self):
190        data = load("rtcp_sr_invalid.bin")
191
192        with self.assertRaises(ValueError) as cm:
193            RtcpPacket.parse(data)
194        self.assertEqual(str(cm.exception), "RTCP sender report length is invalid")
195
196    def test_rtpfb(self):
197        data = load("rtcp_rtpfb.bin")
198        packets = RtcpPacket.parse(data)
199        self.assertEqual(len(packets), 1)
200
201        packet = packets[0]
202        self.assertTrue(isinstance(packet, RtcpRtpfbPacket))
203        self.assertEqual(packet.fmt, 1)
204        self.assertEqual(packet.ssrc, 2336520123)
205        self.assertEqual(packet.media_ssrc, 4145934052)
206        self.assertEqual(
207            packet.lost,
208            [12, 32, 39, 54, 76, 110, 123, 142, 183, 187, 223, 236, 271, 292],
209        )
210        self.assertEqual(bytes(packet), data)
211
212    def test_rtpfb_invalid(self):
213        data = load("rtcp_rtpfb_invalid.bin")
214
215        with self.assertRaises(ValueError) as cm:
216            RtcpPacket.parse(data)
217        self.assertEqual(str(cm.exception), "RTCP RTP feedback length is invalid")
218
219    def test_compound(self):
220        data = load("rtcp_sr.bin") + load("rtcp_sdes.bin")
221
222        packets = RtcpPacket.parse(data)
223        self.assertEqual(len(packets), 2)
224        self.assertTrue(isinstance(packets[0], RtcpSrPacket))
225        self.assertTrue(isinstance(packets[1], RtcpSdesPacket))
226
227    def test_bad_version(self):
228        data = b"\xc0" + load("rtcp_rr.bin")[1:]
229        with self.assertRaises(ValueError) as cm:
230            RtcpPacket.parse(data)
231        self.assertEqual(str(cm.exception), "RTCP packet has invalid version")
232
233
234class RtpPacketTest(TestCase):
235    def test_dtmf(self):
236        data = load("rtp_dtmf.bin")
237        packet = RtpPacket.parse(data)
238        self.assertEqual(packet.version, 2)
239        self.assertEqual(packet.marker, 1)
240        self.assertEqual(packet.payload_type, 101)
241        self.assertEqual(packet.sequence_number, 24152)
242        self.assertEqual(packet.timestamp, 4021352124)
243        self.assertEqual(packet.csrc, [])
244        self.assertEqual(packet.extensions, rtp.HeaderExtensions())
245        self.assertEqual(len(packet.payload), 4)
246        self.assertEqual(packet.serialize(), data)
247
248    def test_no_ssrc(self):
249        data = load("rtp.bin")
250        packet = RtpPacket.parse(data)
251        self.assertEqual(packet.version, 2)
252        self.assertEqual(packet.marker, 0)
253        self.assertEqual(packet.payload_type, 0)
254        self.assertEqual(packet.sequence_number, 15743)
255        self.assertEqual(packet.timestamp, 3937035252)
256        self.assertEqual(packet.csrc, [])
257        self.assertEqual(packet.extensions, rtp.HeaderExtensions())
258        self.assertEqual(len(packet.payload), 160)
259        self.assertEqual(packet.serialize(), data)
260
261        self.assertEqual(
262            repr(packet),
263            "RtpPacket(seq=15743, ts=3937035252, marker=0, payload=0, 160 bytes)",
264        )
265
266    def test_padding_only(self):
267        data = load("rtp_only_padding.bin")
268        packet = RtpPacket.parse(data)
269        self.assertEqual(packet.version, 2)
270        self.assertEqual(packet.marker, 0)
271        self.assertEqual(packet.payload_type, 120)
272        self.assertEqual(packet.sequence_number, 27759)
273        self.assertEqual(packet.timestamp, 4044047131)
274        self.assertEqual(packet.csrc, [])
275        self.assertEqual(packet.extensions, rtp.HeaderExtensions())
276        self.assertEqual(len(packet.payload), 0)
277        self.assertEqual(packet.padding_size, 224)
278
279        serialized = packet.serialize()
280        self.assertEqual(len(serialized), len(data))
281        self.assertEqual(serialized[0:12], data[0:12])
282        self.assertEqual(serialized[-1], data[-1])
283
284    def test_padding_only_with_header_extensions(self):
285        extensions_map = rtp.HeaderExtensionsMap()
286        extensions_map.configure(
287            RTCRtpParameters(
288                headerExtensions=[
289                    RTCRtpHeaderExtensionParameters(
290                        id=2,
291                        uri="http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
292                    )
293                ]
294            )
295        )
296
297        data = load("rtp_only_padding_with_header_extensions.bin")
298        packet = RtpPacket.parse(data, extensions_map)
299        self.assertEqual(packet.version, 2)
300        self.assertEqual(packet.marker, 0)
301        self.assertEqual(packet.payload_type, 98)
302        self.assertEqual(packet.sequence_number, 22138)
303        self.assertEqual(packet.timestamp, 3171065731)
304        self.assertEqual(packet.csrc, [])
305        self.assertEqual(
306            packet.extensions, rtp.HeaderExtensions(abs_send_time=15846540)
307        )
308        self.assertEqual(len(packet.payload), 0)
309        self.assertEqual(packet.padding_size, 224)
310
311        serialized = packet.serialize(extensions_map)
312        self.assertEqual(len(serialized), len(data))
313        self.assertEqual(serialized[0:20], data[0:20])
314        self.assertEqual(serialized[-1], data[-1])
315
316    def test_padding_too_long(self):
317        data = load("rtp_only_padding.bin")[0:12] + b"\x02"
318        with self.assertRaises(ValueError) as cm:
319            RtpPacket.parse(data)
320        self.assertEqual(str(cm.exception), "RTP packet padding length is invalid")
321
322    def test_padding_zero(self):
323        data = load("rtp_only_padding.bin")[0:12] + b"\x00"
324        with self.assertRaises(ValueError) as cm:
325            RtpPacket.parse(data)
326        self.assertEqual(str(cm.exception), "RTP packet padding length is invalid")
327
328    def test_with_csrc(self):
329        data = load("rtp_with_csrc.bin")
330        packet = RtpPacket.parse(data)
331        self.assertEqual(packet.version, 2)
332        self.assertEqual(packet.marker, 0)
333        self.assertEqual(packet.payload_type, 0)
334        self.assertEqual(packet.sequence_number, 16082)
335        self.assertEqual(packet.timestamp, 144)
336        self.assertEqual(packet.csrc, [2882400001, 3735928559])
337        self.assertEqual(packet.extensions, rtp.HeaderExtensions())
338        self.assertEqual(len(packet.payload), 160)
339        self.assertEqual(packet.serialize(), data)
340
341    def test_with_csrc_truncated(self):
342        data = load("rtp_with_csrc.bin")
343        for length in range(12, 20):
344            with self.assertRaises(ValueError) as cm:
345                RtpPacket.parse(data[0:length])
346            self.assertEqual(str(cm.exception), "RTP packet has truncated CSRC")
347
348    def test_with_sdes_mid(self):
349        extensions_map = rtp.HeaderExtensionsMap()
350        extensions_map.configure(
351            RTCRtpParameters(
352                headerExtensions=[
353                    RTCRtpHeaderExtensionParameters(
354                        id=9, uri="urn:ietf:params:rtp-hdrext:sdes:mid"
355                    )
356                ]
357            )
358        )
359
360        data = load("rtp_with_sdes_mid.bin")
361        packet = RtpPacket.parse(data, extensions_map)
362        self.assertEqual(packet.version, 2)
363        self.assertEqual(packet.marker, 1)
364        self.assertEqual(packet.payload_type, 111)
365        self.assertEqual(packet.sequence_number, 14156)
366        self.assertEqual(packet.timestamp, 1327210925)
367        self.assertEqual(packet.csrc, [])
368        self.assertEqual(packet.extensions, rtp.HeaderExtensions(mid="0"))
369        self.assertEqual(len(packet.payload), 54)
370        self.assertEqual(packet.serialize(extensions_map), data)
371
372    def test_with_sdes_mid_truncated(self):
373        data = load("rtp_with_sdes_mid.bin")
374
375        for length in range(12, 16):
376            with self.assertRaises(ValueError) as cm:
377                RtpPacket.parse(data[0:length])
378            self.assertEqual(
379                str(cm.exception), "RTP packet has truncated extension profile / length"
380            )
381
382        for length in range(16, 20):
383            with self.assertRaises(ValueError) as cm:
384                RtpPacket.parse(data[0:length])
385            self.assertEqual(
386                str(cm.exception), "RTP packet has truncated extension value"
387            )
388
389    def test_truncated(self):
390        data = load("rtp.bin")[0:11]
391        with self.assertRaises(ValueError) as cm:
392            RtpPacket.parse(data)
393        self.assertEqual(str(cm.exception), "RTP packet length is less than 12 bytes")
394
395    def test_bad_version(self):
396        data = b"\xc0" + load("rtp.bin")[1:]
397        with self.assertRaises(ValueError) as cm:
398            RtpPacket.parse(data)
399        self.assertEqual(str(cm.exception), "RTP packet has invalid version")
400
401
402class RtpUtilTest(TestCase):
403    def test_clamp_packets_lost(self):
404        self.assertEqual(clamp_packets_lost(-8388609), -8388608)
405        self.assertEqual(clamp_packets_lost(-8388608), -8388608)
406        self.assertEqual(clamp_packets_lost(0), 0)
407        self.assertEqual(clamp_packets_lost(8388607), 8388607)
408        self.assertEqual(clamp_packets_lost(8388608), 8388607)
409
410    def test_pack_packets_lost(self):
411        self.assertEqual(pack_packets_lost(-8388608), b"\x80\x00\x00")
412        self.assertEqual(pack_packets_lost(-1), b"\xff\xff\xff")
413        self.assertEqual(pack_packets_lost(0), b"\x00\x00\x00")
414        self.assertEqual(pack_packets_lost(1), b"\x00\x00\x01")
415        self.assertEqual(pack_packets_lost(8388607), b"\x7f\xff\xff")
416
417    def test_pack_remb_fci(self):
418        # exponent = 0, mantissa = 0
419        data = pack_remb_fci(0, [2529072847])
420        self.assertEqual(data, b"REMB\x01\x00\x00\x00\x96\xbe\x96\xcf")
421
422        # exponent = 0, mantissa = 0x3ffff
423        data = pack_remb_fci(0x3FFFF, [2529072847])
424        self.assertEqual(data, b"REMB\x01\x03\xff\xff\x96\xbe\x96\xcf")
425
426        # exponent = 1, mantissa = 0
427        data = pack_remb_fci(0x40000, [2529072847])
428        self.assertEqual(data, b"REMB\x01\x06\x00\x00\x96\xbe\x96\xcf")
429
430        data = pack_remb_fci(4160000, [2529072847])
431        self.assertEqual(data, b"REMB\x01\x13\xf7\xa0\x96\xbe\x96\xcf")
432
433        # exponent = 63, mantissa = 0x3ffff
434        data = pack_remb_fci(0x3FFFF << 63, [2529072847])
435        self.assertEqual(data, b"REMB\x01\xff\xff\xff\x96\xbe\x96\xcf")
436
437    def test_unpack_packets_lost(self):
438        self.assertEqual(unpack_packets_lost(b"\x80\x00\x00"), -8388608)
439        self.assertEqual(unpack_packets_lost(b"\xff\xff\xff"), -1)
440        self.assertEqual(unpack_packets_lost(b"\x00\x00\x00"), 0)
441        self.assertEqual(unpack_packets_lost(b"\x00\x00\x01"), 1)
442        self.assertEqual(unpack_packets_lost(b"\x7f\xff\xff"), 8388607)
443
444    def test_unpack_remb_fci(self):
445        # junk
446        with self.assertRaises(ValueError):
447            unpack_remb_fci(b"JUNK")
448
449        # exponent = 0, mantissa = 0
450        bitrate, ssrcs = unpack_remb_fci(b"REMB\x01\x00\x00\x00\x96\xbe\x96\xcf")
451        self.assertEqual(bitrate, 0)
452        self.assertEqual(ssrcs, [2529072847])
453
454        # exponent = 0, mantissa = 0x3ffff
455        bitrate, ssrcs = unpack_remb_fci(b"REMB\x01\x03\xff\xff\x96\xbe\x96\xcf")
456        self.assertEqual(bitrate, 0x3FFFF)
457        self.assertEqual(ssrcs, [2529072847])
458
459        # exponent = 1, mantissa = 0
460        bitrate, ssrcs = unpack_remb_fci(b"REMB\x01\x06\x00\x00\x96\xbe\x96\xcf")
461        self.assertEqual(bitrate, 0x40000)
462        self.assertEqual(ssrcs, [2529072847])
463
464        # 4160000 bps
465        bitrate, ssrcs = unpack_remb_fci(b"REMB\x01\x13\xf7\xa0\x96\xbe\x96\xcf")
466        self.assertEqual(bitrate, 4160000)
467        self.assertEqual(ssrcs, [2529072847])
468
469        # exponent = 63, mantissa = 0x3ffff
470        bitrate, ssrcs = unpack_remb_fci(b"REMB\x01\xff\xff\xff\x96\xbe\x96\xcf")
471        self.assertEqual(bitrate, 0x3FFFF << 63)
472        self.assertEqual(ssrcs, [2529072847])
473
474    def test_unpack_header_extensions(self):
475        # none
476        self.assertEqual(unpack_header_extensions(0, None), [])
477
478        # one-byte, value
479        self.assertEqual(unpack_header_extensions(0xBEDE, b"\x900"), [(9, b"0")])
480
481        # one-byte, value, padding, value
482        self.assertEqual(
483            unpack_header_extensions(0xBEDE, b"\x900\x00\x00\x301"),
484            [(9, b"0"), (3, b"1")],
485        )
486
487        # one-byte, value, value
488        self.assertEqual(
489            unpack_header_extensions(0xBEDE, b"\x10\xc18sdparta_0"),
490            [(1, b"\xc1"), (3, b"sdparta_0")],
491        )
492
493        # two-byte, value
494        self.assertEqual(unpack_header_extensions(0x1000, b"\xff\x010"), [(255, b"0")])
495
496        # two-byte, value (1 byte), padding, value (2 bytes)
497        self.assertEqual(
498            unpack_header_extensions(0x1000, b"\xff\x010\x00\xf0\x0212"),
499            [(255, b"0"), (240, b"12")],
500        )
501
502    def test_unpack_header_extensions_bad(self):
503        # one-byte, value (truncated)
504        with self.assertRaises(ValueError) as cm:
505            unpack_header_extensions(0xBEDE, b"\x90")
506        self.assertEqual(
507            str(cm.exception), "RTP one-byte header extension value is truncated"
508        )
509
510        # two-byte (truncated)
511        with self.assertRaises(ValueError) as cm:
512            unpack_header_extensions(0x1000, b"\xff")
513        self.assertEqual(
514            str(cm.exception), "RTP two-byte header extension is truncated"
515        )
516
517        # two-byte, value (truncated)
518        with self.assertRaises(ValueError) as cm:
519            unpack_header_extensions(0x1000, b"\xff\x020")
520        self.assertEqual(
521            str(cm.exception), "RTP two-byte header extension value is truncated"
522        )
523
524    def test_pack_header_extensions(self):
525        # none
526        self.assertEqual(pack_header_extensions([]), (0, b""))
527
528        # one-byte, single value
529        self.assertEqual(
530            pack_header_extensions([(9, b"0")]), (0xBEDE, b"\x900\x00\x00")
531        )
532
533        # one-byte, two values
534        self.assertEqual(
535            pack_header_extensions([(1, b"\xc1"), (3, b"sdparta_0")]),
536            (0xBEDE, b"\x10\xc18sdparta_0"),
537        )
538
539        # two-byte, single value
540        self.assertEqual(
541            pack_header_extensions([(255, b"0")]), (0x1000, b"\xff\x010\x00")
542        )
543
544    def test_map_header_extensions(self):
545        data = bytearray(
546            [
547                0x90,
548                0x64,
549                0x00,
550                0x58,
551                0x65,
552                0x43,
553                0x12,
554                0x78,
555                0x12,
556                0x34,
557                0x56,
558                0x78,  # SSRC
559                0xBE,
560                0xDE,
561                0x00,
562                0x08,  # Extension of size 8x32bit words.
563                0x40,
564                0xDA,  # AudioLevel.
565                0x22,
566                0x01,
567                0x56,
568                0xCE,  # TransmissionOffset.
569                0x62,
570                0x12,
571                0x34,
572                0x56,  # AbsoluteSendTime.
573                0x81,
574                0xCE,
575                0xAB,  # TransportSequenceNumber.
576                0xA0,
577                0x03,  # VideoRotation.
578                0xB2,
579                0x12,
580                0x48,
581                0x76,  # PlayoutDelayLimits.
582                0xC2,
583                0x72,
584                0x74,
585                0x78,  # RtpStreamId
586                0xD5,
587                0x73,
588                0x74,
589                0x72,
590                0x65,
591                0x61,
592                0x6D,  # RepairedRtpStreamId
593                0x00,
594                0x00,  # Padding to 32bit boundary.
595            ]
596        )
597        extensions_map = rtp.HeaderExtensionsMap()
598        extensions_map.configure(
599            RTCRtpParameters(
600                headerExtensions=[
601                    RTCRtpHeaderExtensionParameters(
602                        id=2, uri="urn:ietf:params:rtp-hdrext:toffset"
603                    ),
604                    RTCRtpHeaderExtensionParameters(
605                        id=4, uri="urn:ietf:params:rtp-hdrext:ssrc-audio-level"
606                    ),
607                    RTCRtpHeaderExtensionParameters(
608                        id=6,
609                        uri="http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
610                    ),
611                    RTCRtpHeaderExtensionParameters(
612                        id=8,
613                        uri="http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
614                    ),
615                    RTCRtpHeaderExtensionParameters(
616                        id=12, uri="urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id"
617                    ),
618                    RTCRtpHeaderExtensionParameters(
619                        id=13,
620                        uri="urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
621                    ),
622                ]
623            )
624        )
625
626        packet = RtpPacket.parse(data, extensions_map)
627
628        # check mapped values
629        self.assertEqual(packet.extensions.abs_send_time, 0x123456)
630        self.assertEqual(packet.extensions.audio_level, (True, 90))
631        self.assertEqual(packet.extensions.mid, None)
632        self.assertEqual(packet.extensions.repaired_rtp_stream_id, "stream")
633        self.assertEqual(packet.extensions.rtp_stream_id, "rtx")
634        self.assertEqual(packet.extensions.transmission_offset, 0x156CE)
635        self.assertEqual(packet.extensions.transport_sequence_number, 0xCEAB)
636
637        # TODO: check
638        packet.serialize(extensions_map)
639
640    def test_rtx(self):
641        extensions_map = rtp.HeaderExtensionsMap()
642        extensions_map.configure(
643            RTCRtpParameters(
644                headerExtensions=[
645                    RTCRtpHeaderExtensionParameters(
646                        id=9, uri="urn:ietf:params:rtp-hdrext:sdes:mid"
647                    )
648                ]
649            )
650        )
651
652        data = load("rtp_with_sdes_mid.bin")
653        packet = RtpPacket.parse(data, extensions_map)
654
655        # wrap / unwrap RTX
656        rtx = wrap_rtx(packet, payload_type=112, sequence_number=12345, ssrc=1234)
657        recovered = unwrap_rtx(rtx, payload_type=111, ssrc=4084547440)
658
659        # check roundtrip
660        self.assertEqual(recovered.version, packet.version)
661        self.assertEqual(recovered.marker, packet.marker)
662        self.assertEqual(recovered.payload_type, packet.payload_type)
663        self.assertEqual(recovered.sequence_number, packet.sequence_number)
664        self.assertEqual(recovered.timestamp, packet.timestamp)
665        self.assertEqual(recovered.ssrc, packet.ssrc)
666        self.assertEqual(recovered.csrc, packet.csrc)
667        self.assertEqual(recovered.extensions, packet.extensions)
668        self.assertEqual(recovered.payload, packet.payload)
669