1from unittest import TestCase 2 3from aiortc import rtp 4from aiortc.rtcrtpparameters import RTCRtpHeaderExtensionParameters, RTCRtpParameters 5from aiortc.rtp import ( 6 RtcpByePacket, 7 RtcpPacket, 8 RtcpPsfbPacket, 9 RtcpRrPacket, 10 RtcpRtpfbPacket, 11 RtcpSdesPacket, 12 RtcpSrPacket, 13 RtpPacket, 14 clamp_packets_lost, 15 pack_header_extensions, 16 pack_packets_lost, 17 pack_remb_fci, 18 unpack_header_extensions, 19 unpack_packets_lost, 20 unpack_remb_fci, 21 unwrap_rtx, 22 wrap_rtx, 23) 24 25from .utils import load 26 27 28class RtcpPacketTest(TestCase): 29 def test_bye(self): 30 data = load("rtcp_bye.bin") 31 packets = RtcpPacket.parse(data) 32 self.assertEqual(len(packets), 1) 33 34 packet = packets[0] 35 self.assertTrue(isinstance(packet, RtcpByePacket)) 36 self.assertEqual(packet.sources, [2924645187]) 37 self.assertEqual(bytes(packet), data) 38 39 self.assertEqual(repr(packet), "RtcpByePacket(sources=[2924645187])") 40 41 def test_bye_invalid(self): 42 data = load("rtcp_bye_invalid.bin") 43 44 with self.assertRaises(ValueError) as cm: 45 RtcpPacket.parse(data) 46 self.assertEqual(str(cm.exception), "RTCP bye length is invalid") 47 48 def test_bye_no_sources(self): 49 data = load("rtcp_bye_no_sources.bin") 50 packets = RtcpPacket.parse(data) 51 self.assertEqual(len(packets), 1) 52 53 packet = packets[0] 54 self.assertTrue(isinstance(packet, RtcpByePacket)) 55 self.assertEqual(packet.sources, []) 56 self.assertEqual(bytes(packet), data) 57 58 self.assertEqual(repr(packet), "RtcpByePacket(sources=[])") 59 60 def test_bye_only_padding(self): 61 data = load("rtcp_bye_padding.bin") 62 packets = RtcpPacket.parse(data) 63 self.assertEqual(len(packets), 1) 64 65 packet = packets[0] 66 self.assertTrue(isinstance(packet, RtcpByePacket)) 67 self.assertEqual(packet.sources, []) 68 self.assertEqual(bytes(packet), b"\x80\xcb\x00\x00") 69 70 self.assertEqual(repr(packet), "RtcpByePacket(sources=[])") 71 72 def test_bye_only_padding_zero(self): 73 data = load("rtcp_bye_padding.bin")[0:4] + b"\x00\x00\x00\x00" 74 75 with self.assertRaises(ValueError) as cm: 76 RtcpPacket.parse(data) 77 self.assertEqual(str(cm.exception), "RTCP packet padding length is invalid") 78 79 def test_psfb_invalid(self): 80 data = load("rtcp_psfb_invalid.bin") 81 82 with self.assertRaises(ValueError) as cm: 83 RtcpPacket.parse(data) 84 self.assertEqual( 85 str(cm.exception), "RTCP payload-specific feedback length is invalid" 86 ) 87 88 def test_psfb_pli(self): 89 data = load("rtcp_psfb_pli.bin") 90 packets = RtcpPacket.parse(data) 91 self.assertEqual(len(packets), 1) 92 93 packet = packets[0] 94 self.assertTrue(isinstance(packet, RtcpPsfbPacket)) 95 self.assertEqual(packet.fmt, 1) 96 self.assertEqual(packet.ssrc, 1414554213) 97 self.assertEqual(packet.media_ssrc, 587284409) 98 self.assertEqual(packet.fci, b"") 99 self.assertEqual(bytes(packet), data) 100 101 def test_rr(self): 102 data = load("rtcp_rr.bin") 103 packets = RtcpPacket.parse(data) 104 self.assertEqual(len(packets), 1) 105 106 packet = packets[0] 107 self.assertTrue(isinstance(packet, RtcpRrPacket)) 108 self.assertEqual(packet.ssrc, 817267719) 109 self.assertEqual(packet.reports[0].ssrc, 1200895919) 110 self.assertEqual(packet.reports[0].fraction_lost, 0) 111 self.assertEqual(packet.reports[0].packets_lost, 0) 112 self.assertEqual(packet.reports[0].highest_sequence, 630) 113 self.assertEqual(packet.reports[0].jitter, 1906) 114 self.assertEqual(packet.reports[0].lsr, 0) 115 self.assertEqual(packet.reports[0].dlsr, 0) 116 self.assertEqual(bytes(packet), data) 117 118 def test_rr_invalid(self): 119 data = load("rtcp_rr_invalid.bin") 120 121 with self.assertRaises(ValueError) as cm: 122 RtcpPacket.parse(data) 123 self.assertEqual(str(cm.exception), "RTCP receiver report length is invalid") 124 125 def test_rr_truncated(self): 126 data = load("rtcp_rr.bin") 127 128 for length in range(1, 4): 129 with self.assertRaises(ValueError) as cm: 130 RtcpPacket.parse(data[0:length]) 131 self.assertEqual( 132 str(cm.exception), "RTCP packet length is less than 4 bytes" 133 ) 134 135 for length in range(4, 32): 136 with self.assertRaises(ValueError) as cm: 137 RtcpPacket.parse(data[0:length]) 138 self.assertEqual(str(cm.exception), "RTCP packet is truncated") 139 140 def test_sdes(self): 141 data = load("rtcp_sdes.bin") 142 packets = RtcpPacket.parse(data) 143 self.assertEqual(len(packets), 1) 144 145 packet = packets[0] 146 self.assertTrue(isinstance(packet, RtcpSdesPacket)) 147 self.assertEqual(packet.chunks[0].ssrc, 1831097322) 148 self.assertEqual( 149 packet.chunks[0].items, [(1, b"{63f459ea-41fe-4474-9d33-9707c9ee79d1}")] 150 ) 151 self.assertEqual(bytes(packet), data) 152 153 def test_sdes_item_truncated(self): 154 data = load("rtcp_sdes_item_truncated.bin") 155 156 with self.assertRaises(ValueError) as cm: 157 RtcpPacket.parse(data) 158 self.assertEqual(str(cm.exception), "RTCP SDES item is truncated") 159 160 def test_sdes_source_truncated(self): 161 data = load("rtcp_sdes_source_truncated.bin") 162 163 with self.assertRaises(ValueError) as cm: 164 RtcpPacket.parse(data) 165 self.assertEqual(str(cm.exception), "RTCP SDES source is truncated") 166 167 def test_sr(self): 168 data = load("rtcp_sr.bin") 169 packets = RtcpPacket.parse(data) 170 self.assertEqual(len(packets), 1) 171 172 packet = packets[0] 173 self.assertTrue(isinstance(packet, RtcpSrPacket)) 174 self.assertEqual(packet.ssrc, 1831097322) 175 self.assertEqual(packet.sender_info.ntp_timestamp, 16016567581311369308) 176 self.assertEqual(packet.sender_info.rtp_timestamp, 1722342718) 177 self.assertEqual(packet.sender_info.packet_count, 269) 178 self.assertEqual(packet.sender_info.octet_count, 13557) 179 self.assertEqual(len(packet.reports), 1) 180 self.assertEqual(packet.reports[0].ssrc, 2398654957) 181 self.assertEqual(packet.reports[0].fraction_lost, 0) 182 self.assertEqual(packet.reports[0].packets_lost, 0) 183 self.assertEqual(packet.reports[0].highest_sequence, 246) 184 self.assertEqual(packet.reports[0].jitter, 127) 185 self.assertEqual(packet.reports[0].lsr, 0) 186 self.assertEqual(packet.reports[0].dlsr, 0) 187 self.assertEqual(bytes(packet), data) 188 189 def test_sr_invalid(self): 190 data = load("rtcp_sr_invalid.bin") 191 192 with self.assertRaises(ValueError) as cm: 193 RtcpPacket.parse(data) 194 self.assertEqual(str(cm.exception), "RTCP sender report length is invalid") 195 196 def test_rtpfb(self): 197 data = load("rtcp_rtpfb.bin") 198 packets = RtcpPacket.parse(data) 199 self.assertEqual(len(packets), 1) 200 201 packet = packets[0] 202 self.assertTrue(isinstance(packet, RtcpRtpfbPacket)) 203 self.assertEqual(packet.fmt, 1) 204 self.assertEqual(packet.ssrc, 2336520123) 205 self.assertEqual(packet.media_ssrc, 4145934052) 206 self.assertEqual( 207 packet.lost, 208 [12, 32, 39, 54, 76, 110, 123, 142, 183, 187, 223, 236, 271, 292], 209 ) 210 self.assertEqual(bytes(packet), data) 211 212 def test_rtpfb_invalid(self): 213 data = load("rtcp_rtpfb_invalid.bin") 214 215 with self.assertRaises(ValueError) as cm: 216 RtcpPacket.parse(data) 217 self.assertEqual(str(cm.exception), "RTCP RTP feedback length is invalid") 218 219 def test_compound(self): 220 data = load("rtcp_sr.bin") + load("rtcp_sdes.bin") 221 222 packets = RtcpPacket.parse(data) 223 self.assertEqual(len(packets), 2) 224 self.assertTrue(isinstance(packets[0], RtcpSrPacket)) 225 self.assertTrue(isinstance(packets[1], RtcpSdesPacket)) 226 227 def test_bad_version(self): 228 data = b"\xc0" + load("rtcp_rr.bin")[1:] 229 with self.assertRaises(ValueError) as cm: 230 RtcpPacket.parse(data) 231 self.assertEqual(str(cm.exception), "RTCP packet has invalid version") 232 233 234class RtpPacketTest(TestCase): 235 def test_dtmf(self): 236 data = load("rtp_dtmf.bin") 237 packet = RtpPacket.parse(data) 238 self.assertEqual(packet.version, 2) 239 self.assertEqual(packet.marker, 1) 240 self.assertEqual(packet.payload_type, 101) 241 self.assertEqual(packet.sequence_number, 24152) 242 self.assertEqual(packet.timestamp, 4021352124) 243 self.assertEqual(packet.csrc, []) 244 self.assertEqual(packet.extensions, rtp.HeaderExtensions()) 245 self.assertEqual(len(packet.payload), 4) 246 self.assertEqual(packet.serialize(), data) 247 248 def test_no_ssrc(self): 249 data = load("rtp.bin") 250 packet = RtpPacket.parse(data) 251 self.assertEqual(packet.version, 2) 252 self.assertEqual(packet.marker, 0) 253 self.assertEqual(packet.payload_type, 0) 254 self.assertEqual(packet.sequence_number, 15743) 255 self.assertEqual(packet.timestamp, 3937035252) 256 self.assertEqual(packet.csrc, []) 257 self.assertEqual(packet.extensions, rtp.HeaderExtensions()) 258 self.assertEqual(len(packet.payload), 160) 259 self.assertEqual(packet.serialize(), data) 260 261 self.assertEqual( 262 repr(packet), 263 "RtpPacket(seq=15743, ts=3937035252, marker=0, payload=0, 160 bytes)", 264 ) 265 266 def test_padding_only(self): 267 data = load("rtp_only_padding.bin") 268 packet = RtpPacket.parse(data) 269 self.assertEqual(packet.version, 2) 270 self.assertEqual(packet.marker, 0) 271 self.assertEqual(packet.payload_type, 120) 272 self.assertEqual(packet.sequence_number, 27759) 273 self.assertEqual(packet.timestamp, 4044047131) 274 self.assertEqual(packet.csrc, []) 275 self.assertEqual(packet.extensions, rtp.HeaderExtensions()) 276 self.assertEqual(len(packet.payload), 0) 277 self.assertEqual(packet.padding_size, 224) 278 279 serialized = packet.serialize() 280 self.assertEqual(len(serialized), len(data)) 281 self.assertEqual(serialized[0:12], data[0:12]) 282 self.assertEqual(serialized[-1], data[-1]) 283 284 def test_padding_only_with_header_extensions(self): 285 extensions_map = rtp.HeaderExtensionsMap() 286 extensions_map.configure( 287 RTCRtpParameters( 288 headerExtensions=[ 289 RTCRtpHeaderExtensionParameters( 290 id=2, 291 uri="http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time", 292 ) 293 ] 294 ) 295 ) 296 297 data = load("rtp_only_padding_with_header_extensions.bin") 298 packet = RtpPacket.parse(data, extensions_map) 299 self.assertEqual(packet.version, 2) 300 self.assertEqual(packet.marker, 0) 301 self.assertEqual(packet.payload_type, 98) 302 self.assertEqual(packet.sequence_number, 22138) 303 self.assertEqual(packet.timestamp, 3171065731) 304 self.assertEqual(packet.csrc, []) 305 self.assertEqual( 306 packet.extensions, rtp.HeaderExtensions(abs_send_time=15846540) 307 ) 308 self.assertEqual(len(packet.payload), 0) 309 self.assertEqual(packet.padding_size, 224) 310 311 serialized = packet.serialize(extensions_map) 312 self.assertEqual(len(serialized), len(data)) 313 self.assertEqual(serialized[0:20], data[0:20]) 314 self.assertEqual(serialized[-1], data[-1]) 315 316 def test_padding_too_long(self): 317 data = load("rtp_only_padding.bin")[0:12] + b"\x02" 318 with self.assertRaises(ValueError) as cm: 319 RtpPacket.parse(data) 320 self.assertEqual(str(cm.exception), "RTP packet padding length is invalid") 321 322 def test_padding_zero(self): 323 data = load("rtp_only_padding.bin")[0:12] + b"\x00" 324 with self.assertRaises(ValueError) as cm: 325 RtpPacket.parse(data) 326 self.assertEqual(str(cm.exception), "RTP packet padding length is invalid") 327 328 def test_with_csrc(self): 329 data = load("rtp_with_csrc.bin") 330 packet = RtpPacket.parse(data) 331 self.assertEqual(packet.version, 2) 332 self.assertEqual(packet.marker, 0) 333 self.assertEqual(packet.payload_type, 0) 334 self.assertEqual(packet.sequence_number, 16082) 335 self.assertEqual(packet.timestamp, 144) 336 self.assertEqual(packet.csrc, [2882400001, 3735928559]) 337 self.assertEqual(packet.extensions, rtp.HeaderExtensions()) 338 self.assertEqual(len(packet.payload), 160) 339 self.assertEqual(packet.serialize(), data) 340 341 def test_with_csrc_truncated(self): 342 data = load("rtp_with_csrc.bin") 343 for length in range(12, 20): 344 with self.assertRaises(ValueError) as cm: 345 RtpPacket.parse(data[0:length]) 346 self.assertEqual(str(cm.exception), "RTP packet has truncated CSRC") 347 348 def test_with_sdes_mid(self): 349 extensions_map = rtp.HeaderExtensionsMap() 350 extensions_map.configure( 351 RTCRtpParameters( 352 headerExtensions=[ 353 RTCRtpHeaderExtensionParameters( 354 id=9, uri="urn:ietf:params:rtp-hdrext:sdes:mid" 355 ) 356 ] 357 ) 358 ) 359 360 data = load("rtp_with_sdes_mid.bin") 361 packet = RtpPacket.parse(data, extensions_map) 362 self.assertEqual(packet.version, 2) 363 self.assertEqual(packet.marker, 1) 364 self.assertEqual(packet.payload_type, 111) 365 self.assertEqual(packet.sequence_number, 14156) 366 self.assertEqual(packet.timestamp, 1327210925) 367 self.assertEqual(packet.csrc, []) 368 self.assertEqual(packet.extensions, rtp.HeaderExtensions(mid="0")) 369 self.assertEqual(len(packet.payload), 54) 370 self.assertEqual(packet.serialize(extensions_map), data) 371 372 def test_with_sdes_mid_truncated(self): 373 data = load("rtp_with_sdes_mid.bin") 374 375 for length in range(12, 16): 376 with self.assertRaises(ValueError) as cm: 377 RtpPacket.parse(data[0:length]) 378 self.assertEqual( 379 str(cm.exception), "RTP packet has truncated extension profile / length" 380 ) 381 382 for length in range(16, 20): 383 with self.assertRaises(ValueError) as cm: 384 RtpPacket.parse(data[0:length]) 385 self.assertEqual( 386 str(cm.exception), "RTP packet has truncated extension value" 387 ) 388 389 def test_truncated(self): 390 data = load("rtp.bin")[0:11] 391 with self.assertRaises(ValueError) as cm: 392 RtpPacket.parse(data) 393 self.assertEqual(str(cm.exception), "RTP packet length is less than 12 bytes") 394 395 def test_bad_version(self): 396 data = b"\xc0" + load("rtp.bin")[1:] 397 with self.assertRaises(ValueError) as cm: 398 RtpPacket.parse(data) 399 self.assertEqual(str(cm.exception), "RTP packet has invalid version") 400 401 402class RtpUtilTest(TestCase): 403 def test_clamp_packets_lost(self): 404 self.assertEqual(clamp_packets_lost(-8388609), -8388608) 405 self.assertEqual(clamp_packets_lost(-8388608), -8388608) 406 self.assertEqual(clamp_packets_lost(0), 0) 407 self.assertEqual(clamp_packets_lost(8388607), 8388607) 408 self.assertEqual(clamp_packets_lost(8388608), 8388607) 409 410 def test_pack_packets_lost(self): 411 self.assertEqual(pack_packets_lost(-8388608), b"\x80\x00\x00") 412 self.assertEqual(pack_packets_lost(-1), b"\xff\xff\xff") 413 self.assertEqual(pack_packets_lost(0), b"\x00\x00\x00") 414 self.assertEqual(pack_packets_lost(1), b"\x00\x00\x01") 415 self.assertEqual(pack_packets_lost(8388607), b"\x7f\xff\xff") 416 417 def test_pack_remb_fci(self): 418 # exponent = 0, mantissa = 0 419 data = pack_remb_fci(0, [2529072847]) 420 self.assertEqual(data, b"REMB\x01\x00\x00\x00\x96\xbe\x96\xcf") 421 422 # exponent = 0, mantissa = 0x3ffff 423 data = pack_remb_fci(0x3FFFF, [2529072847]) 424 self.assertEqual(data, b"REMB\x01\x03\xff\xff\x96\xbe\x96\xcf") 425 426 # exponent = 1, mantissa = 0 427 data = pack_remb_fci(0x40000, [2529072847]) 428 self.assertEqual(data, b"REMB\x01\x06\x00\x00\x96\xbe\x96\xcf") 429 430 data = pack_remb_fci(4160000, [2529072847]) 431 self.assertEqual(data, b"REMB\x01\x13\xf7\xa0\x96\xbe\x96\xcf") 432 433 # exponent = 63, mantissa = 0x3ffff 434 data = pack_remb_fci(0x3FFFF << 63, [2529072847]) 435 self.assertEqual(data, b"REMB\x01\xff\xff\xff\x96\xbe\x96\xcf") 436 437 def test_unpack_packets_lost(self): 438 self.assertEqual(unpack_packets_lost(b"\x80\x00\x00"), -8388608) 439 self.assertEqual(unpack_packets_lost(b"\xff\xff\xff"), -1) 440 self.assertEqual(unpack_packets_lost(b"\x00\x00\x00"), 0) 441 self.assertEqual(unpack_packets_lost(b"\x00\x00\x01"), 1) 442 self.assertEqual(unpack_packets_lost(b"\x7f\xff\xff"), 8388607) 443 444 def test_unpack_remb_fci(self): 445 # junk 446 with self.assertRaises(ValueError): 447 unpack_remb_fci(b"JUNK") 448 449 # exponent = 0, mantissa = 0 450 bitrate, ssrcs = unpack_remb_fci(b"REMB\x01\x00\x00\x00\x96\xbe\x96\xcf") 451 self.assertEqual(bitrate, 0) 452 self.assertEqual(ssrcs, [2529072847]) 453 454 # exponent = 0, mantissa = 0x3ffff 455 bitrate, ssrcs = unpack_remb_fci(b"REMB\x01\x03\xff\xff\x96\xbe\x96\xcf") 456 self.assertEqual(bitrate, 0x3FFFF) 457 self.assertEqual(ssrcs, [2529072847]) 458 459 # exponent = 1, mantissa = 0 460 bitrate, ssrcs = unpack_remb_fci(b"REMB\x01\x06\x00\x00\x96\xbe\x96\xcf") 461 self.assertEqual(bitrate, 0x40000) 462 self.assertEqual(ssrcs, [2529072847]) 463 464 # 4160000 bps 465 bitrate, ssrcs = unpack_remb_fci(b"REMB\x01\x13\xf7\xa0\x96\xbe\x96\xcf") 466 self.assertEqual(bitrate, 4160000) 467 self.assertEqual(ssrcs, [2529072847]) 468 469 # exponent = 63, mantissa = 0x3ffff 470 bitrate, ssrcs = unpack_remb_fci(b"REMB\x01\xff\xff\xff\x96\xbe\x96\xcf") 471 self.assertEqual(bitrate, 0x3FFFF << 63) 472 self.assertEqual(ssrcs, [2529072847]) 473 474 def test_unpack_header_extensions(self): 475 # none 476 self.assertEqual(unpack_header_extensions(0, None), []) 477 478 # one-byte, value 479 self.assertEqual(unpack_header_extensions(0xBEDE, b"\x900"), [(9, b"0")]) 480 481 # one-byte, value, padding, value 482 self.assertEqual( 483 unpack_header_extensions(0xBEDE, b"\x900\x00\x00\x301"), 484 [(9, b"0"), (3, b"1")], 485 ) 486 487 # one-byte, value, value 488 self.assertEqual( 489 unpack_header_extensions(0xBEDE, b"\x10\xc18sdparta_0"), 490 [(1, b"\xc1"), (3, b"sdparta_0")], 491 ) 492 493 # two-byte, value 494 self.assertEqual(unpack_header_extensions(0x1000, b"\xff\x010"), [(255, b"0")]) 495 496 # two-byte, value (1 byte), padding, value (2 bytes) 497 self.assertEqual( 498 unpack_header_extensions(0x1000, b"\xff\x010\x00\xf0\x0212"), 499 [(255, b"0"), (240, b"12")], 500 ) 501 502 def test_unpack_header_extensions_bad(self): 503 # one-byte, value (truncated) 504 with self.assertRaises(ValueError) as cm: 505 unpack_header_extensions(0xBEDE, b"\x90") 506 self.assertEqual( 507 str(cm.exception), "RTP one-byte header extension value is truncated" 508 ) 509 510 # two-byte (truncated) 511 with self.assertRaises(ValueError) as cm: 512 unpack_header_extensions(0x1000, b"\xff") 513 self.assertEqual( 514 str(cm.exception), "RTP two-byte header extension is truncated" 515 ) 516 517 # two-byte, value (truncated) 518 with self.assertRaises(ValueError) as cm: 519 unpack_header_extensions(0x1000, b"\xff\x020") 520 self.assertEqual( 521 str(cm.exception), "RTP two-byte header extension value is truncated" 522 ) 523 524 def test_pack_header_extensions(self): 525 # none 526 self.assertEqual(pack_header_extensions([]), (0, b"")) 527 528 # one-byte, single value 529 self.assertEqual( 530 pack_header_extensions([(9, b"0")]), (0xBEDE, b"\x900\x00\x00") 531 ) 532 533 # one-byte, two values 534 self.assertEqual( 535 pack_header_extensions([(1, b"\xc1"), (3, b"sdparta_0")]), 536 (0xBEDE, b"\x10\xc18sdparta_0"), 537 ) 538 539 # two-byte, single value 540 self.assertEqual( 541 pack_header_extensions([(255, b"0")]), (0x1000, b"\xff\x010\x00") 542 ) 543 544 def test_map_header_extensions(self): 545 data = bytearray( 546 [ 547 0x90, 548 0x64, 549 0x00, 550 0x58, 551 0x65, 552 0x43, 553 0x12, 554 0x78, 555 0x12, 556 0x34, 557 0x56, 558 0x78, # SSRC 559 0xBE, 560 0xDE, 561 0x00, 562 0x08, # Extension of size 8x32bit words. 563 0x40, 564 0xDA, # AudioLevel. 565 0x22, 566 0x01, 567 0x56, 568 0xCE, # TransmissionOffset. 569 0x62, 570 0x12, 571 0x34, 572 0x56, # AbsoluteSendTime. 573 0x81, 574 0xCE, 575 0xAB, # TransportSequenceNumber. 576 0xA0, 577 0x03, # VideoRotation. 578 0xB2, 579 0x12, 580 0x48, 581 0x76, # PlayoutDelayLimits. 582 0xC2, 583 0x72, 584 0x74, 585 0x78, # RtpStreamId 586 0xD5, 587 0x73, 588 0x74, 589 0x72, 590 0x65, 591 0x61, 592 0x6D, # RepairedRtpStreamId 593 0x00, 594 0x00, # Padding to 32bit boundary. 595 ] 596 ) 597 extensions_map = rtp.HeaderExtensionsMap() 598 extensions_map.configure( 599 RTCRtpParameters( 600 headerExtensions=[ 601 RTCRtpHeaderExtensionParameters( 602 id=2, uri="urn:ietf:params:rtp-hdrext:toffset" 603 ), 604 RTCRtpHeaderExtensionParameters( 605 id=4, uri="urn:ietf:params:rtp-hdrext:ssrc-audio-level" 606 ), 607 RTCRtpHeaderExtensionParameters( 608 id=6, 609 uri="http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time", 610 ), 611 RTCRtpHeaderExtensionParameters( 612 id=8, 613 uri="http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01", 614 ), 615 RTCRtpHeaderExtensionParameters( 616 id=12, uri="urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id" 617 ), 618 RTCRtpHeaderExtensionParameters( 619 id=13, 620 uri="urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id", 621 ), 622 ] 623 ) 624 ) 625 626 packet = RtpPacket.parse(data, extensions_map) 627 628 # check mapped values 629 self.assertEqual(packet.extensions.abs_send_time, 0x123456) 630 self.assertEqual(packet.extensions.audio_level, (True, 90)) 631 self.assertEqual(packet.extensions.mid, None) 632 self.assertEqual(packet.extensions.repaired_rtp_stream_id, "stream") 633 self.assertEqual(packet.extensions.rtp_stream_id, "rtx") 634 self.assertEqual(packet.extensions.transmission_offset, 0x156CE) 635 self.assertEqual(packet.extensions.transport_sequence_number, 0xCEAB) 636 637 # TODO: check 638 packet.serialize(extensions_map) 639 640 def test_rtx(self): 641 extensions_map = rtp.HeaderExtensionsMap() 642 extensions_map.configure( 643 RTCRtpParameters( 644 headerExtensions=[ 645 RTCRtpHeaderExtensionParameters( 646 id=9, uri="urn:ietf:params:rtp-hdrext:sdes:mid" 647 ) 648 ] 649 ) 650 ) 651 652 data = load("rtp_with_sdes_mid.bin") 653 packet = RtpPacket.parse(data, extensions_map) 654 655 # wrap / unwrap RTX 656 rtx = wrap_rtx(packet, payload_type=112, sequence_number=12345, ssrc=1234) 657 recovered = unwrap_rtx(rtx, payload_type=111, ssrc=4084547440) 658 659 # check roundtrip 660 self.assertEqual(recovered.version, packet.version) 661 self.assertEqual(recovered.marker, packet.marker) 662 self.assertEqual(recovered.payload_type, packet.payload_type) 663 self.assertEqual(recovered.sequence_number, packet.sequence_number) 664 self.assertEqual(recovered.timestamp, packet.timestamp) 665 self.assertEqual(recovered.ssrc, packet.ssrc) 666 self.assertEqual(recovered.csrc, packet.csrc) 667 self.assertEqual(recovered.extensions, packet.extensions) 668 self.assertEqual(recovered.payload, packet.payload) 669