1 /*
2 * rtp.cxx
3 *
4 * RTP protocol handler
5 *
6 * Open H323 Library
7 *
8 * Copyright (c) 1998-2000 Equivalence Pty. Ltd.
9 *
10 * The contents of this file are subject to the Mozilla Public License
11 * Version 1.0 (the "License"); you may not use this file except in
12 * compliance with the License. You may obtain a copy of the License at
13 * http://www.mozilla.org/MPL/
14 *
15 * Software distributed under the License is distributed on an "AS IS"
16 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
17 * the License for the specific language governing rights and limitations
18 * under the License.
19 *
20 * The Original Code is Open H323 Library.
21 *
22 * The Initial Developer of the Original Code is Equivalence Pty. Ltd.
23 *
24 * Portions of this code were written with the assistance of funding from
25 * Vovida Networks, Inc. http://www.vovida.com.
26 *
27 * Contributor(s): ______________________________________.
28 *
29 * $Revision: 29118 $
30 * $Author: rjongbloed $
31 * $Date: 2013-02-15 20:10:30 -0600 (Fri, 15 Feb 2013) $
32 */
33
34 #include <ptlib.h>
35
36 #ifdef __GNUC__
37 #pragma implementation "rtp.h"
38 #endif
39
40 #include <opal/buildopts.h>
41
42 #include <rtp/rtp.h>
43
44 #include <rtp/jitter.h>
45
46 #include <rtp/metrics.h>
47
48 #include <ptclib/random.h>
49 #include <ptclib/pstun.h>
50 #include <opal/rtpconn.h>
51
52 #include <algorithm>
53
54 #define new PNEW
55
56 #define BAD_TRANSMIT_TIME_MAX 10 // maximum of seconds of transmit fails before session is killed
57
58 const unsigned SecondsFrom1900to1970 = (70*365+17)*24*60*60U;
59
60 #define RTP_VIDEO_RX_BUFFER_SIZE 0x100000 // 1Mb
61 #define RTP_AUDIO_RX_BUFFER_SIZE 0x4000 // 16kb
62 #define RTP_DATA_TX_BUFFER_SIZE 0x2000 // 8kb
63 #define RTP_CTRL_BUFFER_SIZE 0x1000 // 4kb
64
65
66 PFACTORY_CREATE(PFactory<RTP_Encoding>, RTP_Encoding, "rtp/avp", false);
67
68
69 /////////////////////////////////////////////////////////////////////////////
70
RTP_DataFrame(PINDEX payloadSz,PINDEX bufferSz)71 RTP_DataFrame::RTP_DataFrame(PINDEX payloadSz, PINDEX bufferSz)
72 : PBYTEArray(std::max(bufferSz, MinHeaderSize+payloadSz))
73 , m_headerSize(MinHeaderSize)
74 , m_payloadSize(payloadSz)
75 , m_paddingSize(0)
76 {
77 theArray[0] = '\x80'; // Default to version 2
78 theArray[1] = '\x7f'; // Default to MaxPayloadType
79 }
80
81
RTP_DataFrame(const BYTE * data,PINDEX len,PBoolean dynamic)82 RTP_DataFrame::RTP_DataFrame(const BYTE * data, PINDEX len, PBoolean dynamic)
83 : PBYTEArray(data, len, dynamic)
84 , m_headerSize(MinHeaderSize)
85 , m_payloadSize(0)
86 , m_paddingSize(0)
87 {
88 SetPacketSize(len);
89 }
90
91
SetPacketSize(PINDEX sz)92 bool RTP_DataFrame::SetPacketSize(PINDEX sz)
93 {
94 if (sz < RTP_DataFrame::MinHeaderSize) {
95 PTRACE(2, "RTP\tInvalid RTP packet, "
96 "smaller than minimum header size, " << sz << " < " << RTP_DataFrame::MinHeaderSize);
97 m_payloadSize = m_paddingSize = 0;
98 return false;
99 }
100
101 m_headerSize = MinHeaderSize + 4*GetContribSrcCount();
102
103 if (GetExtension())
104 m_headerSize += (GetExtensionSizeDWORDs()+1)*4;
105
106 if (sz < m_headerSize) {
107 PTRACE(2, "RTP\tInvalid RTP packet, "
108 "smaller than indicated header size, " << sz << " < " << m_headerSize);
109 m_payloadSize = m_paddingSize = 0;
110 return false;
111 }
112
113 if (!GetPadding()) {
114 m_payloadSize = sz - m_headerSize;
115 return true;
116 }
117
118 /* We do this as some systems send crap at the end of the packet, giving
119 incorrect results for the padding size. So we do a sanity check that
120 the indicating padding size is not larger than the payload itself. Not
121 100% accurate, but you do whatever you can.
122 */
123 PINDEX pos = sz;
124 do {
125 if (pos-- <= m_headerSize) {
126 PTRACE(2, "RTP\tInvalid RTP packet, padding indicated but not enough data, "
127 "size=" << sz << ", header=" << m_headerSize);
128 m_payloadSize = m_paddingSize = 0;
129 return false;
130 }
131
132 m_paddingSize = theArray[pos] & 0xff;
133 } while (m_paddingSize > (pos-m_headerSize));
134
135 m_payloadSize = pos - m_headerSize - 1;
136
137 return true;
138 }
139
140
SetExtension(PBoolean ext)141 void RTP_DataFrame::SetExtension(PBoolean ext)
142 {
143 if (ext)
144 theArray[0] |= 0x10;
145 else
146 theArray[0] &= 0xef;
147 }
148
149
SetMarker(PBoolean m)150 void RTP_DataFrame::SetMarker(PBoolean m)
151 {
152 if (m)
153 theArray[1] |= 0x80;
154 else
155 theArray[1] &= 0x7f;
156 }
157
158
SetPayloadType(PayloadTypes t)159 void RTP_DataFrame::SetPayloadType(PayloadTypes t)
160 {
161 PAssert(t <= 0x7f, PInvalidParameter);
162
163 theArray[1] &= 0x80;
164 theArray[1] |= t;
165 }
166
167
GetContribSource(PINDEX idx) const168 DWORD RTP_DataFrame::GetContribSource(PINDEX idx) const
169 {
170 PAssert(idx < GetContribSrcCount(), PInvalidParameter);
171 return ((PUInt32b *)&theArray[MinHeaderSize])[idx];
172 }
173
174
SetContribSource(PINDEX idx,DWORD src)175 void RTP_DataFrame::SetContribSource(PINDEX idx, DWORD src)
176 {
177 PAssert(idx <= 15, PInvalidParameter);
178
179 if (idx >= GetContribSrcCount()) {
180 BYTE * oldPayload = GetPayloadPtr();
181 theArray[0] &= 0xf0;
182 theArray[0] |= idx+1;
183 m_headerSize += 4;
184 PINDEX sz = m_payloadSize + m_paddingSize;
185 SetMinSize(m_headerSize+sz);
186 memmove(GetPayloadPtr(), oldPayload, sz);
187 }
188
189 ((PUInt32b *)&theArray[MinHeaderSize])[idx] = src;
190 }
191
192
GetExtensionType() const193 int RTP_DataFrame::GetExtensionType() const
194 {
195 if (GetExtension())
196 return *(PUInt16b *)&theArray[MinHeaderSize + 4*GetContribSrcCount()];
197
198 return -1;
199 }
200
201
SetExtensionType(int type)202 void RTP_DataFrame::SetExtensionType(int type)
203 {
204 if (type < 0)
205 SetExtension(false);
206 else {
207 if (!GetExtension())
208 SetExtensionSizeDWORDs(0);
209 *(PUInt16b *)&theArray[MinHeaderSize + 4*GetContribSrcCount()] = (WORD)type;
210 }
211 }
212
213
GetExtensionSizeDWORDs() const214 PINDEX RTP_DataFrame::GetExtensionSizeDWORDs() const
215 {
216 if (GetExtension())
217 return *(PUInt16b *)&theArray[MinHeaderSize + 4*GetContribSrcCount() + 2];
218
219 return 0;
220 }
221
222
SetExtensionSizeDWORDs(PINDEX sz)223 PBoolean RTP_DataFrame::SetExtensionSizeDWORDs(PINDEX sz)
224 {
225 m_headerSize = MinHeaderSize + 4*GetContribSrcCount() + (sz+1)*4;
226 if (!SetMinSize(m_headerSize+m_payloadSize+m_paddingSize))
227 return false;
228
229 SetExtension(true);
230 *(PUInt16b *)&theArray[MinHeaderSize + 4*GetContribSrcCount() + 2] = (WORD)sz;
231 return true;
232 }
233
234
GetExtensionPtr() const235 BYTE * RTP_DataFrame::GetExtensionPtr() const
236 {
237 if (GetExtension())
238 return (BYTE *)&theArray[MinHeaderSize + 4*GetContribSrcCount() + 4];
239
240 return NULL;
241 }
242
243
SetPayloadSize(PINDEX sz)244 bool RTP_DataFrame::SetPayloadSize(PINDEX sz)
245 {
246 m_payloadSize = sz;
247 return SetMinSize(m_headerSize+m_payloadSize+m_paddingSize);
248 }
249
250
SetPaddingSize(PINDEX sz)251 bool RTP_DataFrame::SetPaddingSize(PINDEX sz)
252 {
253 m_paddingSize = sz;
254 return SetMinSize(m_headerSize+m_payloadSize+m_paddingSize);
255 }
256
257
PrintOn(ostream & strm) const258 void RTP_DataFrame::PrintOn(ostream & strm) const
259 {
260 strm << "V=" << GetVersion()
261 << " X=" << GetExtension()
262 << " M=" << GetMarker()
263 << " PT=" << GetPayloadType()
264 << " SN=" << GetSequenceNumber()
265 << " TS=" << GetTimestamp()
266 << " SSRC=" << hex << GetSyncSource() << dec
267 << " size=" << GetPayloadSize()
268 << '\n';
269
270 int csrcCount = GetContribSrcCount();
271 for (int csrc = 0; csrc < csrcCount; csrc++)
272 strm << " CSRC[" << csrc << "]=" << GetContribSource(csrc) << '\n';
273
274 if (GetExtension())
275 strm << " Header Extension Type: " << GetExtensionType() << '\n'
276 << hex << setfill('0') << PBYTEArray(GetExtensionPtr(), GetExtensionSizeDWORDs()*4, false) << setfill(' ') << dec << '\n';
277
278 strm << hex << setfill('0') << PBYTEArray(GetPayloadPtr(), GetPayloadSize(), false) << setfill(' ') << dec;
279 }
280
281
282 #if PTRACING
283 static const char * const PayloadTypesNames[RTP_DataFrame::LastKnownPayloadType] = {
284 "PCMU",
285 "FS1016",
286 "G721",
287 "GSM",
288 "G723",
289 "DVI4_8k",
290 "DVI4_16k",
291 "LPC",
292 "PCMA",
293 "G722",
294 "L16_Stereo",
295 "L16_Mono",
296 "G723",
297 "CN",
298 "MPA",
299 "G728",
300 "DVI4_11k",
301 "DVI4_22k",
302 "G729",
303 "CiscoCN",
304 NULL, NULL, NULL, NULL, NULL,
305 "CelB",
306 "JPEG",
307 NULL, NULL, NULL, NULL,
308 "H261",
309 "MPV",
310 "MP2T",
311 "H263",
312 NULL, NULL, NULL,
313 "T38"
314 };
315
operator <<(ostream & o,RTP_DataFrame::PayloadTypes t)316 ostream & operator<<(ostream & o, RTP_DataFrame::PayloadTypes t)
317 {
318 if ((PINDEX)t < PARRAYSIZE(PayloadTypesNames) && PayloadTypesNames[t] != NULL)
319 o << PayloadTypesNames[t];
320 else
321 o << "[pt=" << (int)t << ']';
322 return o;
323 }
324
325 #endif
326
327
328 /////////////////////////////////////////////////////////////////////////////
329
RTP_ControlFrame(PINDEX sz)330 RTP_ControlFrame::RTP_ControlFrame(PINDEX sz)
331 : PBYTEArray(sz)
332 {
333 compoundOffset = 0;
334 payloadSize = 0;
335 }
336
Reset(PINDEX size)337 void RTP_ControlFrame::Reset(PINDEX size)
338 {
339 SetSize(size);
340 compoundOffset = 0;
341 payloadSize = 0;
342 }
343
344
SetCount(unsigned count)345 void RTP_ControlFrame::SetCount(unsigned count)
346 {
347 PAssert(count < 32, PInvalidParameter);
348 theArray[compoundOffset] &= 0xe0;
349 theArray[compoundOffset] |= count;
350 }
351
352
SetFbType(unsigned type,PINDEX fciSize)353 void RTP_ControlFrame::SetFbType(unsigned type, PINDEX fciSize)
354 {
355 PAssert(type < 32, PInvalidParameter);
356 theArray[compoundOffset] &= 0xe0;
357 theArray[compoundOffset] |= type;
358 SetPayloadSize(fciSize+8);
359 }
360
361
SetPayloadType(unsigned t)362 void RTP_ControlFrame::SetPayloadType(unsigned t)
363 {
364 PAssert(t < 256, PInvalidParameter);
365 theArray[compoundOffset+1] = (BYTE)t;
366 }
367
GetCompoundSize() const368 PINDEX RTP_ControlFrame::GetCompoundSize() const
369 {
370 // transmitted length is the offset of the last compound block
371 // plus the compound length of the last block
372 return compoundOffset + *(PUInt16b *)&theArray[compoundOffset+2]*4;
373 }
374
SetPayloadSize(PINDEX sz)375 void RTP_ControlFrame::SetPayloadSize(PINDEX sz)
376 {
377 payloadSize = sz;
378
379 // compound size is in words, rounded up to nearest word
380 PINDEX compoundSize = (payloadSize + 3) & ~3;
381 PAssert(compoundSize <= 0xffff, PInvalidParameter);
382
383 // make sure buffer is big enough for previous packets plus packet header plus payload
384 SetMinSize(compoundOffset + 4 + 4*(compoundSize));
385
386 // put the new compound size into the packet (always at offset 2)
387 *(PUInt16b *)&theArray[compoundOffset+2] = (WORD)(compoundSize / 4);
388 }
389
GetPayloadPtr() const390 BYTE * RTP_ControlFrame::GetPayloadPtr() const
391 {
392 // payload for current packet is always one DWORD after the current compound start
393 if ((GetPayloadSize() == 0) || ((compoundOffset + 4) >= GetSize()))
394 return NULL;
395 return (BYTE *)(theArray + compoundOffset + 4);
396 }
397
ReadNextPacket()398 PBoolean RTP_ControlFrame::ReadNextPacket()
399 {
400 // skip over current packet
401 compoundOffset += GetPayloadSize() + 4;
402
403 // see if another packet is feasible
404 if (compoundOffset + 4 > GetSize())
405 return false;
406
407 // check if payload size for new packet is legal
408 return compoundOffset + GetPayloadSize() + 4 <= GetSize();
409 }
410
411
StartNewPacket()412 PBoolean RTP_ControlFrame::StartNewPacket()
413 {
414 // allocate storage for new packet header
415 if (!SetMinSize(compoundOffset + 4))
416 return false;
417
418 theArray[compoundOffset] = '\x80'; // Set version 2
419 theArray[compoundOffset+1] = 0; // Set payload type to illegal
420 theArray[compoundOffset+2] = 0; // Set payload size to zero
421 theArray[compoundOffset+3] = 0;
422
423 // payload is now zero bytes
424 payloadSize = 0;
425 SetPayloadSize(payloadSize);
426
427 return true;
428 }
429
EndPacket()430 void RTP_ControlFrame::EndPacket()
431 {
432 // all packets must align to DWORD boundaries
433 while (((4 + payloadSize) & 3) != 0) {
434 theArray[compoundOffset + 4 + payloadSize - 1] = 0;
435 ++payloadSize;
436 }
437
438 compoundOffset += 4 + payloadSize;
439 payloadSize = 0;
440 }
441
StartSourceDescription(DWORD src)442 void RTP_ControlFrame::StartSourceDescription(DWORD src)
443 {
444 // extend payload to include SSRC + END
445 SetPayloadSize(payloadSize + 4 + 1);
446 SetPayloadType(RTP_ControlFrame::e_SourceDescription);
447 SetCount(GetCount()+1); // will be incremented automatically
448
449 // get ptr to new item SDES
450 BYTE * payload = GetPayloadPtr();
451 *(PUInt32b *)payload = src;
452 payload[4] = e_END;
453 }
454
455
AddSourceDescriptionItem(unsigned type,const PString & data)456 void RTP_ControlFrame::AddSourceDescriptionItem(unsigned type, const PString & data)
457 {
458 // get ptr to new item, remembering that END was inserted previously
459 BYTE * payload = GetPayloadPtr() + payloadSize - 1;
460
461 // length of new item
462 PINDEX dataLength = data.GetLength();
463
464 // add storage for new item (note that END has already been included)
465 SetPayloadSize(payloadSize + 1 + 1 + dataLength);
466
467 // insert new item
468 payload[0] = (BYTE)type;
469 payload[1] = (BYTE)dataLength;
470 memcpy(payload+2, (const char *)data, dataLength);
471
472 // insert new END
473 payload[2+dataLength] = (BYTE)e_END;
474 }
475
476
SetLostPackets(unsigned packets)477 void RTP_ControlFrame::ReceiverReport::SetLostPackets(unsigned packets)
478 {
479 lost[0] = (BYTE)(packets >> 16);
480 lost[1] = (BYTE)(packets >> 8);
481 lost[2] = (BYTE)packets;
482 }
483
484
485 ///////////////////////////////////////////////////////////////////////////////
486
487 #if OPAL_STATISTICS
488
OpalMediaStatistics()489 OpalMediaStatistics::OpalMediaStatistics()
490 : m_totalBytes(0)
491 , m_totalPackets(0)
492 , m_packetsLost(0)
493 , m_packetsOutOfOrder(0)
494 , m_packetsTooLate(0)
495 , m_packetOverruns(0)
496 , m_minimumPacketTime(0)
497 , m_averagePacketTime(0)
498 , m_maximumPacketTime(0)
499
500 // Audio
501 , m_averageJitter(0)
502 , m_maximumJitter(0)
503 , m_jitterBufferDelay(0)
504
505 // Video
506 , m_totalFrames(0)
507 , m_keyFrames(0)
508 {
509 }
510
511 #if OPAL_FAX
Fax()512 OpalMediaStatistics::Fax::Fax()
513 : m_result(OpalMediaStatistics::FaxNotStarted)
514 , m_phase(' ')
515 , m_bitRate(9600)
516 , m_compression(FaxCompressionUnknown)
517 , m_errorCorrection(false)
518 , m_txPages(-1)
519 , m_rxPages(-1)
520 , m_totalPages(0)
521 , m_imageSize(0)
522 , m_resolutionX(0)
523 , m_resolutionY(0)
524 , m_pageWidth(0)
525 , m_pageHeight(0)
526 , m_badRows(0)
527 , m_mostBadRows(0)
528 , m_errorCorrectionRetries(0)
529 {
530 }
531
operator <<(ostream & strm,OpalMediaStatistics::FaxCompression compression)532 ostream & operator<<(ostream & strm, OpalMediaStatistics::FaxCompression compression)
533 {
534 static const char * const Names[] = { "N/A", "T.4 1d", "T.4 2d", "T.6" };
535 if (compression >= 0 && compression < PARRAYSIZE(Names))
536 strm << Names[compression];
537 else
538 strm << (unsigned)compression;
539 return strm;
540 }
541 #endif
542
543 #endif
544
545 /////////////////////////////////////////////////////////////////////////////
546
OnTxStatistics(const RTP_Session &) const547 void RTP_UserData::OnTxStatistics(const RTP_Session & /*session*/) const
548 {
549 }
550
551
OnRxStatistics(const RTP_Session &) const552 void RTP_UserData::OnRxStatistics(const RTP_Session & /*session*/) const
553 {
554 }
555
SessionFailing(RTP_Session &)556 void RTP_UserData::SessionFailing(RTP_Session & /*session*/)
557 {
558 }
559
560 #if OPAL_VIDEO
OnRxIntraFrameRequest(const RTP_Session &) const561 void RTP_UserData::OnRxIntraFrameRequest(const RTP_Session & /*session*/) const
562 {
563 }
564
OnTxIntraFrameRequest(const RTP_Session &) const565 void RTP_UserData::OnTxIntraFrameRequest(const RTP_Session & /*session*/) const
566 {
567 }
568 #endif
569
570
571 /////////////////////////////////////////////////////////////////////////////
572
573 #if P_CONFIG_FILE
GetDefaultOutOfOrderWaitTime()574 static PTimeInterval GetDefaultOutOfOrderWaitTime()
575 {
576 static PTimeInterval ooowt(PConfig(PConfig::Environment).GetInteger("OPAL_RTP_OUT_OF_ORDER_TIME", 100));
577 return ooowt;
578 }
579 #else
580 #define GetDefaultOutOfOrderWaitTime() (100)
581 #endif
582
RTP_Session(const Params & params)583 RTP_Session::RTP_Session(const Params & params)
584 : m_timeUnits(params.isAudio ? 8 : 90)
585 , canonicalName(PProcess::Current().GetUserName())
586 , toolName(PProcess::Current().GetName())
587 , lastSRTimestamp(0)
588 , lastSRReceiveTime(0)
589 , outOfOrderWaitTime(GetDefaultOutOfOrderWaitTime())
590 , firstPacketSent(0)
591 , firstPacketReceived(0)
592 #if OPAL_RTCP_XR
593 , m_metrics(NULL)
594 #endif
595 , m_reportTimer(0, 12) // Seconds
596 , failed(false)
597 {
598 PAssert(params.id > 0, PInvalidParameter);
599 sessionID = params.id;
600 isAudio = params.isAudio;
601
602 userData = params.userData;
603 autoDeleteUserData = params.autoDelete;
604
605 ignorePayloadTypeChanges = true;
606 syncSourceOut = PRandom::Number();
607
608 timeStampOffs = 0;
609 oobTimeStampBaseEstablished = false;
610 lastSentPacketTime = PTimer::Tick();
611
612 syncSourceIn = 0;
613 allowAnySyncSource = true;
614 allowOneSyncSourceChange = false;
615 allowRemoteTransmitAddressChange = false;
616 allowSequenceChange = false;
617 txStatisticsInterval = 100; // Number of data packets between tx reports
618 rxStatisticsInterval = 100; // Number of data packets between rx reports
619 lastSentSequenceNumber = (WORD)PRandom::Number();
620 expectedSequenceNumber = 0;
621 lastRRSequenceNumber = 0;
622 resequenceOutOfOrderPackets = true;
623 senderReportsReceived = 0;
624 consecutiveOutOfOrderPackets = 0;
625
626 ClearStatistics();
627
628 lastReceivedPayloadType = RTP_DataFrame::IllegalPayloadType;
629
630 closeOnBye = false;
631 byeSent = false;
632
633 lastSentTimestamp = 0; // should be calculated, but we'll settle for initialising it
634
635 m_encodingHandler = NULL;
636 SetEncoding(params.encoding);
637
638 m_reportTimer.SetNotifier(PCREATE_NOTIFIER(SendReport));
639 }
640
641
~RTP_Session()642 RTP_Session::~RTP_Session()
643 {
644 m_reportTimer.Stop(true);
645
646 #if OPAL_RTCP_XR
647 delete m_metrics;
648 #endif
649
650 #if PTRACING
651 PTime now;
652 int sentDuration = (now-firstPacketSent).GetSeconds();
653 if (sentDuration == 0)
654 sentDuration = 1;
655 int receiveDuration = (now-firstPacketReceived).GetSeconds();
656 if (receiveDuration == 0)
657 receiveDuration = 1;
658 #endif
659 PTRACE_IF(3, packetsSent != 0 || packetsReceived != 0,
660 "RTP\tSession " << sessionID << ", final statistics:\n"
661 " firstPacketSent = " << firstPacketSent << "\n"
662 " packetsSent = " << packetsSent << "\n"
663 " octetsSent = " << octetsSent << "\n"
664 " bitRateSent = " << (8*octetsSent/sentDuration) << "\n"
665 " averageSendTime = " << averageSendTime << "\n"
666 " maximumSendTime = " << maximumSendTime << "\n"
667 " minimumSendTime = " << minimumSendTime << "\n"
668 " packetsLostByRemote= " << packetsLostByRemote << "\n"
669 " jitterLevelOnRemote= " << jitterLevelOnRemote << "\n"
670 " firstPacketReceived= " << firstPacketReceived << "\n"
671 " packetsReceived = " << packetsReceived << "\n"
672 " octetsReceived = " << octetsReceived << "\n"
673 " bitRateReceived = " << (8*octetsReceived/receiveDuration) << "\n"
674 " packetsLost = " << packetsLost << "\n"
675 " packetsTooLate = " << GetPacketsTooLate() << "\n"
676 " packetOverruns = " << GetPacketOverruns() << "\n"
677 " packetsOutOfOrder = " << packetsOutOfOrder << "\n"
678 " averageReceiveTime = " << averageReceiveTime << "\n"
679 " maximumReceiveTime = " << maximumReceiveTime << "\n"
680 " minimumReceiveTime = " << minimumReceiveTime << "\n"
681 " averageJitter = " << GetAvgJitterTime() << "\n"
682 " maximumJitter = " << GetMaxJitterTime()
683 );
684 if (autoDeleteUserData)
685 delete userData;
686 delete m_encodingHandler;
687 }
688
689
ClearStatistics()690 void RTP_Session::ClearStatistics()
691 {
692 firstPacketSent.SetTimestamp(0);
693 packetsSent = 0;
694 rtcpPacketsSent = 0;
695 octetsSent = 0;
696 firstPacketReceived.SetTimestamp(0);
697 packetsReceived = 0;
698 octetsReceived = 0;
699 packetsLost = 0;
700 packetsLostByRemote = 0;
701 packetsOutOfOrder = 0;
702 averageSendTime = 0;
703 maximumSendTime = 0;
704 minimumSendTime = 0;
705 averageReceiveTime = 0;
706 maximumReceiveTime = 0;
707 minimumReceiveTime = 0;
708 jitterLevel = 0;
709 maximumJitterLevel = 0;
710 jitterLevelOnRemote = 0;
711 markerRecvCount = 0;
712 markerSendCount = 0;
713
714 txStatisticsCount = 0;
715 rxStatisticsCount = 0;
716 averageSendTimeAccum = 0;
717 maximumSendTimeAccum = 0;
718 minimumSendTimeAccum = 0xffffffff;
719 averageReceiveTimeAccum = 0;
720 maximumReceiveTimeAccum = 0;
721 minimumReceiveTimeAccum = 0xffffffff;
722 packetsLostSinceLastRR = 0;
723 lastTransitTime = 0;
724 }
725
726
SendBYE()727 void RTP_Session::SendBYE()
728 {
729 {
730 PWaitAndSignal mutex(dataMutex);
731 if (byeSent)
732 return;
733
734 byeSent = true;
735 }
736
737 RTP_ControlFrame report;
738 InsertReportPacket(report);
739
740 static char const ReasonStr[] = "Session ended";
741 static size_t ReasonLen = sizeof(ReasonStr);
742
743 // insert BYE
744 report.StartNewPacket();
745 report.SetPayloadType(RTP_ControlFrame::e_Goodbye);
746 report.SetPayloadSize(4+1+ReasonLen); // length is SSRC + ReasonLen + reason
747
748 BYTE * payload = report.GetPayloadPtr();
749
750 // one SSRC
751 report.SetCount(1);
752 *(PUInt32b *)payload = syncSourceOut;
753
754 // insert reason
755 payload[4] = (BYTE)ReasonLen;
756 memcpy((char *)(payload+5), ReasonStr, ReasonLen);
757
758 report.EndPacket();
759 WriteControl(report);
760 }
761
GetCanonicalName() const762 PString RTP_Session::GetCanonicalName() const
763 {
764 PWaitAndSignal mutex(m_reportMutex);
765 PString s = canonicalName;
766 s.MakeUnique();
767 return s;
768 }
769
770
SetCanonicalName(const PString & name)771 void RTP_Session::SetCanonicalName(const PString & name)
772 {
773 PWaitAndSignal mutex(m_reportMutex);
774 canonicalName = name;
775 canonicalName.MakeUnique();
776 }
777
778
GetToolName() const779 PString RTP_Session::GetToolName() const
780 {
781 PWaitAndSignal mutex(m_reportMutex);
782 PString s = toolName;
783 s.MakeUnique();
784 return s;
785 }
786
787
SetToolName(const PString & name)788 void RTP_Session::SetToolName(const PString & name)
789 {
790 PWaitAndSignal mutex(m_reportMutex);
791 toolName = name;
792 toolName.MakeUnique();
793 }
794
795
SetUserData(RTP_UserData * data,PBoolean autoDelete)796 void RTP_Session::SetUserData(RTP_UserData * data, PBoolean autoDelete)
797 {
798 if (autoDeleteUserData)
799 delete userData;
800 userData = data;
801 autoDeleteUserData = autoDelete;
802 }
803
804
SetJitterBufferSize(unsigned minJitterDelay,unsigned maxJitterDelay,unsigned timeUnits,PINDEX packetSize)805 void RTP_Session::SetJitterBufferSize(unsigned minJitterDelay,
806 unsigned maxJitterDelay,
807 unsigned timeUnits,
808 PINDEX packetSize)
809 {
810 PWaitAndSignal mutex(dataMutex);
811
812 if (timeUnits > 0)
813 m_timeUnits = timeUnits;
814
815 if (minJitterDelay == 0 && maxJitterDelay == 0) {
816 PTRACE_IF(4, m_jitterBuffer != NULL, "RTP\tSwitching off jitter buffer " << *m_jitterBuffer);
817 // This can block waiting for JB thread to end, signal mutex to avoid deadlock
818 dataMutex.Signal();
819 m_jitterBuffer.SetNULL();
820 dataMutex.Wait();
821 }
822 else {
823 resequenceOutOfOrderPackets = false;
824 FlushData();
825 if (m_jitterBuffer != NULL) {
826 PTRACE(4, "RTP\tSetting jitter buffer time from " << minJitterDelay << " to " << maxJitterDelay);
827 m_jitterBuffer->SetDelay(minJitterDelay, maxJitterDelay, packetSize);
828 }
829 else {
830 m_jitterBuffer = new RTP_JitterBuffer(*this, minJitterDelay, maxJitterDelay, m_timeUnits, packetSize);
831 PTRACE(4, "RTP\tCreated RTP jitter buffer " << *m_jitterBuffer);
832 m_jitterBuffer->StartThread();
833 }
834 }
835 }
836
837
GetJitterBufferSize() const838 unsigned RTP_Session::GetJitterBufferSize() const
839 {
840 JitterBufferPtr jitter = m_jitterBuffer; // Increase reference count
841 return jitter != NULL ? jitter->GetCurrentJitterDelay() : 0;
842 }
843
844
ReadBufferedData(RTP_DataFrame & frame)845 PBoolean RTP_Session::ReadBufferedData(RTP_DataFrame & frame)
846 {
847 JitterBufferPtr jitter = m_jitterBuffer; // Increase reference count
848 if (jitter != NULL)
849 return jitter->ReadData(frame);
850
851 if (m_outOfOrderPackets.empty())
852 return ReadData(frame);
853
854 unsigned sequenceNumber = m_outOfOrderPackets.back().GetSequenceNumber();
855 if (sequenceNumber != expectedSequenceNumber) {
856 PTRACE(5, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
857 << ", still out of order packets, next "
858 << sequenceNumber << " expected " << expectedSequenceNumber);
859 return ReadData(frame);
860 }
861
862 frame = m_outOfOrderPackets.back();
863 m_outOfOrderPackets.pop_back();
864 expectedSequenceNumber = (WORD)(sequenceNumber + 1);
865
866 PTRACE(m_outOfOrderPackets.empty() ? 2 : 5,
867 "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn << ", resequenced "
868 << (m_outOfOrderPackets.empty() ? "last" : "next") << " out of order packet " << sequenceNumber);
869 return true;
870 }
871
872
FlushData()873 void RTP_Session::FlushData()
874 {
875 }
876
877
SetTxStatisticsInterval(unsigned packets)878 void RTP_Session::SetTxStatisticsInterval(unsigned packets)
879 {
880 txStatisticsInterval = PMAX(packets, 2);
881 txStatisticsCount = 0;
882 averageSendTimeAccum = 0;
883 maximumSendTimeAccum = 0;
884 minimumSendTimeAccum = 0xffffffff;
885 }
886
887
SetRxStatisticsInterval(unsigned packets)888 void RTP_Session::SetRxStatisticsInterval(unsigned packets)
889 {
890 rxStatisticsInterval = PMAX(packets, 2);
891 rxStatisticsCount = 0;
892 averageReceiveTimeAccum = 0;
893 maximumReceiveTimeAccum = 0;
894 minimumReceiveTimeAccum = 0xffffffff;
895 }
896
897
AddReceiverReport(RTP_ControlFrame::ReceiverReport & receiver)898 void RTP_Session::AddReceiverReport(RTP_ControlFrame::ReceiverReport & receiver)
899 {
900 receiver.ssrc = syncSourceIn;
901 receiver.SetLostPackets(GetPacketsLost()+GetPacketsTooLate());
902
903 if (expectedSequenceNumber > lastRRSequenceNumber)
904 receiver.fraction = (BYTE)((packetsLostSinceLastRR<<8)/(expectedSequenceNumber - lastRRSequenceNumber));
905 else
906 receiver.fraction = 0;
907 packetsLostSinceLastRR = 0;
908
909 receiver.last_seq = lastRRSequenceNumber;
910 lastRRSequenceNumber = expectedSequenceNumber;
911
912 receiver.jitter = jitterLevel >> JitterRoundingGuardBits; // Allow for rounding protection bits
913
914 if (senderReportsReceived > 0) {
915 // Calculate the last SR timestamp
916 PUInt32b lsr_ntp_sec = (DWORD)(lastSRTimestamp.GetTimeInSeconds()+SecondsFrom1900to1970); // Convert from 1970 to 1900
917 PUInt32b lsr_ntp_frac = lastSRTimestamp.GetMicrosecond()*4294; // Scale microseconds to "fraction" from 0 to 2^32
918 receiver.lsr = (((lsr_ntp_sec << 16) & 0xFFFF0000) | ((lsr_ntp_frac >> 16) & 0x0000FFFF));
919
920 // Calculate the delay since last SR
921 PTime now;
922 delaySinceLastSR = now - lastSRReceiveTime;
923 receiver.dlsr = (DWORD)(delaySinceLastSR.GetMilliSeconds()*65536/1000);
924 }
925 else {
926 receiver.lsr = 0;
927 receiver.dlsr = 0;
928 }
929
930 PTRACE(3, "RTP\tSession " << sessionID << ", SentReceiverReport:"
931 " ssrc=" << receiver.ssrc
932 << " fraction=" << (unsigned)receiver.fraction
933 << " lost=" << receiver.GetLostPackets()
934 << " last_seq=" << receiver.last_seq
935 << " jitter=" << receiver.jitter
936 << " lsr=" << receiver.lsr
937 << " dlsr=" << receiver.dlsr);
938 }
939
940
OnSendData(RTP_DataFrame & frame)941 RTP_Session::SendReceiveStatus RTP_Session::OnSendData(RTP_DataFrame & frame)
942 {
943 return EncodingLock(*this)->OnSendData(frame);
944 }
945
946
Internal_OnSendData(RTP_DataFrame & frame)947 RTP_Session::SendReceiveStatus RTP_Session::Internal_OnSendData(RTP_DataFrame & frame)
948 {
949 PWaitAndSignal mutex(dataMutex);
950
951 PTimeInterval tick = PTimer::Tick(); // Timestamp set now
952
953 frame.SetSequenceNumber(++lastSentSequenceNumber);
954 frame.SetSyncSource(syncSourceOut);
955
956 DWORD frameTimestamp = frame.GetTimestamp();
957
958 // special handling for first packet
959 if (packetsSent == 0) {
960
961 // establish timestamp offset
962 if (oobTimeStampBaseEstablished) {
963 timeStampOffs = oobTimeStampOutBase - frameTimestamp + ((PTimer::Tick() - oobTimeStampBase).GetInterval() * m_timeUnits);
964 frameTimestamp += timeStampOffs;
965 }
966 else {
967 oobTimeStampBaseEstablished = true;
968 timeStampOffs = 0;
969 oobTimeStampOutBase = frameTimestamp;
970 oobTimeStampBase = PTimer::Tick();
971 }
972
973 firstPacketSent.SetCurrentTime();
974
975 // display stuff
976 PTRACE(3, "RTP\tSession " << sessionID << ", first sent data:"
977 " ver=" << frame.GetVersion()
978 << " pt=" << frame.GetPayloadType()
979 << " psz=" << frame.GetPayloadSize()
980 << " m=" << frame.GetMarker()
981 << " x=" << frame.GetExtension()
982 << " seq=" << frame.GetSequenceNumber()
983 << " ts=" << frameTimestamp
984 << " src=" << hex << frame.GetSyncSource()
985 << " ccnt=" << frame.GetContribSrcCount() << dec);
986 }
987
988 else {
989 // set timestamp
990 frameTimestamp += timeStampOffs;
991
992 // reset OOB timestamp every marker bit
993 if (frame.GetMarker()) {
994 oobTimeStampOutBase = frameTimestamp;
995 oobTimeStampBase = PTimer::Tick();
996 }
997
998 // Only do statistics on subsequent packets
999 if ( ! (isAudio && frame.GetMarker()) ) {
1000 DWORD diff = (tick - lastSentPacketTime).GetInterval();
1001
1002 averageSendTimeAccum += diff;
1003 if (diff > maximumSendTimeAccum)
1004 maximumSendTimeAccum = diff;
1005 if (diff < minimumSendTimeAccum)
1006 minimumSendTimeAccum = diff;
1007 txStatisticsCount++;
1008 }
1009 }
1010
1011 frame.SetTimestamp(frameTimestamp);
1012 lastSentTimestamp = frameTimestamp;
1013 lastSentPacketTime = tick;
1014
1015 octetsSent += frame.GetPayloadSize();
1016 packetsSent++;
1017
1018 if (frame.GetMarker())
1019 markerSendCount++;
1020
1021 // Call the statistics call-back on the first PDU with total count == 1
1022 if (packetsSent == 1 && userData != NULL)
1023 userData->OnTxStatistics(*this);
1024
1025 if (txStatisticsCount < txStatisticsInterval)
1026 return e_ProcessPacket;
1027
1028 txStatisticsCount = 0;
1029
1030 averageSendTime = averageSendTimeAccum/txStatisticsInterval;
1031 maximumSendTime = maximumSendTimeAccum;
1032 minimumSendTime = minimumSendTimeAccum;
1033
1034 averageSendTimeAccum = 0;
1035 maximumSendTimeAccum = 0;
1036 minimumSendTimeAccum = 0xffffffff;
1037
1038 PTRACE(3, "RTP\tSession " << sessionID << ", transmit statistics: "
1039 " packets=" << packetsSent <<
1040 " octets=" << octetsSent <<
1041 " avgTime=" << averageSendTime <<
1042 " maxTime=" << maximumSendTime <<
1043 " minTime=" << minimumSendTime
1044 );
1045
1046 if (userData != NULL)
1047 userData->OnTxStatistics(*this);
1048
1049 return e_ProcessPacket;
1050 }
1051
OnSendControl(RTP_ControlFrame & frame,PINDEX & len)1052 RTP_Session::SendReceiveStatus RTP_Session::OnSendControl(RTP_ControlFrame & frame, PINDEX & len)
1053 {
1054 return EncodingLock(*this)->OnSendControl(frame, len);
1055 }
1056
1057 #if OPAL_VIDEO
Internal_OnSendControl(RTP_ControlFrame & frame,PINDEX &)1058 RTP_Session::SendReceiveStatus RTP_Session::Internal_OnSendControl(RTP_ControlFrame & frame, PINDEX & /*len*/)
1059 {
1060 rtcpPacketsSent++;
1061
1062 if(frame.GetPayloadType() == RTP_ControlFrame::e_IntraFrameRequest && userData != NULL)
1063 userData->OnTxIntraFrameRequest(*this);
1064
1065 return e_ProcessPacket;
1066 }
1067 #else
Internal_OnSendControl(RTP_ControlFrame &,PINDEX &)1068 RTP_Session::SendReceiveStatus RTP_Session::Internal_OnSendControl(RTP_ControlFrame & /*frame*/, PINDEX & /*len*/)
1069 {
1070 rtcpPacketsSent++;
1071 return e_ProcessPacket;
1072 }
1073 #endif
1074
1075
OnReceiveData(RTP_DataFrame & frame)1076 RTP_Session::SendReceiveStatus RTP_Session::OnReceiveData(RTP_DataFrame & frame)
1077 {
1078 return EncodingLock(*this)->OnReceiveData(frame);
1079 }
1080
Internal_OnReceiveData(RTP_DataFrame & frame)1081 RTP_Session::SendReceiveStatus RTP_Session::Internal_OnReceiveData(RTP_DataFrame & frame)
1082 {
1083 // Check that the PDU is the right version
1084 if (frame.GetVersion() != RTP_DataFrame::ProtocolVersion)
1085 return e_IgnorePacket; // Non fatal error, just ignore
1086
1087 // Check if expected payload type
1088 if (lastReceivedPayloadType == RTP_DataFrame::IllegalPayloadType)
1089 lastReceivedPayloadType = frame.GetPayloadType();
1090
1091 if (lastReceivedPayloadType != frame.GetPayloadType() && !ignorePayloadTypeChanges) {
1092
1093 PTRACE(4, "RTP\tSession " << sessionID << ", got payload type "
1094 << frame.GetPayloadType() << ", but was expecting " << lastReceivedPayloadType);
1095 return e_IgnorePacket;
1096 }
1097
1098 // Check for if a control packet rather than data packet.
1099 if (frame.GetPayloadType() > RTP_DataFrame::MaxPayloadType)
1100 return e_IgnorePacket; // Non fatal error, just ignore
1101
1102 PTimeInterval tick = PTimer::Tick(); // Get timestamp now
1103
1104 // Have not got SSRC yet, so grab it now
1105 if (syncSourceIn == 0)
1106 syncSourceIn = frame.GetSyncSource();
1107
1108 // Check packet sequence numbers
1109 if (packetsReceived == 0) {
1110 firstPacketReceived.SetCurrentTime();
1111 PTRACE(3, "RTP\tSession " << sessionID << ", first receive data:"
1112 " ver=" << frame.GetVersion()
1113 << " pt=" << frame.GetPayloadType()
1114 << " psz=" << frame.GetPayloadSize()
1115 << " m=" << frame.GetMarker()
1116 << " x=" << frame.GetExtension()
1117 << " seq=" << frame.GetSequenceNumber()
1118 << " ts=" << frame.GetTimestamp()
1119 << " src=" << hex << frame.GetSyncSource()
1120 << " ccnt=" << frame.GetContribSrcCount() << dec);
1121
1122 #if OPAL_RTCP_XR
1123 delete m_metrics; // Should be NULL, but just in case ...
1124 m_metrics = RTCP_XR_Metrics::Create(frame);
1125 #endif
1126
1127 if ((frame.GetPayloadType() == RTP_DataFrame::T38) &&
1128 (frame.GetSequenceNumber() >= 0x8000) &&
1129 (frame.GetPayloadSize() == 0)) {
1130 PTRACE(4, "RTP\tSession " << sessionID << ", ignoring left over audio packet from switch to T.38");
1131 return e_IgnorePacket; // Non fatal error, just ignore
1132 }
1133
1134 expectedSequenceNumber = (WORD)(frame.GetSequenceNumber() + 1);
1135 }
1136 else {
1137 if (frame.GetSyncSource() != syncSourceIn) {
1138 if (allowAnySyncSource) {
1139 PTRACE(2, "RTP\tSession " << sessionID << ", SSRC changed from " << hex << frame.GetSyncSource() << " to " << syncSourceIn << dec);
1140 syncSourceIn = frame.GetSyncSource();
1141 allowSequenceChange = true;
1142 }
1143 else if (allowOneSyncSourceChange) {
1144 PTRACE(2, "RTP\tSession " << sessionID << ", allowed one SSRC change from SSRC=" << hex << syncSourceIn << " to =" << dec << frame.GetSyncSource() << dec);
1145 syncSourceIn = frame.GetSyncSource();
1146 allowSequenceChange = true;
1147 allowOneSyncSourceChange = false;
1148 }
1149 else {
1150 PTRACE(2, "RTP\tSession " << sessionID << ", packet from SSRC=" << hex << frame.GetSyncSource() << " ignored, expecting SSRC=" << syncSourceIn << dec);
1151 return e_IgnorePacket; // Non fatal error, just ignore
1152 }
1153 }
1154
1155 WORD sequenceNumber = frame.GetSequenceNumber();
1156 if (sequenceNumber == expectedSequenceNumber) {
1157 expectedSequenceNumber++;
1158 consecutiveOutOfOrderPackets = 0;
1159
1160 if (!m_outOfOrderPackets.empty()) {
1161 PTRACE(5, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
1162 << ", received out of order packet " << sequenceNumber);
1163 outOfOrderPacketTime = tick;
1164 packetsOutOfOrder++;
1165 }
1166
1167 /* For audio we do not do statistics on start of talk burst (marker bit)
1168 as that could be a substantial time and is not really "jitter".
1169 For video we measure jitter between whoile frames which is indicated
1170 by the marker bit being on. */
1171 if (frame.GetMarker() != isAudio) {
1172 DWORD diff = (tick - lastReceivedPacketTime).GetInterval();
1173
1174 averageReceiveTimeAccum += diff;
1175 if (diff > maximumReceiveTimeAccum)
1176 maximumReceiveTimeAccum = diff;
1177 if (diff < minimumReceiveTimeAccum)
1178 minimumReceiveTimeAccum = diff;
1179 rxStatisticsCount++;
1180
1181 // As per RFC3550 Appendix 8
1182 diff *= GetJitterTimeUnits(); // Convert to timestamp units
1183 long variance = diff > lastTransitTime ? (diff - lastTransitTime) : (lastTransitTime - diff);
1184 lastTransitTime = diff;
1185 jitterLevel += variance - ((jitterLevel+(1<<(JitterRoundingGuardBits-1))) >> JitterRoundingGuardBits);
1186 if (jitterLevel > maximumJitterLevel)
1187 maximumJitterLevel = jitterLevel;
1188 }
1189
1190 if (frame.GetMarker())
1191 markerRecvCount++;
1192 }
1193 else if (allowSequenceChange) {
1194 expectedSequenceNumber = (WORD) (sequenceNumber + 1);
1195 allowSequenceChange = false;
1196 m_outOfOrderPackets.clear();
1197 PTRACE(2, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
1198 << ", adjusting sequence numbers to expect " << expectedSequenceNumber);
1199 }
1200 else if (sequenceNumber < expectedSequenceNumber) {
1201 #if OPAL_RTCP_XR
1202 if (m_metrics != NULL) m_metrics->OnPacketDiscarded();
1203 #endif
1204
1205 // Check for Cisco bug where sequence numbers suddenly start incrementing
1206 // from a different base.
1207 if (++consecutiveOutOfOrderPackets > 10) {
1208 expectedSequenceNumber = (WORD)(sequenceNumber + 1);
1209 PTRACE(2, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
1210 << ", abnormal change of sequence numbers, adjusting to expect " << expectedSequenceNumber);
1211 }
1212 else {
1213 PTRACE(2, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
1214 << ", incorrect sequence, got " << sequenceNumber << " expected " << expectedSequenceNumber);
1215
1216 if (resequenceOutOfOrderPackets)
1217 return e_IgnorePacket; // Non fatal error, just ignore
1218
1219 packetsOutOfOrder++;
1220 }
1221 }
1222 else if (resequenceOutOfOrderPackets &&
1223 (m_outOfOrderPackets.empty() || (tick - outOfOrderPacketTime) < outOfOrderWaitTime)) {
1224 if (m_outOfOrderPackets.empty())
1225 outOfOrderPacketTime = tick;
1226 // Maybe packet lost, maybe out of order, save for now
1227 SaveOutOfOrderPacket(frame);
1228 return e_IgnorePacket;
1229 }
1230 else {
1231 if (!m_outOfOrderPackets.empty()) {
1232 // Give up on the packet, probably never coming in. Save current and switch in
1233 // the lowest numbered packet.
1234 SaveOutOfOrderPacket(frame);
1235
1236 for (;;) {
1237 if (m_outOfOrderPackets.empty())
1238 return e_IgnorePacket;
1239
1240 frame = m_outOfOrderPackets.back();
1241 m_outOfOrderPackets.pop_back();
1242
1243 sequenceNumber = frame.GetSequenceNumber();
1244 if (sequenceNumber >= expectedSequenceNumber)
1245 break;
1246
1247 PTRACE(2, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
1248 << ", incorrect sequence after re-ordering, got "
1249 << sequenceNumber << " expected " << expectedSequenceNumber);
1250 }
1251
1252 outOfOrderPacketTime = tick;
1253 }
1254
1255 unsigned dropped = sequenceNumber - expectedSequenceNumber;
1256 packetsLost += dropped;
1257 packetsLostSinceLastRR += dropped;
1258 PTRACE(2, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
1259 << ", " << dropped << " packet(s) missing at " << sequenceNumber);
1260 expectedSequenceNumber = (WORD)(sequenceNumber + 1);
1261 consecutiveOutOfOrderPackets = 0;
1262 #if OPAL_RTCP_XR
1263 if (m_metrics != NULL) m_metrics->OnPacketLost(dropped);
1264 #endif
1265 }
1266 }
1267
1268 lastReceivedPacketTime = tick;
1269
1270 octetsReceived += frame.GetPayloadSize();
1271 packetsReceived++;
1272
1273 #if OPAL_RTCP_XR
1274 if (m_metrics != NULL) m_metrics->OnPacketReceived();
1275 #endif
1276
1277 // Call the statistics call-back on the first PDU with total count == 1
1278 if (packetsReceived == 1 && userData != NULL)
1279 userData->OnRxStatistics(*this);
1280
1281 if (rxStatisticsCount >= rxStatisticsInterval) {
1282
1283 rxStatisticsCount = 0;
1284
1285 averageReceiveTime = averageReceiveTimeAccum/rxStatisticsInterval;
1286 maximumReceiveTime = maximumReceiveTimeAccum;
1287 minimumReceiveTime = minimumReceiveTimeAccum;
1288
1289 averageReceiveTimeAccum = 0;
1290 maximumReceiveTimeAccum = 0;
1291 minimumReceiveTimeAccum = 0xffffffff;
1292
1293 PTRACE(4, "RTP\tSession " << sessionID << ", receive statistics:"
1294 " packets=" << packetsReceived <<
1295 " octets=" << octetsReceived <<
1296 " lost=" << packetsLost <<
1297 " tooLate=" << GetPacketsTooLate() <<
1298 " order=" << packetsOutOfOrder <<
1299 " avgTime=" << averageReceiveTime <<
1300 " maxTime=" << maximumReceiveTime <<
1301 " minTime=" << minimumReceiveTime <<
1302 " jitter=" << GetAvgJitterTime() <<
1303 " maxJitter=" << GetMaxJitterTime());
1304
1305 if (userData != NULL)
1306 userData->OnRxStatistics(*this);
1307 }
1308
1309 SendReceiveStatus status = e_ProcessPacket;
1310 for (list<FilterNotifier>::iterator filter = m_filters.begin(); filter != m_filters.end(); ++filter)
1311 (*filter)(frame, status);
1312
1313 return status;
1314 }
1315
1316
SaveOutOfOrderPacket(RTP_DataFrame & frame)1317 void RTP_Session::SaveOutOfOrderPacket(RTP_DataFrame & frame)
1318 {
1319 WORD sequenceNumber = frame.GetSequenceNumber();
1320
1321 PTRACE(m_outOfOrderPackets.empty() ? 2 : 5,
1322 "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn << ", "
1323 << (m_outOfOrderPackets.empty() ? "first" : "next") << " out of order packet, got "
1324 << sequenceNumber << " expected " << expectedSequenceNumber);
1325
1326 std::list<RTP_DataFrame>::iterator it;
1327 for (it = m_outOfOrderPackets.begin(); it != m_outOfOrderPackets.end(); ++it) {
1328 if (sequenceNumber > it->GetSequenceNumber())
1329 break;
1330 }
1331
1332 m_outOfOrderPackets.insert(it, frame);
1333 frame.MakeUnique();
1334 }
1335
1336
InsertReportPacket(RTP_ControlFrame & report)1337 PBoolean RTP_Session::InsertReportPacket(RTP_ControlFrame & report)
1338 {
1339 report.StartNewPacket();
1340
1341 // No packets sent yet, so only set RR
1342 if (packetsSent == 0) {
1343
1344 // Send RR as we are not transmitting
1345 report.SetPayloadType(RTP_ControlFrame::e_ReceiverReport);
1346
1347 // if no packets received, put in an empty report
1348 if (packetsReceived == 0) {
1349 report.SetPayloadSize(sizeof(PUInt32b)); // length is SSRC
1350 report.SetCount(0);
1351
1352 // add the SSRC to the start of the payload
1353 *(PUInt32b *)report.GetPayloadPtr() = syncSourceOut;
1354 }
1355 else {
1356 report.SetPayloadSize(sizeof(PUInt32b) + sizeof(RTP_ControlFrame::ReceiverReport)); // length is SSRC of packet sender plus RR
1357 report.SetCount(1);
1358 BYTE * payload = report.GetPayloadPtr();
1359
1360 // add the SSRC to the start of the payload
1361 *(PUInt32b *)payload = syncSourceOut;
1362
1363 // add the RR after the SSRC
1364 AddReceiverReport(*(RTP_ControlFrame::ReceiverReport *)(payload+sizeof(PUInt32b)));
1365 }
1366 }
1367 else {
1368 // send SR and RR
1369 report.SetPayloadType(RTP_ControlFrame::e_SenderReport);
1370 report.SetPayloadSize(sizeof(PUInt32b) + sizeof(RTP_ControlFrame::SenderReport)); // length is SSRC of packet sender plus SR
1371 report.SetCount(0);
1372 BYTE * payload = report.GetPayloadPtr();
1373
1374 // add the SSRC to the start of the payload
1375 *(PUInt32b *)payload = syncSourceOut;
1376
1377 // add the SR after the SSRC
1378 RTP_ControlFrame::SenderReport * sender = (RTP_ControlFrame::SenderReport *)(payload+sizeof(PUInt32b));
1379 PTime now;
1380 sender->ntp_sec = (DWORD)(now.GetTimeInSeconds()+SecondsFrom1900to1970); // Convert from 1970 to 1900
1381 sender->ntp_frac = now.GetMicrosecond()*4294; // Scale microseconds to "fraction" from 0 to 2^32
1382 sender->rtp_ts = lastSentTimestamp;
1383 sender->psent = packetsSent;
1384 sender->osent = octetsSent;
1385
1386 PTRACE(3, "RTP\tSession " << sessionID << ", SentSenderReport:"
1387 " ssrc=" << syncSourceOut
1388 << " ntp=" << sender->ntp_sec << '.' << sender->ntp_frac
1389 << " rtp=" << sender->rtp_ts
1390 << " psent=" << sender->psent
1391 << " osent=" << sender->osent);
1392
1393 if (syncSourceIn != 0) {
1394 report.SetPayloadSize(sizeof(PUInt32b) + sizeof(RTP_ControlFrame::SenderReport) + sizeof(RTP_ControlFrame::ReceiverReport));
1395 report.SetCount(1);
1396 AddReceiverReport(*(RTP_ControlFrame::ReceiverReport *)(payload+sizeof(PUInt32b)+sizeof(RTP_ControlFrame::SenderReport)));
1397 }
1398 }
1399
1400 report.EndPacket();
1401 return true;
1402 }
1403
1404
SendReport(PTimer &,INT)1405 void RTP_Session::SendReport(PTimer&, INT)
1406 {
1407 PWaitAndSignal mutex(m_reportMutex);
1408
1409 // Have not got anything yet, do nothing
1410 if (packetsSent == 0 && packetsReceived == 0)
1411 return;
1412
1413 RTP_ControlFrame report;
1414 InsertReportPacket(report);
1415
1416 // Add the SDES part to compound RTCP packet
1417 PTRACE(3, "RTP\tSession " << sessionID << ", sending SDES: " << canonicalName);
1418 report.StartNewPacket();
1419
1420 report.SetCount(0); // will be incremented automatically
1421 report.StartSourceDescription (syncSourceOut);
1422 report.AddSourceDescriptionItem(RTP_ControlFrame::e_CNAME, canonicalName);
1423 report.AddSourceDescriptionItem(RTP_ControlFrame::e_TOOL, toolName);
1424 report.EndPacket();
1425
1426 #if OPAL_RTCP_XR
1427 //Generate and send RTCP-XR packet
1428 if (m_metrics != NULL) m_metrics->InsertExtendedReportPacket(sessionID, syncSourceOut, m_jitterBuffer, report);
1429 #endif
1430
1431 WriteControl(report);
1432 }
1433
1434
#if OPAL_STATISTICS
/* Fill "statistics" from this session's counters.
   With receiver == true the receive side figures are used. Otherwise the
   send side figures are used, with loss and jitter taken from what the
   remote end reported, and the fields that only exist for reception
   (out of order, too late, overruns, maximum jitter, jitter buffer delay)
   set to zero. */
void RTP_Session::GetStatistics(OpalMediaStatistics & statistics, bool receiver) const
{
  if (receiver) {
    statistics.m_totalBytes        = GetOctetsReceived();
    statistics.m_totalPackets      = GetPacketsReceived();
    statistics.m_packetsLost       = GetPacketsLost();
    statistics.m_packetsOutOfOrder = GetPacketsOutOfOrder();
    statistics.m_packetsTooLate    = GetPacketsTooLate();
    statistics.m_packetOverruns    = GetPacketOverruns();
    statistics.m_minimumPacketTime = GetMinimumReceiveTime();
    statistics.m_averagePacketTime = GetAverageReceiveTime();
    statistics.m_maximumPacketTime = GetMaximumReceiveTime();
    statistics.m_averageJitter     = GetAvgJitterTime();
    statistics.m_maximumJitter     = GetMaxJitterTime();
    statistics.m_jitterBufferDelay = GetJitterBufferDelay();
  }
  else {
    statistics.m_totalBytes        = GetOctetsSent();
    statistics.m_totalPackets      = GetPacketsSent();
    statistics.m_packetsLost       = GetPacketsLostByRemote();
    statistics.m_packetsOutOfOrder = 0;
    statistics.m_packetsTooLate    = 0;
    statistics.m_packetOverruns    = 0;
    statistics.m_minimumPacketTime = GetMinimumSendTime();
    statistics.m_averagePacketTime = GetAverageSendTime();
    statistics.m_maximumPacketTime = GetMaximumSendTime();
    statistics.m_averageJitter     = GetJitterTimeOnRemote();
    statistics.m_maximumJitter     = 0;
    statistics.m_jitterBufferDelay = 0;
  }
}
#endif
1452
1453
1454 RTP_Session::ReceiverReportArray
BuildReceiverReportArray(const RTP_ControlFrame & frame,PINDEX offset)1455 RTP_Session::BuildReceiverReportArray(const RTP_ControlFrame & frame, PINDEX offset)
1456 {
1457 RTP_Session::ReceiverReportArray reports;
1458
1459 const RTP_ControlFrame::ReceiverReport * rr = (const RTP_ControlFrame::ReceiverReport *)(frame.GetPayloadPtr()+offset);
1460 for (PINDEX repIdx = 0; repIdx < (PINDEX)frame.GetCount(); repIdx++) {
1461 RTP_Session::ReceiverReport * report = new RTP_Session::ReceiverReport;
1462 report->sourceIdentifier = rr->ssrc;
1463 report->fractionLost = rr->fraction;
1464 report->totalLost = rr->GetLostPackets();
1465 report->lastSequenceNumber = rr->last_seq;
1466 report->jitter = rr->jitter;
1467 report->lastTimestamp = (PInt64)(DWORD)rr->lsr;
1468 report->delay = ((PInt64)rr->dlsr << 16)/1000;
1469 reports.SetAt(repIdx, report);
1470 #if OPAL_RTCP_XR
1471 if (m_metrics != NULL) m_metrics->OnRxSenderReport(rr->lsr, rr->dlsr);
1472 #endif
1473 rr++;
1474 }
1475
1476 return reports;
1477 }
1478
1479
OnReceiveControl(RTP_ControlFrame & frame)1480 RTP_Session::SendReceiveStatus RTP_Session::OnReceiveControl(RTP_ControlFrame & frame)
1481 {
1482 do {
1483 BYTE * payload = frame.GetPayloadPtr();
1484 PINDEX size = frame.GetPayloadSize();
1485 if ((payload == NULL) || (size == 0) || ((payload + size) > (frame.GetPointer() + frame.GetSize()))){
1486 /* TODO: 1.shall we test for a maximum size ? Indeed but what's the value ? *
1487 2. what's the correct exit status ? */
1488 PTRACE(2, "RTP\tSession " << sessionID << ", OnReceiveControl invalid frame");
1489 break;
1490 }
1491
1492 switch (frame.GetPayloadType()) {
1493 case RTP_ControlFrame::e_SenderReport :
1494 if (size >= (PINDEX)(sizeof(PUInt32b)+sizeof(RTP_ControlFrame::SenderReport)+frame.GetCount()*sizeof(RTP_ControlFrame::ReceiverReport))) {
1495 SenderReport sender;
1496 sender.sourceIdentifier = *(const PUInt32b *)payload;
1497 const RTP_ControlFrame::SenderReport & sr = *(const RTP_ControlFrame::SenderReport *)(payload+sizeof(PUInt32b));
1498 sender.realTimestamp = PTime(sr.ntp_sec-SecondsFrom1900to1970, sr.ntp_frac/4294);
1499
1500 // Save the receive time
1501 lastSRTimestamp = sender.realTimestamp;
1502 lastSRReceiveTime.SetCurrentTime();
1503 senderReportsReceived++;
1504
1505 sender.rtpTimestamp = sr.rtp_ts;
1506 sender.packetsSent = sr.psent;
1507 sender.octetsSent = sr.osent;
1508
1509 OnRxSenderReport(sender, BuildReceiverReportArray(frame, sizeof(PUInt32b)+sizeof(RTP_ControlFrame::SenderReport)));
1510 }
1511 else {
1512 PTRACE(2, "RTP\tSession " << sessionID << ", SenderReport packet truncated");
1513 }
1514 break;
1515
1516 case RTP_ControlFrame::e_ReceiverReport :
1517 if (size >= (PINDEX)(sizeof(PUInt32b)+frame.GetCount()*sizeof(RTP_ControlFrame::ReceiverReport)))
1518 OnRxReceiverReport(*(const PUInt32b *)payload, BuildReceiverReportArray(frame, sizeof(PUInt32b)));
1519 else {
1520 PTRACE(2, "RTP\tSession " << sessionID << ", ReceiverReport packet truncated");
1521 }
1522 break;
1523
1524 case RTP_ControlFrame::e_SourceDescription :
1525 if (size >= (PINDEX)(frame.GetCount()*sizeof(RTP_ControlFrame::SourceDescription))) {
1526 SourceDescriptionArray descriptions;
1527 const RTP_ControlFrame::SourceDescription * sdes = (const RTP_ControlFrame::SourceDescription *)payload;
1528 PINDEX srcIdx;
1529 for (srcIdx = 0; srcIdx < (PINDEX)frame.GetCount(); srcIdx++) {
1530 descriptions.SetAt(srcIdx, new SourceDescription(sdes->src));
1531 const RTP_ControlFrame::SourceDescription::Item * item = sdes->item;
1532 PINDEX uiSizeCurrent = 0; /* current size of the items already parsed */
1533 while ((item != NULL) && (item->type != RTP_ControlFrame::e_END)) {
1534 descriptions[srcIdx].items.SetAt(item->type, PString(item->data, item->length));
1535 uiSizeCurrent += item->GetLengthTotal();
1536 PTRACE(4,"RTP\tSession " << sessionID << ", SourceDescription item " << item << ", current size = " << uiSizeCurrent);
1537
1538 /* avoid reading where GetNextItem() shall not */
1539 if (uiSizeCurrent >= size){
1540 PTRACE(4,"RTP\tSession " << sessionID << ", SourceDescription end of items");
1541 item = NULL;
1542 break;
1543 }
1544
1545 item = item->GetNextItem();
1546 }
1547
1548 /* RTP_ControlFrame::e_END doesn't have a length field, so do NOT call item->GetNextItem()
1549 otherwise it reads over the buffer */
1550 if (item == NULL ||
1551 item->type == RTP_ControlFrame::e_END ||
1552 (sdes = (const RTP_ControlFrame::SourceDescription *)item->GetNextItem()) == NULL)
1553 break;
1554 }
1555 OnRxSourceDescription(descriptions);
1556 }
1557 else {
1558 PTRACE(2, "RTP\tSession " << sessionID << ", SourceDescription packet truncated");
1559 }
1560 break;
1561
1562 case RTP_ControlFrame::e_Goodbye :
1563 if (size >= 4) {
1564 PString str;
1565 PINDEX count = frame.GetCount()*4;
1566
1567 if (size > count) {
1568 if (size >= (PINDEX)(payload[count] + sizeof(DWORD) /*SSRC*/ + sizeof(unsigned char) /* length */))
1569 str = PString((const char *)(payload+count+1), payload[count]);
1570 else {
1571 PTRACE(2, "RTP\tSession " << sessionID << ", Goodbye packet invalid");
1572 }
1573 }
1574
1575 PDWORDArray sources(count);
1576 for (PINDEX i = 0; i < count; i++)
1577 sources[i] = ((const PUInt32b *)payload)[i];
1578 OnRxGoodbye(sources, str);
1579 }
1580 else {
1581 PTRACE(2, "RTP\tSession " << sessionID << ", Goodbye packet truncated");
1582 }
1583
1584 if (closeOnBye) {
1585 PTRACE(3, "RTP\tSession " << sessionID << ", Goodbye packet closing transport");
1586 return e_AbortTransport;
1587 }
1588 break;
1589
1590 case RTP_ControlFrame::e_ApplDefined :
1591 if (size >= 4) {
1592 PString str((const char *)(payload+4), 4);
1593 OnRxApplDefined(str, frame.GetCount(), *(const PUInt32b *)payload,
1594 payload+8, frame.GetPayloadSize()-8);
1595 }
1596 else {
1597 PTRACE(2, "RTP\tSession " << sessionID << ", ApplDefined packet truncated");
1598 }
1599 break;
1600
1601 #if OPAL_RTCP_XR
1602 case RTP_ControlFrame::e_ExtendedReport :
1603 if (size >= (PINDEX)(sizeof(PUInt32b)+frame.GetCount()*sizeof(RTP_ControlFrame::ExtendedReport)))
1604 OnRxExtendedReport(*(const PUInt32b *)payload, RTCP_XR_Metrics::BuildExtendedReportArray(frame, sizeof(PUInt32b)));
1605 else {
1606 PTRACE(2, "RTP\tSession " << sessionID << ", ReceiverReport packet truncated");
1607 }
1608 break;
1609 #endif
1610
1611 #if OPAL_VIDEO
1612 case RTP_ControlFrame::e_IntraFrameRequest :
1613 PTRACE(4, "RTP\tSession " << sessionID << ", received RFC2032 FIR");
1614 if(userData != NULL)
1615 userData->OnRxIntraFrameRequest(*this);
1616 break;
1617
1618 case RTP_ControlFrame::e_PayloadSpecificFeedBack :
1619 switch (frame.GetFbType()) {
1620 case RTP_ControlFrame::e_PictureLossIndication :
1621 PTRACE(4, "RTP\tSession " << sessionID << ", received RFC5104 PLI");
1622 if(userData != NULL)
1623 userData->OnRxIntraFrameRequest(*this);
1624 break;
1625
1626 case RTP_ControlFrame::e_FullIntraRequest :
1627 PTRACE(4, "RTP\tSession " << sessionID << ", received RFC5104 FIR");
1628 if(userData != NULL)
1629 userData->OnRxIntraFrameRequest(*this);
1630 break;
1631
1632 default :
1633 PTRACE(2, "RTP\tSession " << sessionID << ", Unknown Payload Specific feedback type: " << frame.GetFbType());
1634 }
1635 break;
1636 #endif
1637
1638 default :
1639 PTRACE(2, "RTP\tSession " << sessionID << ", Unknown control payload type: " << frame.GetPayloadType());
1640 }
1641 } while (frame.ReadNextPacket());
1642
1643 return e_ProcessPacket;
1644 }
1645
1646
OnRxSenderReport(const SenderReport & PTRACE_PARAM (sender),const ReceiverReportArray & reports)1647 void RTP_Session::OnRxSenderReport(const SenderReport & PTRACE_PARAM(sender), const ReceiverReportArray & reports)
1648 {
1649 #if PTRACING
1650 if (PTrace::CanTrace(3)) {
1651 ostream & strm = PTrace::Begin(3, __FILE__, __LINE__);
1652 strm << "RTP\tSession " << sessionID << ", OnRxSenderReport: " << sender << '\n';
1653 for (PINDEX i = 0; i < reports.GetSize(); i++)
1654 strm << " RR: " << reports[i] << '\n';
1655 strm << PTrace::End;
1656 }
1657 #endif
1658 OnReceiverReports(reports);
1659 }
1660
1661
OnRxReceiverReport(DWORD PTRACE_PARAM (src),const ReceiverReportArray & reports)1662 void RTP_Session::OnRxReceiverReport(DWORD PTRACE_PARAM(src), const ReceiverReportArray & reports)
1663 {
1664 #if PTRACING
1665 if (PTrace::CanTrace(3)) {
1666 ostream & strm = PTrace::Begin(2, __FILE__, __LINE__);
1667 strm << "RTP\tSession " << sessionID << ", OnReceiverReport: ssrc=" << src << '\n';
1668 for (PINDEX i = 0; i < reports.GetSize(); i++)
1669 strm << " RR: " << reports[i] << '\n';
1670 strm << PTrace::End;
1671 }
1672 #endif
1673 OnReceiverReports(reports);
1674 }
1675
1676
OnReceiverReports(const ReceiverReportArray & reports)1677 void RTP_Session::OnReceiverReports(const ReceiverReportArray & reports)
1678 {
1679 for (PINDEX i = 0; i < reports.GetSize(); i++) {
1680 if (reports[i].sourceIdentifier == syncSourceOut) {
1681 packetsLostByRemote = reports[i].totalLost;
1682 jitterLevelOnRemote = reports[i].jitter;
1683 break;
1684 }
1685 }
1686 }
1687
1688
OnRxSourceDescription(const SourceDescriptionArray & PTRACE_PARAM (description))1689 void RTP_Session::OnRxSourceDescription(const SourceDescriptionArray & PTRACE_PARAM(description))
1690 {
1691 #if PTRACING
1692 if (PTrace::CanTrace(3)) {
1693 ostream & strm = PTrace::Begin(3, __FILE__, __LINE__);
1694 strm << "RTP\tSession " << sessionID << ", OnSourceDescription: " << description.GetSize() << " entries";
1695 for (PINDEX i = 0; i < description.GetSize(); i++)
1696 strm << "\n " << description[i];
1697 strm << PTrace::End;
1698 }
1699 #endif
1700 }
1701
1702
/* Called for each received RTCP Goodbye packet.
   src    - the source identifiers listed in the packet.
   reason - the optional reason text, empty if none was supplied.
   This implementation only traces the event at level 3; the parameters are
   wrapped in PTRACE_PARAM as they are used by nothing but the trace. */
void RTP_Session::OnRxGoodbye(const PDWORDArray & PTRACE_PARAM(src), const PString & PTRACE_PARAM(reason))
{
  PTRACE(3, "RTP\tSession " << sessionID << ", OnGoodbye: \"" << reason << "\" srcs=" << src);
}
1707
1708
/* Called for each received RTCP application defined (APP) packet.
   type    - the four character name taken from the packet.
   subtype - the count field of the packet header.
   src     - the source identifier at the start of the payload.
   data    - the application data following the name (unused here).
   size    - number of bytes of application data.
   This implementation only traces the event at level 3. */
void RTP_Session::OnRxApplDefined(const PString & PTRACE_PARAM(type),
          unsigned PTRACE_PARAM(subtype), DWORD PTRACE_PARAM(src),
          const BYTE * /*data*/, PINDEX PTRACE_PARAM(size))
{
  PTRACE(3, "RTP\tSession " << sessionID << ", OnApplDefined: \""
         << type << "\"-" << subtype << " " << src << " [" << size << ']');
}
1716
1717
PrintOn(ostream & strm) const1718 void RTP_Session::ReceiverReport::PrintOn(ostream & strm) const
1719 {
1720 strm << "ssrc=" << sourceIdentifier
1721 << " fraction=" << fractionLost
1722 << " lost=" << totalLost
1723 << " last_seq=" << lastSequenceNumber
1724 << " jitter=" << jitter
1725 << " lsr=" << lastTimestamp
1726 << " dlsr=" << delay;
1727 }
1728
1729
PrintOn(ostream & strm) const1730 void RTP_Session::SenderReport::PrintOn(ostream & strm) const
1731 {
1732 strm << "ssrc=" << sourceIdentifier
1733 << " ntp=" << realTimestamp.AsString("yyyy/M/d-h:m:s.uuuu")
1734 << " rtp=" << rtpTimestamp
1735 << " psent=" << packetsSent
1736 << " osent=" << octetsSent;
1737 }
1738
1739
PrintOn(ostream & strm) const1740 void RTP_Session::SourceDescription::PrintOn(ostream & strm) const
1741 {
1742 static const char * const DescriptionNames[RTP_ControlFrame::NumDescriptionTypes] = {
1743 "END", "CNAME", "NAME", "EMAIL", "PHONE", "LOC", "TOOL", "NOTE", "PRIV"
1744 };
1745
1746 strm << "ssrc=" << sourceIdentifier;
1747 for (PINDEX i = 0; i < items.GetSize(); i++) {
1748 strm << "\n item[" << i << "]: type=";
1749 unsigned typeNum = items.GetKeyAt(i);
1750 if (typeNum < PARRAYSIZE(DescriptionNames))
1751 strm << DescriptionNames[typeNum];
1752 else
1753 strm << typeNum;
1754 strm << " data=\""
1755 << items.GetDataAt(i)
1756 << '"';
1757 }
1758 }
1759
1760
GetPacketsTooLate() const1761 DWORD RTP_Session::GetPacketsTooLate() const
1762 {
1763 JitterBufferPtr jitter = m_jitterBuffer; // Increase reference count
1764 return jitter != NULL ? jitter->GetPacketsTooLate() : 0;
1765 }
1766
1767
GetPacketOverruns() const1768 DWORD RTP_Session::GetPacketOverruns() const
1769 {
1770 JitterBufferPtr jitter = m_jitterBuffer; // Increase reference count
1771 return jitter != NULL ? jitter->GetBufferOverruns() : 0;
1772 }
1773
1774
/* Base implementation: ignores both arguments, does nothing and returns
   true. */
PBoolean RTP_Session::WriteOOBData(RTP_DataFrame &, bool)
{
  return true;
}
1779
1780
AddFilter(const FilterNotifier & filter)1781 void RTP_Session::AddFilter(const FilterNotifier & filter)
1782 {
1783 // ensures that a filter is added only once
1784 if (find(m_filters.begin(), m_filters.end(), filter) == m_filters.end())
1785 m_filters.push_back(filter);
1786 }
1787
1788
1789 /////////////////////////////////////////////////////////////////////////////
1790
SetMinBufferSize(PUDPSocket & sock,int buftype,int bufsz)1791 static void SetMinBufferSize(PUDPSocket & sock, int buftype, int bufsz)
1792 {
1793 int sz = 0;
1794 if (!sock.GetOption(buftype, sz)) {
1795 PTRACE(1, "RTP_UDP\tGetOption(" << sock.GetHandle() << ',' << buftype << ") failed: " << sock.GetErrorText());
1796 return;
1797 }
1798
1799 // Already big enough
1800 if (sz >= bufsz)
1801 return;
1802
1803 for (; bufsz >= 1024; bufsz /= 2) {
1804 // Set to new size
1805 if (!sock.SetOption(buftype, bufsz)) {
1806 PTRACE(1, "RTP_UDP\tSetOption(" << sock.GetHandle() << ',' << buftype << ',' << bufsz << ") failed: " << sock.GetErrorText());
1807 continue;
1808 }
1809
1810 // As some stacks lie about setting the buffer size, we double check.
1811 if (!sock.GetOption(buftype, sz)) {
1812 PTRACE(1, "RTP_UDP\tGetOption(" << sock.GetHandle() << ',' << buftype << ") failed: " << sock.GetErrorText());
1813 return;
1814 }
1815
1816 if (sz >= bufsz) {
1817 PTRACE(4, "RTP_UDP\tSetOption(" << sock.GetHandle() << ',' << buftype << ',' << bufsz << ") succeeded.");
1818 return;
1819 }
1820
1821 PTRACE(1, "RTP_UDP\tSetOption(" << sock.GetHandle() << ',' << buftype << ',' << bufsz << ") failed, even though it said it succeeded!");
1822 }
1823 }
1824
1825
RTP_UDP(const Params & params)1826 RTP_UDP::RTP_UDP(const Params & params)
1827 : RTP_Session(params),
1828 remoteAddress(0),
1829 remoteTransmitAddress(0),
1830 remoteIsNAT(params.remoteIsNAT)
1831 {
1832 PTRACE(4, "RTP_UDP\tSession " << sessionID << ", created with NAT flag set to " << remoteIsNAT);
1833 remoteDataPort = 0;
1834 remoteControlPort = 0;
1835 shutdownRead = false;
1836 shutdownWrite = false;
1837 dataSocket = NULL;
1838 controlSocket = NULL;
1839 appliedQOS = false;
1840 localHasNAT = false;
1841 badTransmitCounter = 0;
1842
1843 timerWriteDataIdle.SetNotifier(PCREATE_NOTIFIER(OnWriteDataIdle));
1844 }
1845
1846
/* Tear the session down: stop the idle timer, close both directions,
   remove the jitter buffer and only then delete the sockets. The order of
   these steps matters, see the comment below. */
RTP_UDP::~RTP_UDP()
{
  timerWriteDataIdle.Stop();
  Close(true);
  Close(false);

  // We need to do this to make sure that the sockets are not
  // deleted before select decides there is no more data coming
  // over them and exits the reading thread.
  SetJitterBufferSize(0, 0);

  delete dataSocket;
  delete controlSocket;
}
1861
1862
ApplyQOS(const PIPSocket::Address & addr)1863 void RTP_UDP::ApplyQOS(const PIPSocket::Address & addr)
1864 {
1865 if (controlSocket != NULL)
1866 controlSocket->SetSendAddress(addr,GetRemoteControlPort());
1867 if (dataSocket != NULL)
1868 dataSocket->SetSendAddress(addr,GetRemoteDataPort());
1869 appliedQOS = true;
1870 }
1871
1872
ModifyQOS(RTP_QOS * rtpqos)1873 PBoolean RTP_UDP::ModifyQOS(RTP_QOS * rtpqos)
1874 {
1875 PBoolean retval = false;
1876
1877 if (rtpqos == NULL)
1878 return retval;
1879
1880 if (controlSocket != NULL)
1881 retval = controlSocket->ModifyQoSSpec(&(rtpqos->ctrlQoS));
1882
1883 if (dataSocket != NULL)
1884 retval &= dataSocket->ModifyQoSSpec(&(rtpqos->dataQoS));
1885
1886 appliedQOS = false;
1887 return retval;
1888 }
1889
Open(PIPSocket::Address transportLocalAddress,WORD portBase,WORD portMax,BYTE tos,PNatMethod * natMethod,RTP_QOS * rtpQos)1890 PBoolean RTP_UDP::Open(PIPSocket::Address transportLocalAddress,
1891 WORD portBase, WORD portMax,
1892 BYTE tos,
1893 PNatMethod * natMethod,
1894 RTP_QOS * rtpQos)
1895 {
1896 PWaitAndSignal mutex(dataMutex);
1897
1898 m_firstControl = true;
1899
1900 // save local address
1901 localAddress = transportLocalAddress;
1902
1903 localDataPort = (WORD)(portBase&0xfffe);
1904 localControlPort = (WORD)(localDataPort + 1);
1905
1906 delete dataSocket;
1907 delete controlSocket;
1908 dataSocket = NULL;
1909 controlSocket = NULL;
1910
1911 byeSent = false;
1912
1913 PQoS * dataQos = NULL;
1914 PQoS * ctrlQos = NULL;
1915 if (rtpQos != NULL) {
1916 dataQos = &(rtpQos->dataQoS);
1917 ctrlQos = &(rtpQos->ctrlQoS);
1918 }
1919
1920 // allow for special case of portBase == 0 or portMax == 0, which indicates a shared RTP session
1921 if ((portBase != 0) || (portMax != 0)) {
1922 PIPSocket::Address bindingAddress = localAddress;
1923 if (natMethod != NULL && natMethod->IsAvailable(localAddress)) {
1924 switch (natMethod->GetRTPSupport()) {
1925 case PNatMethod::RTPIfSendMedia :
1926 /* This NAT variant will work if we send something out through the
1927 NAT port to "open" it so packets can then flow inward. We set
1928 this flag to make that happen as soon as we get the remotes IP
1929 address and port to send to.
1930 */
1931 localHasNAT = true;
1932 // Then do case for full cone support and create STUN sockets
1933
1934 case PNatMethod::RTPSupported :
1935 if (natMethod->CreateSocketPair(dataSocket, controlSocket, localAddress)) {
1936 PTRACE(4, "RTP\tSession " << sessionID << ", " << natMethod->GetName() << " created STUN RTP/RTCP socket pair.");
1937 dataSocket->GetLocalAddress(localAddress, localDataPort);
1938 controlSocket->GetLocalAddress(localAddress, localControlPort);
1939 }
1940 else {
1941 PTRACE(2, "RTP\tSession " << sessionID << ", " << natMethod->GetName()
1942 << " could not create RTP/RTCP socket pair; trying to create individual sockets.");
1943 if (natMethod->CreateSocket(dataSocket, localAddress) && natMethod->CreateSocket(controlSocket, localAddress)) {
1944 dataSocket->GetLocalAddress(localAddress, localDataPort);
1945 controlSocket->GetLocalAddress(localAddress, localControlPort);
1946 }
1947 else {
1948 delete dataSocket;
1949 delete controlSocket;
1950 dataSocket = NULL;
1951 controlSocket = NULL;
1952 PTRACE(2, "RTP\tSession " << sessionID << ", " << natMethod->GetName()
1953 << " could not create RTP/RTCP sockets individually either, using normal sockets.");
1954 }
1955 }
1956 break;
1957
1958 default :
1959 /* We canot use NAT traversal method (e.g. STUN) to create sockets
1960 in the remaining modes as the NAT router will then not let us
1961 talk to the real RTP destination. All we can so is bind to the
1962 local interface the NAT is on and hope the NAT router is doing
1963 something sneaky like symmetric port forwarding. */
1964 PTRACE(2, "RTP\tSession " << sessionID << ", " << natMethod->GetName()
1965 << " cannot create RTP/RTCP socket pair; creating individual sockets.");
1966 natMethod->GetInterfaceAddress(bindingAddress);
1967 break;
1968 }
1969 }
1970
1971 if (dataSocket == NULL || controlSocket == NULL) {
1972 dataSocket = new PUDPSocket(dataQos);
1973 controlSocket = new PUDPSocket(ctrlQos);
1974 while (! dataSocket->Listen(bindingAddress, 1, localDataPort) ||
1975 !controlSocket->Listen(bindingAddress, 1, localControlPort)) {
1976 dataSocket->Close();
1977 controlSocket->Close();
1978 if ((localDataPort > portMax) || (localDataPort > 0xfffd))
1979 return false; // If it ever gets to here the OS has some SERIOUS problems!
1980 localDataPort += 2;
1981 localControlPort += 2;
1982 }
1983 }
1984
1985 # ifndef __BEOS__
1986 // Set the IP Type Of Service field for prioritisation of media UDP packets
1987 // through some Cisco routers and Linux boxes
1988 if (!dataSocket->SetOption(IP_TOS, tos, IPPROTO_IP)) {
1989 PTRACE(1, "RTP_UDP\tSession " << sessionID << ", could not set TOS field in IP header: " << dataSocket->GetErrorText());
1990 }
1991
1992 // Increase internal buffer size on media UDP sockets
1993 SetMinBufferSize(*dataSocket, SO_RCVBUF, isAudio ? RTP_AUDIO_RX_BUFFER_SIZE : RTP_VIDEO_RX_BUFFER_SIZE);
1994 SetMinBufferSize(*dataSocket, SO_SNDBUF, RTP_DATA_TX_BUFFER_SIZE);
1995 SetMinBufferSize(*controlSocket, SO_RCVBUF, RTP_CTRL_BUFFER_SIZE);
1996 SetMinBufferSize(*controlSocket, SO_SNDBUF, RTP_CTRL_BUFFER_SIZE);
1997 # endif
1998 }
1999
2000 shutdownRead = false;
2001 shutdownWrite = false;
2002
2003 if (canonicalName.Find('@') == P_MAX_INDEX)
2004 canonicalName += '@' + GetLocalHostName();
2005
2006 PTRACE(3, "RTP_UDP\tSession " << sessionID << " created: "
2007 << localAddress << ':' << localDataPort << '-' << localControlPort
2008 << " ssrc=" << syncSourceOut);
2009
2010 m_reportTimer.RunContinuous(m_reportTimer.GetResetTime());
2011 return true;
2012 }
2013
2014
Reopen(PBoolean reading)2015 void RTP_UDP::Reopen(PBoolean reading)
2016 {
2017 PWaitAndSignal mutex(dataMutex);
2018
2019 if (reading) {
2020 if (!shutdownRead)
2021 return;
2022 shutdownRead = false;
2023 }
2024 else {
2025 if (!shutdownWrite)
2026 return;
2027 shutdownWrite = false;
2028 }
2029
2030 badTransmitCounter = 0;
2031 m_reportTimer.RunContinuous(m_reportTimer.GetResetTime());
2032
2033 PTRACE(3, "RTP_UDP\tSession " << sessionID << " reopened for " << (reading ? "reading" : "writing"));
2034 }
2035
2036
Close(PBoolean reading)2037 bool RTP_UDP::Close(PBoolean reading)
2038 {
2039 if (reading) {
2040 {
2041 PWaitAndSignal mutex(dataMutex);
2042
2043 if (shutdownRead) {
2044 PTRACE(4, "RTP_UDP\tSession " << sessionID << ", read already shut down .");
2045 return false;
2046 }
2047
2048 PTRACE(3, "RTP_UDP\tSession " << sessionID << ", Shutting down read.");
2049
2050 syncSourceIn = 0;
2051 shutdownRead = true;
2052
2053 if (dataSocket != NULL && controlSocket != NULL) {
2054 PIPSocket::Address addr;
2055 WORD port;
2056 controlSocket->PUDPSocket::GetLocalAddress(addr, port);
2057 if (addr.IsAny())
2058 PIPSocket::GetHostAddress(addr);
2059 dataSocket->WriteTo("", 1, addr, port);
2060 }
2061 }
2062
2063 SetJitterBufferSize(0, 0); // Kill jitter buffer too, but outside mutex
2064 }
2065 else {
2066 if (shutdownWrite) {
2067 PTRACE(4, "RTP_UDP\tSession " << sessionID << ", write already shut down .");
2068 return false;
2069 }
2070
2071 PTRACE(3, "RTP_UDP\tSession " << sessionID << ", shutting down write.");
2072 shutdownWrite = true;
2073 }
2074
2075 if (shutdownRead && shutdownWrite)
2076 m_reportTimer.Stop(false);
2077
2078 return true;
2079 }
2080
2081
GetLocalHostName()2082 PString RTP_UDP::GetLocalHostName()
2083 {
2084 return PIPSocket::GetHostName();
2085 }
2086
2087
SetRemoteSocketInfo(PIPSocket::Address address,WORD port,PBoolean isDataPort)2088 PBoolean RTP_UDP::SetRemoteSocketInfo(PIPSocket::Address address, WORD port, PBoolean isDataPort)
2089 {
2090 if (remoteIsNAT) {
2091 PTRACE(2, "RTP_UDP\tSession " << sessionID << ", ignoring remote socket info as remote is behind NAT");
2092 return true;
2093 }
2094
2095 if (!PAssert(address.IsValid() && port != 0,PInvalidParameter))
2096 return false;
2097
2098 PTRACE(3, "RTP_UDP\tSession " << sessionID << ", SetRemoteSocketInfo: "
2099 << (isDataPort ? "data" : "control") << " channel, "
2100 "new=" << address << ':' << port << ", "
2101 "local=" << localAddress << ':' << localDataPort << '-' << localControlPort << ", "
2102 "remote=" << remoteAddress << ':' << remoteDataPort << '-' << remoteControlPort);
2103
2104 if (localAddress == address && remoteAddress == address && (isDataPort ? localDataPort : localControlPort) == port)
2105 return true;
2106
2107 remoteAddress = address;
2108
2109 allowOneSyncSourceChange = true;
2110 allowRemoteTransmitAddressChange = true;
2111 allowSequenceChange = packetsReceived != 0;
2112
2113 if (isDataPort) {
2114 remoteDataPort = port;
2115 if (remoteControlPort == 0 || allowRemoteTransmitAddressChange)
2116 remoteControlPort = (WORD)(port + 1);
2117 }
2118 else {
2119 remoteControlPort = port;
2120 if (remoteDataPort == 0 || allowRemoteTransmitAddressChange)
2121 remoteDataPort = (WORD)(port - 1);
2122 }
2123
2124 if (!appliedQOS)
2125 ApplyQOS(remoteAddress);
2126
2127 if (localHasNAT) {
2128 // If have Port Restricted NAT on local host then send a datagram
2129 // to remote to open up the port in the firewall for return data.
2130 static const BYTE dummy[1] = { 0 };
2131 WriteDataOrControlPDU(dummy, sizeof(dummy), true);
2132 WriteDataOrControlPDU(dummy, sizeof(dummy), false);
2133 PTRACE(2, "RTP_UDP\tSession " << sessionID << ", sending empty datagrams to open local Port Restricted NAT");
2134 }
2135
2136 return true;
2137 }
2138
2139
ReadData(RTP_DataFrame & frame)2140 PBoolean RTP_UDP::ReadData(RTP_DataFrame & frame)
2141 {
2142 return EncodingLock(*this)->ReadData(frame);
2143 }
2144
Internal_ReadData(RTP_DataFrame & frame)2145 PBoolean RTP_UDP::Internal_ReadData(RTP_DataFrame & frame)
2146 {
2147 SendReceiveStatus receiveStatus = e_IgnorePacket;
2148 while (receiveStatus == e_IgnorePacket) {
2149 if (shutdownRead || PAssertNULL(dataSocket) == NULL || PAssertNULL(controlSocket) == NULL)
2150 return false;
2151
2152 int selectStatus = WaitForPDU(*dataSocket, *controlSocket, PMaxTimeInterval);
2153 if (shutdownRead)
2154 return false;
2155
2156 if (selectStatus > 0) {
2157 PTRACE(1, "RTP_UDP\tSession " << sessionID << ", Select error: "
2158 << PChannel::GetErrorText((PChannel::Errors)selectStatus));
2159 return false;
2160 }
2161
2162 if (selectStatus == 0)
2163 receiveStatus = OnReadTimeout(frame);
2164
2165 if ((-selectStatus & 2) != 0) {
2166 if (ReadControlPDU() == e_AbortTransport)
2167 return false;
2168 }
2169
2170 if ((-selectStatus & 1) != 0)
2171 receiveStatus = ReadDataPDU(frame);
2172 }
2173
2174 return receiveStatus == e_ProcessPacket;
2175 }
2176
2177
FlushData()2178 void RTP_UDP::FlushData()
2179 {
2180 if (shutdownRead || dataSocket == NULL)
2181 return;
2182
2183 PTimeInterval oldTimeout = dataSocket->GetReadTimeout();
2184 dataSocket->SetReadTimeout(0);
2185
2186 PINDEX count = 0;
2187 BYTE buffer[2000];
2188 while (dataSocket->Read(buffer, sizeof(buffer)))
2189 ++count;
2190
2191 dataSocket->SetReadTimeout(oldTimeout);
2192
2193 PTRACE_IF(3, count > 0, "RTP_UDP\tSession " << sessionID << ", flushed "
2194 << count << " RTP data packets before activating jitter buffer");
2195 }
2196
2197
WaitForPDU(PUDPSocket & dataSocket,PUDPSocket & controlSocket,const PTimeInterval & timeout)2198 int RTP_UDP::WaitForPDU(PUDPSocket & dataSocket, PUDPSocket & controlSocket, const PTimeInterval & timeout)
2199 {
2200 return EncodingLock(*this)->WaitForPDU(dataSocket, controlSocket, timeout);
2201 }
2202
Internal_WaitForPDU(PUDPSocket & dataSocket,PUDPSocket & controlSocket,const PTimeInterval & timeout)2203 int RTP_UDP::Internal_WaitForPDU(PUDPSocket & dataSocket, PUDPSocket & controlSocket, const PTimeInterval & timeout)
2204 {
2205 return PSocket::Select(dataSocket, controlSocket, timeout);
2206 }
2207
ReadDataOrControlPDU(BYTE * framePtr,PINDEX frameSize,PBoolean fromDataChannel)2208 RTP_Session::SendReceiveStatus RTP_UDP::ReadDataOrControlPDU(BYTE * framePtr,
2209 PINDEX frameSize,
2210 PBoolean fromDataChannel)
2211 {
2212 #if PTRACING
2213 const char * channelName = fromDataChannel ? "Data" : "Control";
2214 #endif
2215 PUDPSocket & socket = *(fromDataChannel ? dataSocket : controlSocket);
2216 PIPSocket::Address addr;
2217 WORD port;
2218
2219 if (socket.ReadFrom(framePtr, frameSize, addr, port)) {
2220 // If remote address never set from higher levels, then try and figure
2221 // it out from the first packet received.
2222 if (!remoteAddress.IsValid()) {
2223 remoteAddress = addr;
2224 PTRACE(4, "RTP\tSession " << sessionID << ", set remote address from first "
2225 << channelName << " PDU from " << addr << ':' << port);
2226 }
2227 if (fromDataChannel) {
2228 if (remoteDataPort == 0)
2229 remoteDataPort = port;
2230 }
2231 else {
2232 if (remoteControlPort == 0)
2233 remoteControlPort = port;
2234 }
2235
2236 if (!remoteTransmitAddress.IsValid())
2237 remoteTransmitAddress = addr;
2238 else if (allowRemoteTransmitAddressChange && remoteAddress == addr) {
2239 remoteTransmitAddress = addr;
2240 allowRemoteTransmitAddressChange = false;
2241 }
2242 else if (remoteTransmitAddress != addr && !allowRemoteTransmitAddressChange) {
2243 PTRACE(2, "RTP_UDP\tSession " << sessionID << ", "
2244 << channelName << " PDU from incorrect host, "
2245 " is " << addr << " should be " << remoteTransmitAddress);
2246 return RTP_Session::e_IgnorePacket;
2247 }
2248
2249 if (remoteAddress.IsValid() && !appliedQOS)
2250 ApplyQOS(remoteAddress);
2251
2252 badTransmitCounter = 0;
2253
2254 return RTP_Session::e_ProcessPacket;
2255 }
2256
2257 switch (socket.GetErrorNumber(PChannel::LastReadError)) {
2258 case ECONNRESET :
2259 case ECONNREFUSED :
2260 PTRACE(2, "RTP_UDP\tSession " << sessionID << ", " << channelName << " port on remote not ready.");
2261 if (++badTransmitCounter == 1)
2262 badTransmitStart = PTime();
2263 else {
2264 if (badTransmitCounter < 5 || (PTime()- badTransmitStart).GetSeconds() < BAD_TRANSMIT_TIME_MAX)
2265 return RTP_Session::e_IgnorePacket;
2266 PTRACE(2, "RTP_UDP\tSession " << sessionID << ", " << channelName << " " << BAD_TRANSMIT_TIME_MAX << " seconds of transmit fails - informing connection");
2267 userData->SessionFailing(*this);
2268 }
2269 return RTP_Session::e_IgnorePacket;
2270
2271 case EMSGSIZE :
2272 PTRACE(2, "RTP_UDP\tSession " << sessionID << ", " << channelName
2273 << " read packet too large for buffer of " << frameSize << " bytes.");
2274 return RTP_Session::e_IgnorePacket;
2275
2276 case EAGAIN :
2277 PTRACE(4, "RTP_UDP\tSession " << sessionID << ", " << channelName
2278 << " read packet interrupted.");
2279 // Shouldn't happen, but it does.
2280 return RTP_Session::e_IgnorePacket;
2281
2282 case 0 :
2283 PTRACE(4, "RTP_UDP\tSession " << sessionID << ", " << channelName
2284 << " received UDP packet with no payload.");
2285 return e_IgnorePacket;
2286
2287 default:
2288 PTRACE(1, "RTP_UDP\tSession " << sessionID << ", " << channelName
2289 << " read error (" << socket.GetErrorNumber(PChannel::LastReadError) << "): "
2290 << socket.GetErrorText(PChannel::LastReadError));
2291 return RTP_Session::e_AbortTransport;
2292 }
2293 }
2294
2295
ReadDataPDU(RTP_DataFrame & frame)2296 RTP_Session::SendReceiveStatus RTP_UDP::ReadDataPDU(RTP_DataFrame & frame)
2297 {
2298 return EncodingLock(*this)->ReadDataPDU(frame);
2299 }
2300
Internal_ReadDataPDU(RTP_DataFrame & frame)2301 RTP_Session::SendReceiveStatus RTP_UDP::Internal_ReadDataPDU(RTP_DataFrame & frame)
2302 {
2303 SendReceiveStatus status = ReadDataOrControlPDU(frame.GetPointer(), frame.GetSize(), true);
2304 if (status != e_ProcessPacket)
2305 return status;
2306
2307 // Check received PDU is big enough
2308 if (frame.SetPacketSize(dataSocket->GetLastReadCount()))
2309 return OnReceiveData(frame);
2310 return e_IgnorePacket;
2311 }
2312
2313
WriteDataPDU(RTP_DataFrame & frame)2314 bool RTP_UDP::WriteDataPDU(RTP_DataFrame & frame)
2315 {
2316 if (!EncodingLock(*this)->WriteDataPDU(frame))
2317 return false;
2318
2319 PWaitAndSignal mutex(dataMutex);
2320 EncodingLock(*this)->SetWriteDataIdleTimer(timerWriteDataIdle);
2321
2322 return true;
2323 }
2324
2325
OnReadTimeout(RTP_DataFrame & frame)2326 RTP_Session::SendReceiveStatus RTP_UDP::OnReadTimeout(RTP_DataFrame & frame)
2327 {
2328 return EncodingLock(*this)->OnReadTimeout(frame);
2329 }
2330
Internal_OnReadTimeout(RTP_DataFrame &)2331 RTP_Session::SendReceiveStatus RTP_UDP::Internal_OnReadTimeout(RTP_DataFrame & /*frame*/)
2332 {
2333 return e_IgnorePacket;
2334 }
2335
2336
ReadControlPDU()2337 RTP_Session::SendReceiveStatus RTP_UDP::ReadControlPDU()
2338 {
2339 RTP_ControlFrame frame(2048);
2340
2341 SendReceiveStatus status = ReadDataOrControlPDU(frame.GetPointer(), frame.GetSize(), false);
2342 if (status != e_ProcessPacket)
2343 return status;
2344
2345 PINDEX pduSize = controlSocket->GetLastReadCount();
2346 if (pduSize < 4 || pduSize < 4+frame.GetPayloadSize()) {
2347 PTRACE_IF(2, pduSize != 1 || !m_firstControl, "RTP_UDP\tSession " << sessionID
2348 << ", Received control packet too small: " << pduSize << " bytes");
2349 return e_IgnorePacket;
2350 }
2351
2352 m_firstControl = false;
2353 frame.SetSize(pduSize);
2354 return OnReceiveControl(frame);
2355 }
2356
2357
WriteOOBData(RTP_DataFrame & frame,bool rewriteTimeStamp)2358 PBoolean RTP_UDP::WriteOOBData(RTP_DataFrame & frame, bool rewriteTimeStamp)
2359 {
2360 PWaitAndSignal m(dataMutex);
2361
2362 // set timestamp offset if not already set
2363 // otherwise offset timestamp
2364 if (!oobTimeStampBaseEstablished) {
2365 oobTimeStampBaseEstablished = true;
2366 oobTimeStampBase = PTimer::Tick();
2367 if (rewriteTimeStamp)
2368 oobTimeStampOutBase = PRandom::Number();
2369 else
2370 oobTimeStampOutBase = frame.GetTimestamp();
2371 }
2372
2373 // set new timestamp
2374 if (rewriteTimeStamp)
2375 frame.SetTimestamp(oobTimeStampOutBase + ((PTimer::Tick() - oobTimeStampBase).GetInterval() * 8));
2376
2377 // write the data
2378 return EncodingLock(*this)->WriteData(frame, true);
2379 }
2380
WriteData(RTP_DataFrame & frame)2381 PBoolean RTP_UDP::WriteData(RTP_DataFrame & frame)
2382 {
2383 return EncodingLock(*this)->WriteData(frame, false);
2384 }
2385
2386
Internal_WriteData(RTP_DataFrame & frame)2387 PBoolean RTP_UDP::Internal_WriteData(RTP_DataFrame & frame)
2388 {
2389 if (shutdownWrite || dataSocket == NULL) {
2390 PTRACE(3, "RTP_UDP\tSession " << sessionID << ", write shutdown.");
2391 return false;
2392 }
2393
2394 // Trying to send a PDU before we are set up!
2395 if (!remoteAddress.IsValid() || remoteDataPort == 0)
2396 return true;
2397
2398 switch (OnSendData(frame)) {
2399 case e_ProcessPacket :
2400 break;
2401 case e_IgnorePacket :
2402 return true;
2403 case e_AbortTransport :
2404 return false;
2405 }
2406
2407 return WriteDataPDU(frame);
2408 }
2409
2410
OnWriteDataIdle(PTimer &,INT)2411 void RTP_UDP::OnWriteDataIdle(PTimer &, INT)
2412 {
2413 {
2414 PWaitAndSignal mutex(dataMutex);
2415 if (shutdownWrite) {
2416 PTRACE(3, "RTP_UDP\tSession " << sessionID << ", write shutdown.");
2417 return;
2418 }
2419 }
2420
2421 // Trying to send a PDU before we are set up!
2422 if (!remoteAddress.IsValid() || remoteDataPort == 0)
2423 return;
2424
2425 EncodingLock(*this)->OnWriteDataIdle();
2426
2427 PWaitAndSignal mutex(dataMutex);
2428 EncodingLock(*this)->SetWriteDataIdleTimer(timerWriteDataIdle);
2429 }
2430
2431
SetEncoding(const PString & newEncoding)2432 void RTP_UDP::SetEncoding(const PString & newEncoding)
2433 {
2434 dataMutex.Wait();
2435 timerWriteDataIdle.Stop(false);
2436 dataMutex.Signal();
2437
2438 RTP_Session::SetEncoding(newEncoding);
2439 }
2440
2441
WriteControl(RTP_ControlFrame & frame)2442 PBoolean RTP_UDP::WriteControl(RTP_ControlFrame & frame)
2443 {
2444 // Trying to send a PDU before we are set up!
2445 if (!remoteAddress.IsValid() || remoteControlPort == 0 || controlSocket == NULL)
2446 return true;
2447
2448 PINDEX len = frame.GetCompoundSize();
2449 switch (OnSendControl(frame, len)) {
2450 case e_ProcessPacket :
2451 break;
2452 case e_IgnorePacket :
2453 return true;
2454 case e_AbortTransport :
2455 return false;
2456 }
2457
2458 return WriteDataOrControlPDU(frame.GetPointer(), len, false);
2459 }
2460
2461
/* Send a raw PDU to the remote address on the data or the control channel.

     framePtr       bytes to send
     frameSize      number of bytes to send
     toDataChannel  true to use the data socket and remote data port, false
                    to use the control socket and remote control port

   A write failing with ECONNRESET or ECONNREFUSED is retried, up to 10
   attempts in total. Running out of attempts is traced but still reported as
   success, so that a remote port that is not ready yet does not tear the
   session down. Any other write error is traced and false is returned.

   The error is read from the socket's write error group. The socket is also
   read from elsewhere, so the ungrouped "last error" could belong to a read
   and not to the write that just failed. */
bool RTP_UDP::WriteDataOrControlPDU(const BYTE * framePtr, PINDEX frameSize, bool toDataChannel)
{
  PUDPSocket & socket = *(toDataChannel ? dataSocket : controlSocket);
  WORD port = toDataChannel ? remoteDataPort : remoteControlPort;
  int retry = 0;

  while (!socket.WriteTo(framePtr, frameSize, remoteAddress, port)) {
    switch (socket.GetErrorNumber(PChannel::LastWriteError)) {
      case ECONNRESET :
      case ECONNREFUSED :
        // Remote port not ready (yet), worth another attempt
        break;

      default:
        PTRACE(1, "RTP_UDP\tSession " << sessionID
               << ", write (" << frameSize << " bytes) error on "
               << (toDataChannel ? "data" : "control") << " port ("
               << socket.GetErrorNumber(PChannel::LastWriteError) << "): "
               << socket.GetErrorText(PChannel::LastWriteError));
        return false;
    }

    if (++retry >= 10)
      break;
  }

  PTRACE_IF(2, retry > 0, "RTP_UDP\tSession " << sessionID << ", " << (toDataChannel ? "data" : "control")
            << " port on remote not ready " << retry << " time" << (retry > 1 ? "s" : "")
            << (retry < 10 ? "" : ", data never sent"));
  return true;
}
2492
2493
SendIntraFrameRequest(bool rfc2032,bool pictureLoss)2494 void RTP_Session::SendIntraFrameRequest(bool rfc2032, bool pictureLoss)
2495 {
2496 PTRACE(3, "RTP\tSession " << sessionID << ", SendIntraFrameRequest using "
2497 << (rfc2032 ? "RFC2032" : (pictureLoss ? "RFC4585 PLI" : "RFC5104 FIR")));
2498
2499 // Create packet
2500 RTP_ControlFrame request;
2501 InsertReportPacket(request);
2502
2503 request.StartNewPacket();
2504
2505 if (rfc2032) {
2506 // Create packet
2507 request.SetPayloadType(RTP_ControlFrame::e_IntraFrameRequest);
2508 request.SetPayloadSize(4);
2509 // Insert SSRC
2510 request.SetCount(1);
2511 BYTE * payload = request.GetPayloadPtr();
2512 *(PUInt32b *)payload = syncSourceOut;
2513 }
2514 else {
2515 request.SetPayloadType(RTP_ControlFrame::e_PayloadSpecificFeedBack);
2516 if (pictureLoss)
2517 request.SetFbType(RTP_ControlFrame::e_PictureLossIndication, 0);
2518 else {
2519 request.SetFbType(RTP_ControlFrame::e_FullIntraRequest, sizeof(RTP_ControlFrame::FbFIR));
2520 RTP_ControlFrame::FbFIR * fir = (RTP_ControlFrame::FbFIR *)request.GetPayloadPtr();
2521 fir->requestSSRC = syncSourceIn;
2522 }
2523 RTP_ControlFrame::FbFCI * fci = (RTP_ControlFrame::FbFCI *)request.GetPayloadPtr();
2524 fci->senderSSRC = syncSourceOut;
2525 }
2526
2527 // Send it
2528 request.EndPacket();
2529 WriteControl(request);
2530 }
2531
2532
SendTemporalSpatialTradeOff(unsigned tradeOff)2533 void RTP_Session::SendTemporalSpatialTradeOff(unsigned tradeOff)
2534 {
2535 PTRACE(3, "RTP\tSession " << sessionID << ", SendTemporalSpatialTradeOff " << tradeOff);
2536
2537 RTP_ControlFrame request;
2538 InsertReportPacket(request);
2539
2540 request.StartNewPacket();
2541 request.SetPayloadType(RTP_ControlFrame::e_PayloadSpecificFeedBack);
2542 request.SetFbType(RTP_ControlFrame::e_TemporalSpatialTradeOffRequest, sizeof(RTP_ControlFrame::FbTSTO));
2543 RTP_ControlFrame::FbTSTO * tsto = (RTP_ControlFrame::FbTSTO *)request.GetPayloadPtr();
2544 tsto->requestSSRC = syncSourceIn;
2545 tsto->tradeOff = (BYTE)tradeOff;
2546
2547 // Send it
2548 request.EndPacket();
2549 WriteControl(request);
2550 }
2551
2552
SetEncoding(const PString & newEncoding)2553 void RTP_Session::SetEncoding(const PString & newEncoding)
2554 {
2555 {
2556 PWaitAndSignal m(m_encodingMutex);
2557
2558 if (newEncoding == m_encoding)
2559 return;
2560
2561 RTP_Encoding * newHandler = PFactory<RTP_Encoding>::CreateInstance(newEncoding);
2562 if (newHandler == NULL) {
2563 PTRACE(2, "RTP\tUnable to identify new RTP format '" << newEncoding << "' - retaining old format '" << m_encoding << "'");
2564 return;
2565 }
2566
2567 if (m_encodingHandler != NULL) {
2568 --m_encodingHandler->refCount;
2569 if (m_encodingHandler->refCount == 0)
2570 delete m_encodingHandler;
2571 m_encodingHandler = NULL;
2572 }
2573
2574 PTRACE_IF(2, !m_encoding.IsEmpty(), "RTP\tChanged RTP session format from '" << m_encoding << "' to '" << newEncoding << "'");
2575
2576 m_encoding = newEncoding;
2577 m_encodingHandler = newHandler;
2578 }
2579
2580 ClearStatistics();
2581
2582 EncodingLock(*this)->OnStart(*this);
2583 }
2584
2585 /////////////////////////////////////////////////////////////////////////////
2586
EncodingLock(RTP_Session & _session)2587 RTP_Session::EncodingLock::EncodingLock(RTP_Session & _session)
2588 : session(_session)
2589 {
2590 PWaitAndSignal m(session.m_encodingMutex);
2591
2592 m_encodingHandler = session.m_encodingHandler;
2593 ++m_encodingHandler->refCount;
2594 }
2595
~EncodingLock()2596 RTP_Session::EncodingLock::~EncodingLock()
2597 {
2598 PWaitAndSignal m(session.m_encodingMutex);
2599
2600 --m_encodingHandler->refCount;
2601 if (m_encodingHandler->refCount == 0)
2602 delete m_encodingHandler;
2603 }
2604
2605
2606 /////////////////////////////////////////////////////////////////////////////
2607
RTP_Encoding()2608 RTP_Encoding::RTP_Encoding()
2609 {
2610 refCount = 1;
2611 }
2612
~RTP_Encoding()2613 RTP_Encoding::~RTP_Encoding()
2614 {
2615 OnFinish();
2616 }
2617
2618
OnStart(RTP_Session & _rtpSession)2619 void RTP_Encoding::OnStart(RTP_Session & _rtpSession)
2620 {
2621 //rtpSession = &_rtpSession;
2622 rtpUDP = (RTP_UDP *)&_rtpSession;
2623 }
2624
OnFinish()2625 void RTP_Encoding::OnFinish()
2626 {
2627 }
2628
OnSendData(RTP_DataFrame & frame)2629 RTP_Session::SendReceiveStatus RTP_Encoding::OnSendData(RTP_DataFrame & frame)
2630 {
2631 return rtpUDP->Internal_OnSendData(frame);
2632 }
2633
WriteData(RTP_DataFrame & frame,bool)2634 PBoolean RTP_Encoding::WriteData(RTP_DataFrame & frame, bool)
2635 {
2636 return rtpUDP->Internal_WriteData(frame);
2637 }
2638
OnSendControl(RTP_ControlFrame & frame,PINDEX & len)2639 RTP_Session::SendReceiveStatus RTP_Encoding::OnSendControl(RTP_ControlFrame & frame, PINDEX & len)
2640 {
2641 return rtpUDP->Internal_OnSendControl(frame, len);
2642 }
2643
WriteDataPDU(RTP_DataFrame & frame)2644 bool RTP_Encoding::WriteDataPDU(RTP_DataFrame & frame)
2645 {
2646 return rtpUDP->WriteDataOrControlPDU(frame.GetPointer(), frame.GetHeaderSize()+frame.GetPayloadSize(), true);
2647 }
2648
ReadDataPDU(RTP_DataFrame & frame)2649 RTP_Session::SendReceiveStatus RTP_Encoding::ReadDataPDU(RTP_DataFrame & frame)
2650 {
2651 return rtpUDP->Internal_ReadDataPDU(frame);
2652 }
2653
OnReceiveData(RTP_DataFrame & frame)2654 RTP_Session::SendReceiveStatus RTP_Encoding::OnReceiveData(RTP_DataFrame & frame)
2655 {
2656 return rtpUDP->Internal_OnReceiveData(frame);
2657 }
2658
OnReadTimeout(RTP_DataFrame & frame)2659 RTP_Session::SendReceiveStatus RTP_Encoding::OnReadTimeout(RTP_DataFrame & frame)
2660 {
2661 return rtpUDP->Internal_OnReadTimeout(frame);
2662 }
2663
ReadData(RTP_DataFrame & frame)2664 PBoolean RTP_Encoding::ReadData(RTP_DataFrame & frame)
2665 {
2666 return rtpUDP->Internal_ReadData(frame);
2667 }
2668
WaitForPDU(PUDPSocket & dataSocket,PUDPSocket & controlSocket,const PTimeInterval & t)2669 int RTP_Encoding::WaitForPDU(PUDPSocket & dataSocket, PUDPSocket & controlSocket, const PTimeInterval & t)
2670 {
2671 return rtpUDP->Internal_WaitForPDU(dataSocket, controlSocket, t);
2672 }
2673
2674 /////////////////////////////////////////////////////////////////////////////
2675
SecureRTP_UDP(const Params & params)2676 SecureRTP_UDP::SecureRTP_UDP(const Params & params)
2677 : RTP_UDP(params)
2678 {
2679 securityParms = NULL;
2680 }
2681
~SecureRTP_UDP()2682 SecureRTP_UDP::~SecureRTP_UDP()
2683 {
2684 delete securityParms;
2685 }
2686
SetSecurityMode(OpalSecurityMode * newParms)2687 void SecureRTP_UDP::SetSecurityMode(OpalSecurityMode * newParms)
2688 {
2689 if (securityParms != NULL)
2690 delete securityParms;
2691 securityParms = newParms;
2692 }
2693
GetSecurityParms() const2694 OpalSecurityMode * SecureRTP_UDP::GetSecurityParms() const
2695 {
2696 return securityParms;
2697 }
2698
2699 /////////////////////////////////////////////////////////////////////////////
2700