1 /*
2  * rtp.cxx
3  *
4  * RTP protocol handler
5  *
6  * Open H323 Library
7  *
8  * Copyright (c) 1998-2000 Equivalence Pty. Ltd.
9  *
10  * The contents of this file are subject to the Mozilla Public License
11  * Version 1.0 (the "License"); you may not use this file except in
12  * compliance with the License. You may obtain a copy of the License at
13  * http://www.mozilla.org/MPL/
14  *
15  * Software distributed under the License is distributed on an "AS IS"
16  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
17  * the License for the specific language governing rights and limitations
18  * under the License.
19  *
20  * The Original Code is Open H323 Library.
21  *
22  * The Initial Developer of the Original Code is Equivalence Pty. Ltd.
23  *
 * Portions of this code were written with the assistance of funding from
25  * Vovida Networks, Inc. http://www.vovida.com.
26  *
27  * Contributor(s): ______________________________________.
28  *
29  * $Revision: 29118 $
30  * $Author: rjongbloed $
31  * $Date: 2013-02-15 20:10:30 -0600 (Fri, 15 Feb 2013) $
32  */
33 
34 #include <ptlib.h>
35 
36 #ifdef __GNUC__
37 #pragma implementation "rtp.h"
38 #endif
39 
40 #include <opal/buildopts.h>
41 
42 #include <rtp/rtp.h>
43 
44 #include <rtp/jitter.h>
45 
46 #include <rtp/metrics.h>
47 
48 #include <ptclib/random.h>
49 #include <ptclib/pstun.h>
50 #include <opal/rtpconn.h>
51 
52 #include <algorithm>
53 
// NOTE(review): redirects operator new through PTLib's PNEW macro for the rest
// of this file - presumably for allocation tracking; confirm against PTLib.
#define new PNEW

#define BAD_TRANSMIT_TIME_MAX 10    //  Maximum number of seconds of transmit failures before the session is killed

// Seconds between 1 Jan 1900 and 1 Jan 1970 (70 years of 365 days plus 17 leap
// days). Added to a 1970 based time to give the 1900 based seconds value used
// when building receiver reports.
const unsigned SecondsFrom1900to1970 = (70*365+17)*24*60*60U;

// Buffer sizes, in bytes
#define RTP_VIDEO_RX_BUFFER_SIZE 0x100000 // 1Mb
#define RTP_AUDIO_RX_BUFFER_SIZE 0x4000   // 16kb
#define RTP_DATA_TX_BUFFER_SIZE  0x2000   // 8kb
#define RTP_CTRL_BUFFER_SIZE     0x1000   // 4kb


// Registers RTP_Encoding with the RTP_Encoding factory under the key "rtp/avp".
// NOTE(review): meaning of the final "false" argument is not visible here - confirm against PFACTORY_CREATE.
PFACTORY_CREATE(PFactory<RTP_Encoding>, RTP_Encoding, "rtp/avp", false);
67 
68 
69 /////////////////////////////////////////////////////////////////////////////
70 
RTP_DataFrame(PINDEX payloadSz,PINDEX bufferSz)71 RTP_DataFrame::RTP_DataFrame(PINDEX payloadSz, PINDEX bufferSz)
72   : PBYTEArray(std::max(bufferSz, MinHeaderSize+payloadSz))
73   , m_headerSize(MinHeaderSize)
74   , m_payloadSize(payloadSz)
75   , m_paddingSize(0)
76 {
77   theArray[0] = '\x80'; // Default to version 2
78   theArray[1] = '\x7f'; // Default to MaxPayloadType
79 }
80 
81 
RTP_DataFrame(const BYTE * data,PINDEX len,PBoolean dynamic)82 RTP_DataFrame::RTP_DataFrame(const BYTE * data, PINDEX len, PBoolean dynamic)
83   : PBYTEArray(data, len, dynamic)
84   , m_headerSize(MinHeaderSize)
85   , m_payloadSize(0)
86   , m_paddingSize(0)
87 {
88   SetPacketSize(len);
89 }
90 
91 
SetPacketSize(PINDEX sz)92 bool RTP_DataFrame::SetPacketSize(PINDEX sz)
93 {
94   if (sz < RTP_DataFrame::MinHeaderSize) {
95     PTRACE(2, "RTP\tInvalid RTP packet, "
96               "smaller than minimum header size, " << sz << " < " << RTP_DataFrame::MinHeaderSize);
97     m_payloadSize = m_paddingSize = 0;
98     return false;
99   }
100 
101   m_headerSize = MinHeaderSize + 4*GetContribSrcCount();
102 
103   if (GetExtension())
104     m_headerSize += (GetExtensionSizeDWORDs()+1)*4;
105 
106   if (sz < m_headerSize) {
107     PTRACE(2, "RTP\tInvalid RTP packet, "
108               "smaller than indicated header size, " << sz << " < " << m_headerSize);
109     m_payloadSize = m_paddingSize = 0;
110     return false;
111   }
112 
113   if (!GetPadding()) {
114     m_payloadSize = sz - m_headerSize;
115     return true;
116   }
117 
118   /* We do this as some systems send crap at the end of the packet, giving
119      incorrect results for the padding size. So we do a sanity check that
120      the indicating padding size is not larger than the payload itself. Not
121      100% accurate, but you do whatever you can.
122    */
123   PINDEX pos = sz;
124   do {
125     if (pos-- <= m_headerSize) {
126       PTRACE(2, "RTP\tInvalid RTP packet, padding indicated but not enough data, "
127                 "size=" << sz << ", header=" << m_headerSize);
128       m_payloadSize = m_paddingSize = 0;
129       return false;
130     }
131 
132     m_paddingSize = theArray[pos] & 0xff;
133   } while (m_paddingSize > (pos-m_headerSize));
134 
135   m_payloadSize = pos - m_headerSize - 1;
136 
137   return true;
138 }
139 
140 
SetExtension(PBoolean ext)141 void RTP_DataFrame::SetExtension(PBoolean ext)
142 {
143   if (ext)
144     theArray[0] |= 0x10;
145   else
146     theArray[0] &= 0xef;
147 }
148 
149 
SetMarker(PBoolean m)150 void RTP_DataFrame::SetMarker(PBoolean m)
151 {
152   if (m)
153     theArray[1] |= 0x80;
154   else
155     theArray[1] &= 0x7f;
156 }
157 
158 
SetPayloadType(PayloadTypes t)159 void RTP_DataFrame::SetPayloadType(PayloadTypes t)
160 {
161   PAssert(t <= 0x7f, PInvalidParameter);
162 
163   theArray[1] &= 0x80;
164   theArray[1] |= t;
165 }
166 
167 
GetContribSource(PINDEX idx) const168 DWORD RTP_DataFrame::GetContribSource(PINDEX idx) const
169 {
170   PAssert(idx < GetContribSrcCount(), PInvalidParameter);
171   return ((PUInt32b *)&theArray[MinHeaderSize])[idx];
172 }
173 
174 
SetContribSource(PINDEX idx,DWORD src)175 void RTP_DataFrame::SetContribSource(PINDEX idx, DWORD src)
176 {
177   PAssert(idx <= 15, PInvalidParameter);
178 
179   if (idx >= GetContribSrcCount()) {
180     BYTE * oldPayload = GetPayloadPtr();
181     theArray[0] &= 0xf0;
182     theArray[0] |= idx+1;
183     m_headerSize += 4;
184     PINDEX sz = m_payloadSize + m_paddingSize;
185     SetMinSize(m_headerSize+sz);
186     memmove(GetPayloadPtr(), oldPayload, sz);
187   }
188 
189   ((PUInt32b *)&theArray[MinHeaderSize])[idx] = src;
190 }
191 
192 
GetExtensionType() const193 int RTP_DataFrame::GetExtensionType() const
194 {
195   if (GetExtension())
196     return *(PUInt16b *)&theArray[MinHeaderSize + 4*GetContribSrcCount()];
197 
198   return -1;
199 }
200 
201 
SetExtensionType(int type)202 void RTP_DataFrame::SetExtensionType(int type)
203 {
204   if (type < 0)
205     SetExtension(false);
206   else {
207     if (!GetExtension())
208       SetExtensionSizeDWORDs(0);
209     *(PUInt16b *)&theArray[MinHeaderSize + 4*GetContribSrcCount()] = (WORD)type;
210   }
211 }
212 
213 
GetExtensionSizeDWORDs() const214 PINDEX RTP_DataFrame::GetExtensionSizeDWORDs() const
215 {
216   if (GetExtension())
217     return *(PUInt16b *)&theArray[MinHeaderSize + 4*GetContribSrcCount() + 2];
218 
219   return 0;
220 }
221 
222 
SetExtensionSizeDWORDs(PINDEX sz)223 PBoolean RTP_DataFrame::SetExtensionSizeDWORDs(PINDEX sz)
224 {
225   m_headerSize = MinHeaderSize + 4*GetContribSrcCount() + (sz+1)*4;
226   if (!SetMinSize(m_headerSize+m_payloadSize+m_paddingSize))
227     return false;
228 
229   SetExtension(true);
230   *(PUInt16b *)&theArray[MinHeaderSize + 4*GetContribSrcCount() + 2] = (WORD)sz;
231   return true;
232 }
233 
234 
GetExtensionPtr() const235 BYTE * RTP_DataFrame::GetExtensionPtr() const
236 {
237   if (GetExtension())
238     return (BYTE *)&theArray[MinHeaderSize + 4*GetContribSrcCount() + 4];
239 
240   return NULL;
241 }
242 
243 
SetPayloadSize(PINDEX sz)244 bool RTP_DataFrame::SetPayloadSize(PINDEX sz)
245 {
246   m_payloadSize = sz;
247   return SetMinSize(m_headerSize+m_payloadSize+m_paddingSize);
248 }
249 
250 
SetPaddingSize(PINDEX sz)251 bool RTP_DataFrame::SetPaddingSize(PINDEX sz)
252 {
253   m_paddingSize = sz;
254   return SetMinSize(m_headerSize+m_payloadSize+m_paddingSize);
255 }
256 
257 
PrintOn(ostream & strm) const258 void RTP_DataFrame::PrintOn(ostream & strm) const
259 {
260   strm <<  "V="  << GetVersion()
261        << " X="  << GetExtension()
262        << " M="  << GetMarker()
263        << " PT=" << GetPayloadType()
264        << " SN=" << GetSequenceNumber()
265        << " TS=" << GetTimestamp()
266        << " SSRC=" << hex << GetSyncSource() << dec
267        << " size=" << GetPayloadSize()
268        << '\n';
269 
270   int csrcCount = GetContribSrcCount();
271   for (int csrc = 0; csrc < csrcCount; csrc++)
272     strm << "  CSRC[" << csrc << "]=" << GetContribSource(csrc) << '\n';
273 
274   if (GetExtension())
275     strm << "  Header Extension Type: " << GetExtensionType() << '\n'
276          << hex << setfill('0') << PBYTEArray(GetExtensionPtr(), GetExtensionSizeDWORDs()*4, false) << setfill(' ') << dec << '\n';
277 
278   strm << hex << setfill('0') << PBYTEArray(GetPayloadPtr(), GetPayloadSize(), false) << setfill(' ') << dec;
279 }
280 
281 
282 #if PTRACING
283 static const char * const PayloadTypesNames[RTP_DataFrame::LastKnownPayloadType] = {
284   "PCMU",
285   "FS1016",
286   "G721",
287   "GSM",
288   "G723",
289   "DVI4_8k",
290   "DVI4_16k",
291   "LPC",
292   "PCMA",
293   "G722",
294   "L16_Stereo",
295   "L16_Mono",
296   "G723",
297   "CN",
298   "MPA",
299   "G728",
300   "DVI4_11k",
301   "DVI4_22k",
302   "G729",
303   "CiscoCN",
304   NULL, NULL, NULL, NULL, NULL,
305   "CelB",
306   "JPEG",
307   NULL, NULL, NULL, NULL,
308   "H261",
309   "MPV",
310   "MP2T",
311   "H263",
312   NULL, NULL, NULL,
313   "T38"
314 };
315 
operator <<(ostream & o,RTP_DataFrame::PayloadTypes t)316 ostream & operator<<(ostream & o, RTP_DataFrame::PayloadTypes t)
317 {
318   if ((PINDEX)t < PARRAYSIZE(PayloadTypesNames) && PayloadTypesNames[t] != NULL)
319     o << PayloadTypesNames[t];
320   else
321     o << "[pt=" << (int)t << ']';
322   return o;
323 }
324 
325 #endif
326 
327 
328 /////////////////////////////////////////////////////////////////////////////
329 
RTP_ControlFrame(PINDEX sz)330 RTP_ControlFrame::RTP_ControlFrame(PINDEX sz)
331   : PBYTEArray(sz)
332 {
333   compoundOffset = 0;
334   payloadSize = 0;
335 }
336 
Reset(PINDEX size)337 void RTP_ControlFrame::Reset(PINDEX size)
338 {
339   SetSize(size);
340   compoundOffset = 0;
341   payloadSize = 0;
342 }
343 
344 
SetCount(unsigned count)345 void RTP_ControlFrame::SetCount(unsigned count)
346 {
347   PAssert(count < 32, PInvalidParameter);
348   theArray[compoundOffset] &= 0xe0;
349   theArray[compoundOffset] |= count;
350 }
351 
352 
SetFbType(unsigned type,PINDEX fciSize)353 void RTP_ControlFrame::SetFbType(unsigned type, PINDEX fciSize)
354 {
355   PAssert(type < 32, PInvalidParameter);
356   theArray[compoundOffset] &= 0xe0;
357   theArray[compoundOffset] |= type;
358   SetPayloadSize(fciSize+8);
359 }
360 
361 
SetPayloadType(unsigned t)362 void RTP_ControlFrame::SetPayloadType(unsigned t)
363 {
364   PAssert(t < 256, PInvalidParameter);
365   theArray[compoundOffset+1] = (BYTE)t;
366 }
367 
GetCompoundSize() const368 PINDEX RTP_ControlFrame::GetCompoundSize() const
369 {
370   // transmitted length is the offset of the last compound block
371   // plus the compound length of the last block
372   return compoundOffset + *(PUInt16b *)&theArray[compoundOffset+2]*4;
373 }
374 
SetPayloadSize(PINDEX sz)375 void RTP_ControlFrame::SetPayloadSize(PINDEX sz)
376 {
377   payloadSize = sz;
378 
379   // compound size is in words, rounded up to nearest word
380   PINDEX compoundSize = (payloadSize + 3) & ~3;
381   PAssert(compoundSize <= 0xffff, PInvalidParameter);
382 
383   // make sure buffer is big enough for previous packets plus packet header plus payload
384   SetMinSize(compoundOffset + 4 + 4*(compoundSize));
385 
386   // put the new compound size into the packet (always at offset 2)
387   *(PUInt16b *)&theArray[compoundOffset+2] = (WORD)(compoundSize / 4);
388 }
389 
GetPayloadPtr() const390 BYTE * RTP_ControlFrame::GetPayloadPtr() const
391 {
392   // payload for current packet is always one DWORD after the current compound start
393   if ((GetPayloadSize() == 0) || ((compoundOffset + 4) >= GetSize()))
394     return NULL;
395   return (BYTE *)(theArray + compoundOffset + 4);
396 }
397 
ReadNextPacket()398 PBoolean RTP_ControlFrame::ReadNextPacket()
399 {
400   // skip over current packet
401   compoundOffset += GetPayloadSize() + 4;
402 
403   // see if another packet is feasible
404   if (compoundOffset + 4 > GetSize())
405     return false;
406 
407   // check if payload size for new packet is legal
408   return compoundOffset + GetPayloadSize() + 4 <= GetSize();
409 }
410 
411 
StartNewPacket()412 PBoolean RTP_ControlFrame::StartNewPacket()
413 {
414   // allocate storage for new packet header
415   if (!SetMinSize(compoundOffset + 4))
416     return false;
417 
418   theArray[compoundOffset] = '\x80'; // Set version 2
419   theArray[compoundOffset+1] = 0;    // Set payload type to illegal
420   theArray[compoundOffset+2] = 0;    // Set payload size to zero
421   theArray[compoundOffset+3] = 0;
422 
423   // payload is now zero bytes
424   payloadSize = 0;
425   SetPayloadSize(payloadSize);
426 
427   return true;
428 }
429 
EndPacket()430 void RTP_ControlFrame::EndPacket()
431 {
432   // all packets must align to DWORD boundaries
433   while (((4 + payloadSize) & 3) != 0) {
434     theArray[compoundOffset + 4 + payloadSize - 1] = 0;
435     ++payloadSize;
436   }
437 
438   compoundOffset += 4 + payloadSize;
439   payloadSize = 0;
440 }
441 
StartSourceDescription(DWORD src)442 void RTP_ControlFrame::StartSourceDescription(DWORD src)
443 {
444   // extend payload to include SSRC + END
445   SetPayloadSize(payloadSize + 4 + 1);
446   SetPayloadType(RTP_ControlFrame::e_SourceDescription);
447   SetCount(GetCount()+1); // will be incremented automatically
448 
449   // get ptr to new item SDES
450   BYTE * payload = GetPayloadPtr();
451   *(PUInt32b *)payload = src;
452   payload[4] = e_END;
453 }
454 
455 
AddSourceDescriptionItem(unsigned type,const PString & data)456 void RTP_ControlFrame::AddSourceDescriptionItem(unsigned type, const PString & data)
457 {
458   // get ptr to new item, remembering that END was inserted previously
459   BYTE * payload = GetPayloadPtr() + payloadSize - 1;
460 
461   // length of new item
462   PINDEX dataLength = data.GetLength();
463 
464   // add storage for new item (note that END has already been included)
465   SetPayloadSize(payloadSize + 1 + 1 + dataLength);
466 
467   // insert new item
468   payload[0] = (BYTE)type;
469   payload[1] = (BYTE)dataLength;
470   memcpy(payload+2, (const char *)data, dataLength);
471 
472   // insert new END
473   payload[2+dataLength] = (BYTE)e_END;
474 }
475 
476 
SetLostPackets(unsigned packets)477 void RTP_ControlFrame::ReceiverReport::SetLostPackets(unsigned packets)
478 {
479   lost[0] = (BYTE)(packets >> 16);
480   lost[1] = (BYTE)(packets >> 8);
481   lost[2] = (BYTE)packets;
482 }
483 
484 
485 ///////////////////////////////////////////////////////////////////////////////
486 
487 #if OPAL_STATISTICS
488 
OpalMediaStatistics()489 OpalMediaStatistics::OpalMediaStatistics()
490   : m_totalBytes(0)
491   , m_totalPackets(0)
492   , m_packetsLost(0)
493   , m_packetsOutOfOrder(0)
494   , m_packetsTooLate(0)
495   , m_packetOverruns(0)
496   , m_minimumPacketTime(0)
497   , m_averagePacketTime(0)
498   , m_maximumPacketTime(0)
499 
500     // Audio
501   , m_averageJitter(0)
502   , m_maximumJitter(0)
503   , m_jitterBufferDelay(0)
504 
505     // Video
506   , m_totalFrames(0)
507   , m_keyFrames(0)
508 {
509 }
510 
511 #if OPAL_FAX
Fax()512 OpalMediaStatistics::Fax::Fax()
513   : m_result(OpalMediaStatistics::FaxNotStarted)
514   , m_phase(' ')
515   , m_bitRate(9600)
516   , m_compression(FaxCompressionUnknown)
517   , m_errorCorrection(false)
518   , m_txPages(-1)
519   , m_rxPages(-1)
520   , m_totalPages(0)
521   , m_imageSize(0)
522   , m_resolutionX(0)
523   , m_resolutionY(0)
524   , m_pageWidth(0)
525   , m_pageHeight(0)
526   , m_badRows(0)
527   , m_mostBadRows(0)
528   , m_errorCorrectionRetries(0)
529 {
530 }
531 
operator <<(ostream & strm,OpalMediaStatistics::FaxCompression compression)532 ostream & operator<<(ostream & strm, OpalMediaStatistics::FaxCompression compression)
533 {
534   static const char * const Names[] = { "N/A", "T.4 1d", "T.4 2d", "T.6" };
535   if (compression >= 0 && compression < PARRAYSIZE(Names))
536     strm << Names[compression];
537   else
538     strm << (unsigned)compression;
539   return strm;
540 }
541 #endif
542 
543 #endif
544 
545 /////////////////////////////////////////////////////////////////////////////
546 
OnTxStatistics(const RTP_Session &) const547 void RTP_UserData::OnTxStatistics(const RTP_Session & /*session*/) const
548 {
549 }
550 
551 
OnRxStatistics(const RTP_Session &) const552 void RTP_UserData::OnRxStatistics(const RTP_Session & /*session*/) const
553 {
554 }
555 
SessionFailing(RTP_Session &)556 void RTP_UserData::SessionFailing(RTP_Session & /*session*/)
557 {
558 }
559 
560 #if OPAL_VIDEO
OnRxIntraFrameRequest(const RTP_Session &) const561 void RTP_UserData::OnRxIntraFrameRequest(const RTP_Session & /*session*/) const
562 {
563 }
564 
OnTxIntraFrameRequest(const RTP_Session &) const565 void RTP_UserData::OnTxIntraFrameRequest(const RTP_Session & /*session*/) const
566 {
567 }
568 #endif
569 
570 
571 /////////////////////////////////////////////////////////////////////////////
572 
573 #if P_CONFIG_FILE
GetDefaultOutOfOrderWaitTime()574 static PTimeInterval GetDefaultOutOfOrderWaitTime()
575 {
576   static PTimeInterval ooowt(PConfig(PConfig::Environment).GetInteger("OPAL_RTP_OUT_OF_ORDER_TIME", 100));
577   return ooowt;
578 }
579 #else
580 #define GetDefaultOutOfOrderWaitTime() (100)
581 #endif
582 
RTP_Session(const Params & params)583 RTP_Session::RTP_Session(const Params & params)
584   : m_timeUnits(params.isAudio ? 8 : 90)
585   , canonicalName(PProcess::Current().GetUserName())
586   , toolName(PProcess::Current().GetName())
587   , lastSRTimestamp(0)
588   , lastSRReceiveTime(0)
589   , outOfOrderWaitTime(GetDefaultOutOfOrderWaitTime())
590   , firstPacketSent(0)
591   , firstPacketReceived(0)
592 #if OPAL_RTCP_XR
593   , m_metrics(NULL)
594 #endif
595   , m_reportTimer(0, 12)  // Seconds
596   , failed(false)
597 {
598   PAssert(params.id > 0, PInvalidParameter);
599   sessionID = params.id;
600   isAudio = params.isAudio;
601 
602   userData = params.userData;
603   autoDeleteUserData = params.autoDelete;
604 
605   ignorePayloadTypeChanges = true;
606   syncSourceOut = PRandom::Number();
607 
608   timeStampOffs = 0;
609   oobTimeStampBaseEstablished = false;
610   lastSentPacketTime = PTimer::Tick();
611 
612   syncSourceIn = 0;
613   allowAnySyncSource = true;
614   allowOneSyncSourceChange = false;
615   allowRemoteTransmitAddressChange = false;
616   allowSequenceChange = false;
617   txStatisticsInterval = 100;  // Number of data packets between tx reports
618   rxStatisticsInterval = 100;  // Number of data packets between rx reports
619   lastSentSequenceNumber = (WORD)PRandom::Number();
620   expectedSequenceNumber = 0;
621   lastRRSequenceNumber = 0;
622   resequenceOutOfOrderPackets = true;
623   senderReportsReceived = 0;
624   consecutiveOutOfOrderPackets = 0;
625 
626   ClearStatistics();
627 
628   lastReceivedPayloadType = RTP_DataFrame::IllegalPayloadType;
629 
630   closeOnBye = false;
631   byeSent    = false;
632 
633   lastSentTimestamp = 0;  // should be calculated, but we'll settle for initialising it
634 
635   m_encodingHandler = NULL;
636   SetEncoding(params.encoding);
637 
638   m_reportTimer.SetNotifier(PCREATE_NOTIFIER(SendReport));
639 }
640 
641 
~RTP_Session()642 RTP_Session::~RTP_Session()
643 {
644   m_reportTimer.Stop(true);
645 
646 #if OPAL_RTCP_XR
647   delete m_metrics;
648 #endif
649 
650 #if PTRACING
651   PTime now;
652   int sentDuration = (now-firstPacketSent).GetSeconds();
653   if (sentDuration == 0)
654     sentDuration = 1;
655   int receiveDuration = (now-firstPacketReceived).GetSeconds();
656   if (receiveDuration == 0)
657     receiveDuration = 1;
658  #endif
659   PTRACE_IF(3, packetsSent != 0 || packetsReceived != 0,
660       "RTP\tSession " << sessionID << ", final statistics:\n"
661       "    firstPacketSent    = " << firstPacketSent << "\n"
662       "    packetsSent        = " << packetsSent << "\n"
663       "    octetsSent         = " << octetsSent << "\n"
664       "    bitRateSent        = " << (8*octetsSent/sentDuration) << "\n"
665       "    averageSendTime    = " << averageSendTime << "\n"
666       "    maximumSendTime    = " << maximumSendTime << "\n"
667       "    minimumSendTime    = " << minimumSendTime << "\n"
668       "    packetsLostByRemote= " << packetsLostByRemote << "\n"
669       "    jitterLevelOnRemote= " << jitterLevelOnRemote << "\n"
670       "    firstPacketReceived= " << firstPacketReceived << "\n"
671       "    packetsReceived    = " << packetsReceived << "\n"
672       "    octetsReceived     = " << octetsReceived << "\n"
673       "    bitRateReceived    = " << (8*octetsReceived/receiveDuration) << "\n"
674       "    packetsLost        = " << packetsLost << "\n"
675       "    packetsTooLate     = " << GetPacketsTooLate() << "\n"
676       "    packetOverruns     = " << GetPacketOverruns() << "\n"
677       "    packetsOutOfOrder  = " << packetsOutOfOrder << "\n"
678       "    averageReceiveTime = " << averageReceiveTime << "\n"
679       "    maximumReceiveTime = " << maximumReceiveTime << "\n"
680       "    minimumReceiveTime = " << minimumReceiveTime << "\n"
681       "    averageJitter      = " << GetAvgJitterTime() << "\n"
682       "    maximumJitter      = " << GetMaxJitterTime()
683    );
684   if (autoDeleteUserData)
685     delete userData;
686   delete m_encodingHandler;
687 }
688 
689 
ClearStatistics()690 void RTP_Session::ClearStatistics()
691 {
692   firstPacketSent.SetTimestamp(0);
693   packetsSent = 0;
694   rtcpPacketsSent = 0;
695   octetsSent = 0;
696   firstPacketReceived.SetTimestamp(0);
697   packetsReceived = 0;
698   octetsReceived = 0;
699   packetsLost = 0;
700   packetsLostByRemote = 0;
701   packetsOutOfOrder = 0;
702   averageSendTime = 0;
703   maximumSendTime = 0;
704   minimumSendTime = 0;
705   averageReceiveTime = 0;
706   maximumReceiveTime = 0;
707   minimumReceiveTime = 0;
708   jitterLevel = 0;
709   maximumJitterLevel = 0;
710   jitterLevelOnRemote = 0;
711   markerRecvCount = 0;
712   markerSendCount = 0;
713 
714   txStatisticsCount = 0;
715   rxStatisticsCount = 0;
716   averageSendTimeAccum = 0;
717   maximumSendTimeAccum = 0;
718   minimumSendTimeAccum = 0xffffffff;
719   averageReceiveTimeAccum = 0;
720   maximumReceiveTimeAccum = 0;
721   minimumReceiveTimeAccum = 0xffffffff;
722   packetsLostSinceLastRR = 0;
723   lastTransitTime = 0;
724 }
725 
726 
SendBYE()727 void RTP_Session::SendBYE()
728 {
729   {
730     PWaitAndSignal mutex(dataMutex);
731     if (byeSent)
732       return;
733 
734     byeSent = true;
735   }
736 
737   RTP_ControlFrame report;
738   InsertReportPacket(report);
739 
740   static char const ReasonStr[] = "Session ended";
741   static size_t ReasonLen = sizeof(ReasonStr);
742 
743   // insert BYE
744   report.StartNewPacket();
745   report.SetPayloadType(RTP_ControlFrame::e_Goodbye);
746   report.SetPayloadSize(4+1+ReasonLen);  // length is SSRC + ReasonLen + reason
747 
748   BYTE * payload = report.GetPayloadPtr();
749 
750   // one SSRC
751   report.SetCount(1);
752   *(PUInt32b *)payload = syncSourceOut;
753 
754   // insert reason
755   payload[4] = (BYTE)ReasonLen;
756   memcpy((char *)(payload+5), ReasonStr, ReasonLen);
757 
758   report.EndPacket();
759   WriteControl(report);
760 }
761 
GetCanonicalName() const762 PString RTP_Session::GetCanonicalName() const
763 {
764   PWaitAndSignal mutex(m_reportMutex);
765   PString s = canonicalName;
766   s.MakeUnique();
767   return s;
768 }
769 
770 
SetCanonicalName(const PString & name)771 void RTP_Session::SetCanonicalName(const PString & name)
772 {
773   PWaitAndSignal mutex(m_reportMutex);
774   canonicalName = name;
775   canonicalName.MakeUnique();
776 }
777 
778 
GetToolName() const779 PString RTP_Session::GetToolName() const
780 {
781   PWaitAndSignal mutex(m_reportMutex);
782   PString s = toolName;
783   s.MakeUnique();
784   return s;
785 }
786 
787 
SetToolName(const PString & name)788 void RTP_Session::SetToolName(const PString & name)
789 {
790   PWaitAndSignal mutex(m_reportMutex);
791   toolName = name;
792   toolName.MakeUnique();
793 }
794 
795 
SetUserData(RTP_UserData * data,PBoolean autoDelete)796 void RTP_Session::SetUserData(RTP_UserData * data, PBoolean autoDelete)
797 {
798   if (autoDeleteUserData)
799     delete userData;
800   userData = data;
801   autoDeleteUserData = autoDelete;
802 }
803 
804 
SetJitterBufferSize(unsigned minJitterDelay,unsigned maxJitterDelay,unsigned timeUnits,PINDEX packetSize)805 void RTP_Session::SetJitterBufferSize(unsigned minJitterDelay,
806                                       unsigned maxJitterDelay,
807                                       unsigned timeUnits,
808                                         PINDEX packetSize)
809 {
810   PWaitAndSignal mutex(dataMutex);
811 
812   if (timeUnits > 0)
813     m_timeUnits = timeUnits;
814 
815   if (minJitterDelay == 0 && maxJitterDelay == 0) {
816     PTRACE_IF(4, m_jitterBuffer != NULL, "RTP\tSwitching off jitter buffer " << *m_jitterBuffer);
817     // This can block waiting for JB thread to end, signal mutex to avoid deadlock
818     dataMutex.Signal();
819     m_jitterBuffer.SetNULL();
820     dataMutex.Wait();
821   }
822   else {
823     resequenceOutOfOrderPackets = false;
824     FlushData();
825     if (m_jitterBuffer != NULL) {
826       PTRACE(4, "RTP\tSetting jitter buffer time from " << minJitterDelay << " to " << maxJitterDelay);
827       m_jitterBuffer->SetDelay(minJitterDelay, maxJitterDelay, packetSize);
828     }
829     else {
830       m_jitterBuffer = new RTP_JitterBuffer(*this, minJitterDelay, maxJitterDelay, m_timeUnits, packetSize);
831       PTRACE(4, "RTP\tCreated RTP jitter buffer " << *m_jitterBuffer);
832       m_jitterBuffer->StartThread();
833     }
834   }
835 }
836 
837 
GetJitterBufferSize() const838 unsigned RTP_Session::GetJitterBufferSize() const
839 {
840   JitterBufferPtr jitter = m_jitterBuffer; // Increase reference count
841   return jitter != NULL ? jitter->GetCurrentJitterDelay() : 0;
842 }
843 
844 
ReadBufferedData(RTP_DataFrame & frame)845 PBoolean RTP_Session::ReadBufferedData(RTP_DataFrame & frame)
846 {
847   JitterBufferPtr jitter = m_jitterBuffer; // Increase reference count
848   if (jitter != NULL)
849     return jitter->ReadData(frame);
850 
851   if (m_outOfOrderPackets.empty())
852     return ReadData(frame);
853 
854   unsigned sequenceNumber = m_outOfOrderPackets.back().GetSequenceNumber();
855   if (sequenceNumber != expectedSequenceNumber) {
856     PTRACE(5, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
857            << ", still out of order packets, next "
858            << sequenceNumber << " expected " << expectedSequenceNumber);
859     return ReadData(frame);
860   }
861 
862   frame = m_outOfOrderPackets.back();
863   m_outOfOrderPackets.pop_back();
864   expectedSequenceNumber = (WORD)(sequenceNumber + 1);
865 
866   PTRACE(m_outOfOrderPackets.empty() ? 2 : 5,
867          "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn << ", resequenced "
868          << (m_outOfOrderPackets.empty() ? "last" : "next") << " out of order packet " << sequenceNumber);
869   return true;
870 }
871 
872 
FlushData()873 void RTP_Session::FlushData()
874 {
875 }
876 
877 
SetTxStatisticsInterval(unsigned packets)878 void RTP_Session::SetTxStatisticsInterval(unsigned packets)
879 {
880   txStatisticsInterval = PMAX(packets, 2);
881   txStatisticsCount = 0;
882   averageSendTimeAccum = 0;
883   maximumSendTimeAccum = 0;
884   minimumSendTimeAccum = 0xffffffff;
885 }
886 
887 
SetRxStatisticsInterval(unsigned packets)888 void RTP_Session::SetRxStatisticsInterval(unsigned packets)
889 {
890   rxStatisticsInterval = PMAX(packets, 2);
891   rxStatisticsCount = 0;
892   averageReceiveTimeAccum = 0;
893   maximumReceiveTimeAccum = 0;
894   minimumReceiveTimeAccum = 0xffffffff;
895 }
896 
897 
AddReceiverReport(RTP_ControlFrame::ReceiverReport & receiver)898 void RTP_Session::AddReceiverReport(RTP_ControlFrame::ReceiverReport & receiver)
899 {
900   receiver.ssrc = syncSourceIn;
901   receiver.SetLostPackets(GetPacketsLost()+GetPacketsTooLate());
902 
903   if (expectedSequenceNumber > lastRRSequenceNumber)
904     receiver.fraction = (BYTE)((packetsLostSinceLastRR<<8)/(expectedSequenceNumber - lastRRSequenceNumber));
905   else
906     receiver.fraction = 0;
907   packetsLostSinceLastRR = 0;
908 
909   receiver.last_seq = lastRRSequenceNumber;
910   lastRRSequenceNumber = expectedSequenceNumber;
911 
912   receiver.jitter = jitterLevel >> JitterRoundingGuardBits; // Allow for rounding protection bits
913 
914   if (senderReportsReceived > 0) {
915     // Calculate the last SR timestamp
916     PUInt32b lsr_ntp_sec  = (DWORD)(lastSRTimestamp.GetTimeInSeconds()+SecondsFrom1900to1970); // Convert from 1970 to 1900
917     PUInt32b lsr_ntp_frac = lastSRTimestamp.GetMicrosecond()*4294; // Scale microseconds to "fraction" from 0 to 2^32
918     receiver.lsr = (((lsr_ntp_sec << 16) & 0xFFFF0000) | ((lsr_ntp_frac >> 16) & 0x0000FFFF));
919 
920     // Calculate the delay since last SR
921     PTime now;
922     delaySinceLastSR = now - lastSRReceiveTime;
923     receiver.dlsr = (DWORD)(delaySinceLastSR.GetMilliSeconds()*65536/1000);
924   }
925   else {
926     receiver.lsr = 0;
927     receiver.dlsr = 0;
928   }
929 
930   PTRACE(3, "RTP\tSession " << sessionID << ", SentReceiverReport:"
931             " ssrc=" << receiver.ssrc
932          << " fraction=" << (unsigned)receiver.fraction
933          << " lost=" << receiver.GetLostPackets()
934          << " last_seq=" << receiver.last_seq
935          << " jitter=" << receiver.jitter
936          << " lsr=" << receiver.lsr
937          << " dlsr=" << receiver.dlsr);
938 }
939 
940 
OnSendData(RTP_DataFrame & frame)941 RTP_Session::SendReceiveStatus RTP_Session::OnSendData(RTP_DataFrame & frame)
942 {
943   return EncodingLock(*this)->OnSendData(frame);
944 }
945 
946 
Internal_OnSendData(RTP_DataFrame & frame)947 RTP_Session::SendReceiveStatus RTP_Session::Internal_OnSendData(RTP_DataFrame & frame)
948 {
949   PWaitAndSignal mutex(dataMutex);
950 
951   PTimeInterval tick = PTimer::Tick();  // Timestamp set now
952 
953   frame.SetSequenceNumber(++lastSentSequenceNumber);
954   frame.SetSyncSource(syncSourceOut);
955 
956   DWORD frameTimestamp = frame.GetTimestamp();
957 
958   // special handling for first packet
959   if (packetsSent == 0) {
960 
961     // establish timestamp offset
962     if (oobTimeStampBaseEstablished)  {
963       timeStampOffs = oobTimeStampOutBase - frameTimestamp + ((PTimer::Tick() - oobTimeStampBase).GetInterval() * m_timeUnits);
964       frameTimestamp += timeStampOffs;
965     }
966     else {
967       oobTimeStampBaseEstablished = true;
968       timeStampOffs               = 0;
969       oobTimeStampOutBase         = frameTimestamp;
970       oobTimeStampBase            = PTimer::Tick();
971     }
972 
973     firstPacketSent.SetCurrentTime();
974 
975     // display stuff
976     PTRACE(3, "RTP\tSession " << sessionID << ", first sent data:"
977               " ver=" << frame.GetVersion()
978            << " pt=" << frame.GetPayloadType()
979            << " psz=" << frame.GetPayloadSize()
980            << " m=" << frame.GetMarker()
981            << " x=" << frame.GetExtension()
982            << " seq=" << frame.GetSequenceNumber()
983            << " ts=" << frameTimestamp
984            << " src=" << hex << frame.GetSyncSource()
985            << " ccnt=" << frame.GetContribSrcCount() << dec);
986   }
987 
988   else {
989     // set timestamp
990     frameTimestamp += timeStampOffs;
991 
992     // reset OOB timestamp every marker bit
993     if (frame.GetMarker()) {
994       oobTimeStampOutBase = frameTimestamp;
995       oobTimeStampBase    = PTimer::Tick();
996     }
997 
998     // Only do statistics on subsequent packets
999     if ( ! (isAudio && frame.GetMarker()) ) {
1000       DWORD diff = (tick - lastSentPacketTime).GetInterval();
1001 
1002       averageSendTimeAccum += diff;
1003       if (diff > maximumSendTimeAccum)
1004         maximumSendTimeAccum = diff;
1005       if (diff < minimumSendTimeAccum)
1006         minimumSendTimeAccum = diff;
1007       txStatisticsCount++;
1008     }
1009   }
1010 
1011   frame.SetTimestamp(frameTimestamp);
1012   lastSentTimestamp = frameTimestamp;
1013   lastSentPacketTime = tick;
1014 
1015   octetsSent += frame.GetPayloadSize();
1016   packetsSent++;
1017 
1018   if (frame.GetMarker())
1019     markerSendCount++;
1020 
1021   // Call the statistics call-back on the first PDU with total count == 1
1022   if (packetsSent == 1 && userData != NULL)
1023     userData->OnTxStatistics(*this);
1024 
1025   if (txStatisticsCount < txStatisticsInterval)
1026     return e_ProcessPacket;
1027 
1028   txStatisticsCount = 0;
1029 
1030   averageSendTime = averageSendTimeAccum/txStatisticsInterval;
1031   maximumSendTime = maximumSendTimeAccum;
1032   minimumSendTime = minimumSendTimeAccum;
1033 
1034   averageSendTimeAccum = 0;
1035   maximumSendTimeAccum = 0;
1036   minimumSendTimeAccum = 0xffffffff;
1037 
1038   PTRACE(3, "RTP\tSession " << sessionID << ", transmit statistics: "
1039    " packets=" << packetsSent <<
1040    " octets=" << octetsSent <<
1041    " avgTime=" << averageSendTime <<
1042    " maxTime=" << maximumSendTime <<
1043    " minTime=" << minimumSendTime
1044   );
1045 
1046   if (userData != NULL)
1047     userData->OnTxStatistics(*this);
1048 
1049   return e_ProcessPacket;
1050 }
1051 
OnSendControl(RTP_ControlFrame & frame,PINDEX & len)1052 RTP_Session::SendReceiveStatus RTP_Session::OnSendControl(RTP_ControlFrame & frame, PINDEX & len)
1053 {
1054   return EncodingLock(*this)->OnSendControl(frame, len);
1055 }
1056 
1057 #if OPAL_VIDEO
Internal_OnSendControl(RTP_ControlFrame & frame,PINDEX &)1058 RTP_Session::SendReceiveStatus RTP_Session::Internal_OnSendControl(RTP_ControlFrame & frame, PINDEX & /*len*/)
1059 {
1060   rtcpPacketsSent++;
1061 
1062   if(frame.GetPayloadType() == RTP_ControlFrame::e_IntraFrameRequest && userData != NULL)
1063     userData->OnTxIntraFrameRequest(*this);
1064 
1065   return e_ProcessPacket;
1066 }
1067 #else
Internal_OnSendControl(RTP_ControlFrame &,PINDEX &)1068 RTP_Session::SendReceiveStatus RTP_Session::Internal_OnSendControl(RTP_ControlFrame & /*frame*/, PINDEX & /*len*/)
1069 {
1070   rtcpPacketsSent++;
1071   return e_ProcessPacket;
1072 }
1073 #endif
1074 
1075 
OnReceiveData(RTP_DataFrame & frame)1076 RTP_Session::SendReceiveStatus RTP_Session::OnReceiveData(RTP_DataFrame & frame)
1077 {
1078   return EncodingLock(*this)->OnReceiveData(frame);
1079 }
1080 
Internal_OnReceiveData(RTP_DataFrame & frame)1081 RTP_Session::SendReceiveStatus RTP_Session::Internal_OnReceiveData(RTP_DataFrame & frame)
1082 {
1083   // Check that the PDU is the right version
1084   if (frame.GetVersion() != RTP_DataFrame::ProtocolVersion)
1085     return e_IgnorePacket; // Non fatal error, just ignore
1086 
1087   // Check if expected payload type
1088   if (lastReceivedPayloadType == RTP_DataFrame::IllegalPayloadType)
1089     lastReceivedPayloadType = frame.GetPayloadType();
1090 
1091   if (lastReceivedPayloadType != frame.GetPayloadType() && !ignorePayloadTypeChanges) {
1092 
1093     PTRACE(4, "RTP\tSession " << sessionID << ", got payload type "
1094            << frame.GetPayloadType() << ", but was expecting " << lastReceivedPayloadType);
1095     return e_IgnorePacket;
1096   }
1097 
1098   // Check for if a control packet rather than data packet.
1099   if (frame.GetPayloadType() > RTP_DataFrame::MaxPayloadType)
1100     return e_IgnorePacket; // Non fatal error, just ignore
1101 
1102   PTimeInterval tick = PTimer::Tick();  // Get timestamp now
1103 
1104   // Have not got SSRC yet, so grab it now
1105   if (syncSourceIn == 0)
1106     syncSourceIn = frame.GetSyncSource();
1107 
1108   // Check packet sequence numbers
1109   if (packetsReceived == 0) {
1110     firstPacketReceived.SetCurrentTime();
1111     PTRACE(3, "RTP\tSession " << sessionID << ", first receive data:"
1112               " ver=" << frame.GetVersion()
1113            << " pt=" << frame.GetPayloadType()
1114            << " psz=" << frame.GetPayloadSize()
1115            << " m=" << frame.GetMarker()
1116            << " x=" << frame.GetExtension()
1117            << " seq=" << frame.GetSequenceNumber()
1118            << " ts=" << frame.GetTimestamp()
1119            << " src=" << hex << frame.GetSyncSource()
1120            << " ccnt=" << frame.GetContribSrcCount() << dec);
1121 
1122 #if OPAL_RTCP_XR
1123     delete m_metrics; // Should be NULL, but just in case ...
1124     m_metrics = RTCP_XR_Metrics::Create(frame);
1125 #endif
1126 
1127     if ((frame.GetPayloadType() == RTP_DataFrame::T38) &&
1128         (frame.GetSequenceNumber() >= 0x8000) &&
1129          (frame.GetPayloadSize() == 0)) {
1130       PTRACE(4, "RTP\tSession " << sessionID << ", ignoring left over audio packet from switch to T.38");
1131       return e_IgnorePacket; // Non fatal error, just ignore
1132     }
1133 
1134     expectedSequenceNumber = (WORD)(frame.GetSequenceNumber() + 1);
1135   }
1136   else {
1137     if (frame.GetSyncSource() != syncSourceIn) {
1138       if (allowAnySyncSource) {
1139         PTRACE(2, "RTP\tSession " << sessionID << ", SSRC changed from " << hex << frame.GetSyncSource() << " to " << syncSourceIn << dec);
1140         syncSourceIn = frame.GetSyncSource();
1141         allowSequenceChange = true;
1142       }
1143       else if (allowOneSyncSourceChange) {
1144         PTRACE(2, "RTP\tSession " << sessionID << ", allowed one SSRC change from SSRC=" << hex << syncSourceIn << " to =" << dec << frame.GetSyncSource() << dec);
1145         syncSourceIn = frame.GetSyncSource();
1146         allowSequenceChange = true;
1147         allowOneSyncSourceChange = false;
1148       }
1149       else {
1150         PTRACE(2, "RTP\tSession " << sessionID << ", packet from SSRC=" << hex << frame.GetSyncSource() << " ignored, expecting SSRC=" << syncSourceIn << dec);
1151         return e_IgnorePacket; // Non fatal error, just ignore
1152       }
1153     }
1154 
1155     WORD sequenceNumber = frame.GetSequenceNumber();
1156     if (sequenceNumber == expectedSequenceNumber) {
1157       expectedSequenceNumber++;
1158       consecutiveOutOfOrderPackets = 0;
1159 
1160       if (!m_outOfOrderPackets.empty()) {
1161         PTRACE(5, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
1162                << ", received out of order packet " << sequenceNumber);
1163         outOfOrderPacketTime = tick;
1164         packetsOutOfOrder++;
1165       }
1166 
1167       /* For audio we do not do statistics on start of talk burst (marker bit)
1168          as that could be a substantial time and is not really "jitter".
1169          For video we measure jitter between whoile frames which is indicated
1170          by the marker bit being on. */
1171       if (frame.GetMarker() != isAudio) {
1172         DWORD diff = (tick - lastReceivedPacketTime).GetInterval();
1173 
1174         averageReceiveTimeAccum += diff;
1175         if (diff > maximumReceiveTimeAccum)
1176           maximumReceiveTimeAccum = diff;
1177         if (diff < minimumReceiveTimeAccum)
1178           minimumReceiveTimeAccum = diff;
1179         rxStatisticsCount++;
1180 
1181         // As per RFC3550 Appendix 8
1182         diff *= GetJitterTimeUnits(); // Convert to timestamp units
1183         long variance = diff > lastTransitTime ? (diff - lastTransitTime) : (lastTransitTime - diff);
1184         lastTransitTime = diff;
1185         jitterLevel += variance - ((jitterLevel+(1<<(JitterRoundingGuardBits-1))) >> JitterRoundingGuardBits);
1186         if (jitterLevel > maximumJitterLevel)
1187           maximumJitterLevel = jitterLevel;
1188       }
1189 
1190       if (frame.GetMarker())
1191         markerRecvCount++;
1192     }
1193     else if (allowSequenceChange) {
1194       expectedSequenceNumber = (WORD) (sequenceNumber + 1);
1195       allowSequenceChange = false;
1196       m_outOfOrderPackets.clear();
1197       PTRACE(2, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
1198              << ", adjusting sequence numbers to expect " << expectedSequenceNumber);
1199     }
1200     else if (sequenceNumber < expectedSequenceNumber) {
1201 #if OPAL_RTCP_XR
1202       if (m_metrics != NULL) m_metrics->OnPacketDiscarded();
1203 #endif
1204 
1205       // Check for Cisco bug where sequence numbers suddenly start incrementing
1206       // from a different base.
1207       if (++consecutiveOutOfOrderPackets > 10) {
1208         expectedSequenceNumber = (WORD)(sequenceNumber + 1);
1209         PTRACE(2, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
1210                << ", abnormal change of sequence numbers, adjusting to expect " << expectedSequenceNumber);
1211       }
1212       else {
1213         PTRACE(2, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
1214                << ", incorrect sequence, got " << sequenceNumber << " expected " << expectedSequenceNumber);
1215 
1216         if (resequenceOutOfOrderPackets)
1217           return e_IgnorePacket; // Non fatal error, just ignore
1218 
1219         packetsOutOfOrder++;
1220       }
1221     }
1222     else if (resequenceOutOfOrderPackets &&
1223                 (m_outOfOrderPackets.empty() || (tick - outOfOrderPacketTime) < outOfOrderWaitTime)) {
1224       if (m_outOfOrderPackets.empty())
1225         outOfOrderPacketTime = tick;
1226       // Maybe packet lost, maybe out of order, save for now
1227       SaveOutOfOrderPacket(frame);
1228       return e_IgnorePacket;
1229     }
1230     else {
1231       if (!m_outOfOrderPackets.empty()) {
1232         // Give up on the packet, probably never coming in. Save current and switch in
1233         // the lowest numbered packet.
1234         SaveOutOfOrderPacket(frame);
1235 
1236         for (;;) {
1237           if (m_outOfOrderPackets.empty())
1238             return e_IgnorePacket;
1239 
1240           frame = m_outOfOrderPackets.back();
1241           m_outOfOrderPackets.pop_back();
1242 
1243           sequenceNumber = frame.GetSequenceNumber();
1244           if (sequenceNumber >= expectedSequenceNumber)
1245             break;
1246 
1247           PTRACE(2, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
1248                  << ", incorrect sequence after re-ordering, got "
1249                  << sequenceNumber << " expected " << expectedSequenceNumber);
1250         }
1251 
1252         outOfOrderPacketTime = tick;
1253       }
1254 
1255       unsigned dropped = sequenceNumber - expectedSequenceNumber;
1256       packetsLost += dropped;
1257       packetsLostSinceLastRR += dropped;
1258       PTRACE(2, "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn
1259              << ", " << dropped << " packet(s) missing at " << sequenceNumber);
1260       expectedSequenceNumber = (WORD)(sequenceNumber + 1);
1261       consecutiveOutOfOrderPackets = 0;
1262 #if OPAL_RTCP_XR
1263       if (m_metrics != NULL) m_metrics->OnPacketLost(dropped);
1264 #endif
1265     }
1266   }
1267 
1268   lastReceivedPacketTime = tick;
1269 
1270   octetsReceived += frame.GetPayloadSize();
1271   packetsReceived++;
1272 
1273 #if OPAL_RTCP_XR
1274   if (m_metrics != NULL) m_metrics->OnPacketReceived();
1275 #endif
1276 
1277   // Call the statistics call-back on the first PDU with total count == 1
1278   if (packetsReceived == 1 && userData != NULL)
1279     userData->OnRxStatistics(*this);
1280 
1281   if (rxStatisticsCount >= rxStatisticsInterval) {
1282 
1283     rxStatisticsCount = 0;
1284 
1285     averageReceiveTime = averageReceiveTimeAccum/rxStatisticsInterval;
1286     maximumReceiveTime = maximumReceiveTimeAccum;
1287     minimumReceiveTime = minimumReceiveTimeAccum;
1288 
1289     averageReceiveTimeAccum = 0;
1290     maximumReceiveTimeAccum = 0;
1291     minimumReceiveTimeAccum = 0xffffffff;
1292 
1293     PTRACE(4, "RTP\tSession " << sessionID << ", receive statistics:"
1294               " packets=" << packetsReceived <<
1295               " octets=" << octetsReceived <<
1296               " lost=" << packetsLost <<
1297               " tooLate=" << GetPacketsTooLate() <<
1298               " order=" << packetsOutOfOrder <<
1299               " avgTime=" << averageReceiveTime <<
1300               " maxTime=" << maximumReceiveTime <<
1301               " minTime=" << minimumReceiveTime <<
1302               " jitter=" << GetAvgJitterTime() <<
1303               " maxJitter=" << GetMaxJitterTime());
1304 
1305     if (userData != NULL)
1306       userData->OnRxStatistics(*this);
1307   }
1308 
1309   SendReceiveStatus status = e_ProcessPacket;
1310   for (list<FilterNotifier>::iterator filter = m_filters.begin(); filter != m_filters.end(); ++filter)
1311     (*filter)(frame, status);
1312 
1313   return status;
1314 }
1315 
1316 
SaveOutOfOrderPacket(RTP_DataFrame & frame)1317 void RTP_Session::SaveOutOfOrderPacket(RTP_DataFrame & frame)
1318 {
1319   WORD sequenceNumber = frame.GetSequenceNumber();
1320 
1321   PTRACE(m_outOfOrderPackets.empty() ? 2 : 5,
1322          "RTP\tSession " << sessionID << ", ssrc=" << syncSourceIn << ", "
1323          << (m_outOfOrderPackets.empty() ? "first" : "next") << " out of order packet, got "
1324          << sequenceNumber << " expected " << expectedSequenceNumber);
1325 
1326   std::list<RTP_DataFrame>::iterator it;
1327   for (it  = m_outOfOrderPackets.begin(); it != m_outOfOrderPackets.end(); ++it) {
1328     if (sequenceNumber > it->GetSequenceNumber())
1329       break;
1330   }
1331 
1332   m_outOfOrderPackets.insert(it, frame);
1333   frame.MakeUnique();
1334 }
1335 
1336 
InsertReportPacket(RTP_ControlFrame & report)1337 PBoolean RTP_Session::InsertReportPacket(RTP_ControlFrame & report)
1338 {
1339   report.StartNewPacket();
1340 
1341   // No packets sent yet, so only set RR
1342   if (packetsSent == 0) {
1343 
1344     // Send RR as we are not transmitting
1345     report.SetPayloadType(RTP_ControlFrame::e_ReceiverReport);
1346 
1347     // if no packets received, put in an empty report
1348     if (packetsReceived == 0) {
1349       report.SetPayloadSize(sizeof(PUInt32b));  // length is SSRC
1350       report.SetCount(0);
1351 
1352       // add the SSRC to the start of the payload
1353       *(PUInt32b *)report.GetPayloadPtr() = syncSourceOut;
1354     }
1355     else {
1356       report.SetPayloadSize(sizeof(PUInt32b) + sizeof(RTP_ControlFrame::ReceiverReport));  // length is SSRC of packet sender plus RR
1357       report.SetCount(1);
1358       BYTE * payload = report.GetPayloadPtr();
1359 
1360       // add the SSRC to the start of the payload
1361       *(PUInt32b *)payload = syncSourceOut;
1362 
1363       // add the RR after the SSRC
1364       AddReceiverReport(*(RTP_ControlFrame::ReceiverReport *)(payload+sizeof(PUInt32b)));
1365     }
1366   }
1367   else {
1368     // send SR and RR
1369     report.SetPayloadType(RTP_ControlFrame::e_SenderReport);
1370     report.SetPayloadSize(sizeof(PUInt32b) + sizeof(RTP_ControlFrame::SenderReport));  // length is SSRC of packet sender plus SR
1371     report.SetCount(0);
1372     BYTE * payload = report.GetPayloadPtr();
1373 
1374     // add the SSRC to the start of the payload
1375     *(PUInt32b *)payload = syncSourceOut;
1376 
1377     // add the SR after the SSRC
1378     RTP_ControlFrame::SenderReport * sender = (RTP_ControlFrame::SenderReport *)(payload+sizeof(PUInt32b));
1379     PTime now;
1380     sender->ntp_sec  = (DWORD)(now.GetTimeInSeconds()+SecondsFrom1900to1970); // Convert from 1970 to 1900
1381     sender->ntp_frac = now.GetMicrosecond()*4294; // Scale microseconds to "fraction" from 0 to 2^32
1382     sender->rtp_ts   = lastSentTimestamp;
1383     sender->psent    = packetsSent;
1384     sender->osent    = octetsSent;
1385 
1386     PTRACE(3, "RTP\tSession " << sessionID << ", SentSenderReport:"
1387               " ssrc=" << syncSourceOut
1388            << " ntp=" << sender->ntp_sec << '.' << sender->ntp_frac
1389            << " rtp=" << sender->rtp_ts
1390            << " psent=" << sender->psent
1391            << " osent=" << sender->osent);
1392 
1393     if (syncSourceIn != 0) {
1394       report.SetPayloadSize(sizeof(PUInt32b) + sizeof(RTP_ControlFrame::SenderReport) + sizeof(RTP_ControlFrame::ReceiverReport));
1395       report.SetCount(1);
1396       AddReceiverReport(*(RTP_ControlFrame::ReceiverReport *)(payload+sizeof(PUInt32b)+sizeof(RTP_ControlFrame::SenderReport)));
1397     }
1398   }
1399 
1400   report.EndPacket();
1401   return true;
1402 }
1403 
1404 
SendReport(PTimer &,INT)1405 void RTP_Session::SendReport(PTimer&, INT)
1406 {
1407   PWaitAndSignal mutex(m_reportMutex);
1408 
1409   // Have not got anything yet, do nothing
1410   if (packetsSent == 0 && packetsReceived == 0)
1411     return;
1412 
1413   RTP_ControlFrame report;
1414   InsertReportPacket(report);
1415 
1416   // Add the SDES part to compound RTCP packet
1417   PTRACE(3, "RTP\tSession " << sessionID << ", sending SDES: " << canonicalName);
1418   report.StartNewPacket();
1419 
1420   report.SetCount(0); // will be incremented automatically
1421   report.StartSourceDescription  (syncSourceOut);
1422   report.AddSourceDescriptionItem(RTP_ControlFrame::e_CNAME, canonicalName);
1423   report.AddSourceDescriptionItem(RTP_ControlFrame::e_TOOL, toolName);
1424   report.EndPacket();
1425 
1426 #if OPAL_RTCP_XR
1427   //Generate and send RTCP-XR packet
1428   if (m_metrics != NULL) m_metrics->InsertExtendedReportPacket(sessionID, syncSourceOut, m_jitterBuffer, report);
1429 #endif
1430 
1431   WriteControl(report);
1432 }
1433 
1434 
#if OPAL_STATISTICS
/* Fill in the media statistics for one direction of the session.

   statistics - structure that receives the values.
   receiver   - true to report the receive side; false to report the send
                side, where the fields that only exist for reception are
                set to zero.
 */
void RTP_Session::GetStatistics(OpalMediaStatistics & statistics, bool receiver) const
{
  if (receiver) {
    statistics.m_totalBytes        = GetOctetsReceived();
    statistics.m_totalPackets      = GetPacketsReceived();
    statistics.m_packetsLost       = GetPacketsLost();
    statistics.m_packetsOutOfOrder = GetPacketsOutOfOrder();
    statistics.m_packetsTooLate    = GetPacketsTooLate();
    statistics.m_packetOverruns    = GetPacketOverruns();
    statistics.m_minimumPacketTime = GetMinimumReceiveTime();
    statistics.m_averagePacketTime = GetAverageReceiveTime();
    statistics.m_maximumPacketTime = GetMaximumReceiveTime();
    statistics.m_averageJitter     = GetAvgJitterTime();
    statistics.m_maximumJitter     = GetMaxJitterTime();
    statistics.m_jitterBufferDelay = GetJitterBufferDelay();
  }
  else {
    statistics.m_totalBytes        = GetOctetsSent();
    statistics.m_totalPackets      = GetPacketsSent();
    statistics.m_packetsLost       = GetPacketsLostByRemote();
    statistics.m_packetsOutOfOrder = 0;
    statistics.m_packetsTooLate    = 0;
    statistics.m_packetOverruns    = 0;
    statistics.m_minimumPacketTime = GetMinimumSendTime();
    statistics.m_averagePacketTime = GetAverageSendTime();
    statistics.m_maximumPacketTime = GetMaximumSendTime();
    statistics.m_averageJitter     = GetJitterTimeOnRemote();
    statistics.m_maximumJitter     = 0;
    statistics.m_jitterBufferDelay = 0;
  }
}
#endif
1452 
1453 
1454 RTP_Session::ReceiverReportArray
BuildReceiverReportArray(const RTP_ControlFrame & frame,PINDEX offset)1455 RTP_Session::BuildReceiverReportArray(const RTP_ControlFrame & frame, PINDEX offset)
1456 {
1457   RTP_Session::ReceiverReportArray reports;
1458 
1459   const RTP_ControlFrame::ReceiverReport * rr = (const RTP_ControlFrame::ReceiverReport *)(frame.GetPayloadPtr()+offset);
1460   for (PINDEX repIdx = 0; repIdx < (PINDEX)frame.GetCount(); repIdx++) {
1461     RTP_Session::ReceiverReport * report = new RTP_Session::ReceiverReport;
1462     report->sourceIdentifier = rr->ssrc;
1463     report->fractionLost = rr->fraction;
1464     report->totalLost = rr->GetLostPackets();
1465     report->lastSequenceNumber = rr->last_seq;
1466     report->jitter = rr->jitter;
1467     report->lastTimestamp = (PInt64)(DWORD)rr->lsr;
1468     report->delay = ((PInt64)rr->dlsr << 16)/1000;
1469     reports.SetAt(repIdx, report);
1470 #if OPAL_RTCP_XR
1471     if (m_metrics != NULL) m_metrics->OnRxSenderReport(rr->lsr, rr->dlsr);
1472 #endif
1473     rr++;
1474   }
1475 
1476   return reports;
1477 }
1478 
1479 
OnReceiveControl(RTP_ControlFrame & frame)1480 RTP_Session::SendReceiveStatus RTP_Session::OnReceiveControl(RTP_ControlFrame & frame)
1481 {
1482   do {
1483     BYTE * payload = frame.GetPayloadPtr();
1484     PINDEX size = frame.GetPayloadSize();
1485     if ((payload == NULL) || (size == 0) || ((payload + size) > (frame.GetPointer() + frame.GetSize()))){
1486       /* TODO: 1.shall we test for a maximum size ? Indeed but what's the value ? *
1487                2. what's the correct exit status ? */
1488       PTRACE(2, "RTP\tSession " << sessionID << ", OnReceiveControl invalid frame");
1489       break;
1490     }
1491 
1492     switch (frame.GetPayloadType()) {
1493       case RTP_ControlFrame::e_SenderReport :
1494         if (size >= (PINDEX)(sizeof(PUInt32b)+sizeof(RTP_ControlFrame::SenderReport)+frame.GetCount()*sizeof(RTP_ControlFrame::ReceiverReport))) {
1495           SenderReport sender;
1496           sender.sourceIdentifier = *(const PUInt32b *)payload;
1497           const RTP_ControlFrame::SenderReport & sr = *(const RTP_ControlFrame::SenderReport *)(payload+sizeof(PUInt32b));
1498           sender.realTimestamp = PTime(sr.ntp_sec-SecondsFrom1900to1970, sr.ntp_frac/4294);
1499 
1500           // Save the receive time
1501           lastSRTimestamp = sender.realTimestamp;
1502           lastSRReceiveTime.SetCurrentTime();
1503           senderReportsReceived++;
1504 
1505           sender.rtpTimestamp = sr.rtp_ts;
1506           sender.packetsSent = sr.psent;
1507           sender.octetsSent = sr.osent;
1508 
1509           OnRxSenderReport(sender, BuildReceiverReportArray(frame, sizeof(PUInt32b)+sizeof(RTP_ControlFrame::SenderReport)));
1510         }
1511         else {
1512           PTRACE(2, "RTP\tSession " << sessionID << ", SenderReport packet truncated");
1513         }
1514         break;
1515 
1516       case RTP_ControlFrame::e_ReceiverReport :
1517         if (size >= (PINDEX)(sizeof(PUInt32b)+frame.GetCount()*sizeof(RTP_ControlFrame::ReceiverReport)))
1518           OnRxReceiverReport(*(const PUInt32b *)payload, BuildReceiverReportArray(frame, sizeof(PUInt32b)));
1519         else {
1520           PTRACE(2, "RTP\tSession " << sessionID << ", ReceiverReport packet truncated");
1521         }
1522         break;
1523 
1524       case RTP_ControlFrame::e_SourceDescription :
1525         if (size >= (PINDEX)(frame.GetCount()*sizeof(RTP_ControlFrame::SourceDescription))) {
1526           SourceDescriptionArray descriptions;
1527           const RTP_ControlFrame::SourceDescription * sdes = (const RTP_ControlFrame::SourceDescription *)payload;
1528           PINDEX srcIdx;
1529           for (srcIdx = 0; srcIdx < (PINDEX)frame.GetCount(); srcIdx++) {
1530             descriptions.SetAt(srcIdx, new SourceDescription(sdes->src));
1531             const RTP_ControlFrame::SourceDescription::Item * item = sdes->item;
1532             PINDEX uiSizeCurrent = 0;   /* current size of the items already parsed */
1533             while ((item != NULL) && (item->type != RTP_ControlFrame::e_END)) {
1534               descriptions[srcIdx].items.SetAt(item->type, PString(item->data, item->length));
1535               uiSizeCurrent += item->GetLengthTotal();
1536               PTRACE(4,"RTP\tSession " << sessionID << ", SourceDescription item " << item << ", current size = " << uiSizeCurrent);
1537 
1538               /* avoid reading where GetNextItem() shall not */
1539               if (uiSizeCurrent >= size){
1540                 PTRACE(4,"RTP\tSession " << sessionID << ", SourceDescription end of items");
1541                 item = NULL;
1542                 break;
1543               }
1544 
1545               item = item->GetNextItem();
1546             }
1547 
1548             /* RTP_ControlFrame::e_END doesn't have a length field, so do NOT call item->GetNextItem()
1549                otherwise it reads over the buffer */
1550             if (item == NULL ||
1551                 item->type == RTP_ControlFrame::e_END ||
1552                 (sdes = (const RTP_ControlFrame::SourceDescription *)item->GetNextItem()) == NULL)
1553               break;
1554           }
1555           OnRxSourceDescription(descriptions);
1556         }
1557         else {
1558           PTRACE(2, "RTP\tSession " << sessionID << ", SourceDescription packet truncated");
1559         }
1560         break;
1561 
1562       case RTP_ControlFrame::e_Goodbye :
1563         if (size >= 4) {
1564           PString str;
1565           PINDEX count = frame.GetCount()*4;
1566 
1567           if (size > count) {
1568             if (size >= (PINDEX)(payload[count] + sizeof(DWORD) /*SSRC*/ + sizeof(unsigned char) /* length */))
1569               str = PString((const char *)(payload+count+1), payload[count]);
1570             else {
1571               PTRACE(2, "RTP\tSession " << sessionID << ", Goodbye packet invalid");
1572             }
1573           }
1574 
1575           PDWORDArray sources(count);
1576           for (PINDEX i = 0; i < count; i++)
1577             sources[i] = ((const PUInt32b *)payload)[i];
1578           OnRxGoodbye(sources, str);
1579         }
1580         else {
1581           PTRACE(2, "RTP\tSession " << sessionID << ", Goodbye packet truncated");
1582         }
1583 
1584         if (closeOnBye) {
1585           PTRACE(3, "RTP\tSession " << sessionID << ", Goodbye packet closing transport");
1586           return e_AbortTransport;
1587         }
1588         break;
1589 
1590       case RTP_ControlFrame::e_ApplDefined :
1591         if (size >= 4) {
1592           PString str((const char *)(payload+4), 4);
1593           OnRxApplDefined(str, frame.GetCount(), *(const PUInt32b *)payload,
1594           payload+8, frame.GetPayloadSize()-8);
1595         }
1596         else {
1597           PTRACE(2, "RTP\tSession " << sessionID << ", ApplDefined packet truncated");
1598         }
1599         break;
1600 
1601 #if OPAL_RTCP_XR
1602       case RTP_ControlFrame::e_ExtendedReport :
1603         if (size >= (PINDEX)(sizeof(PUInt32b)+frame.GetCount()*sizeof(RTP_ControlFrame::ExtendedReport)))
1604           OnRxExtendedReport(*(const PUInt32b *)payload, RTCP_XR_Metrics::BuildExtendedReportArray(frame, sizeof(PUInt32b)));
1605         else {
1606           PTRACE(2, "RTP\tSession " << sessionID << ", ReceiverReport packet truncated");
1607         }
1608         break;
1609 #endif
1610 
1611   #if OPAL_VIDEO
1612       case RTP_ControlFrame::e_IntraFrameRequest :
1613         PTRACE(4, "RTP\tSession " << sessionID << ", received RFC2032 FIR");
1614         if(userData != NULL)
1615           userData->OnRxIntraFrameRequest(*this);
1616         break;
1617 
1618       case RTP_ControlFrame::e_PayloadSpecificFeedBack :
1619         switch (frame.GetFbType()) {
1620           case RTP_ControlFrame::e_PictureLossIndication :
1621             PTRACE(4, "RTP\tSession " << sessionID << ", received RFC5104 PLI");
1622             if(userData != NULL)
1623               userData->OnRxIntraFrameRequest(*this);
1624             break;
1625 
1626           case RTP_ControlFrame::e_FullIntraRequest :
1627             PTRACE(4, "RTP\tSession " << sessionID << ", received RFC5104 FIR");
1628             if(userData != NULL)
1629               userData->OnRxIntraFrameRequest(*this);
1630             break;
1631 
1632           default :
1633             PTRACE(2, "RTP\tSession " << sessionID << ", Unknown Payload Specific feedback type: " << frame.GetFbType());
1634         }
1635         break;
1636   #endif
1637 
1638       default :
1639         PTRACE(2, "RTP\tSession " << sessionID << ", Unknown control payload type: " << frame.GetPayloadType());
1640     }
1641   } while (frame.ReadNextPacket());
1642 
1643   return e_ProcessPacket;
1644 }
1645 
1646 
OnRxSenderReport(const SenderReport & PTRACE_PARAM (sender),const ReceiverReportArray & reports)1647 void RTP_Session::OnRxSenderReport(const SenderReport & PTRACE_PARAM(sender), const ReceiverReportArray & reports)
1648 {
1649 #if PTRACING
1650   if (PTrace::CanTrace(3)) {
1651     ostream & strm = PTrace::Begin(3, __FILE__, __LINE__);
1652     strm << "RTP\tSession " << sessionID << ", OnRxSenderReport: " << sender << '\n';
1653     for (PINDEX i = 0; i < reports.GetSize(); i++)
1654       strm << "  RR: " << reports[i] << '\n';
1655     strm << PTrace::End;
1656   }
1657 #endif
1658   OnReceiverReports(reports);
1659 }
1660 
1661 
OnRxReceiverReport(DWORD PTRACE_PARAM (src),const ReceiverReportArray & reports)1662 void RTP_Session::OnRxReceiverReport(DWORD PTRACE_PARAM(src), const ReceiverReportArray & reports)
1663 {
1664 #if PTRACING
1665   if (PTrace::CanTrace(3)) {
1666     ostream & strm = PTrace::Begin(2, __FILE__, __LINE__);
1667     strm << "RTP\tSession " << sessionID << ", OnReceiverReport: ssrc=" << src << '\n';
1668     for (PINDEX i = 0; i < reports.GetSize(); i++)
1669       strm << "  RR: " << reports[i] << '\n';
1670     strm << PTrace::End;
1671   }
1672 #endif
1673   OnReceiverReports(reports);
1674 }
1675 
1676 
OnReceiverReports(const ReceiverReportArray & reports)1677 void RTP_Session::OnReceiverReports(const ReceiverReportArray & reports)
1678 {
1679   for (PINDEX i = 0; i < reports.GetSize(); i++) {
1680     if (reports[i].sourceIdentifier == syncSourceOut) {
1681       packetsLostByRemote = reports[i].totalLost;
1682       jitterLevelOnRemote = reports[i].jitter;
1683       break;
1684     }
1685   }
1686 }
1687 
1688 
OnRxSourceDescription(const SourceDescriptionArray & PTRACE_PARAM (description))1689 void RTP_Session::OnRxSourceDescription(const SourceDescriptionArray & PTRACE_PARAM(description))
1690 {
1691 #if PTRACING
1692   if (PTrace::CanTrace(3)) {
1693     ostream & strm = PTrace::Begin(3, __FILE__, __LINE__);
1694     strm << "RTP\tSession " << sessionID << ", OnSourceDescription: " << description.GetSize() << " entries";
1695     for (PINDEX i = 0; i < description.GetSize(); i++)
1696       strm << "\n  " << description[i];
1697     strm << PTrace::End;
1698   }
1699 #endif
1700 }
1701 
1702 
OnRxGoodbye(const PDWORDArray & PTRACE_PARAM (src),const PString & PTRACE_PARAM (reason))1703 void RTP_Session::OnRxGoodbye(const PDWORDArray & PTRACE_PARAM(src), const PString & PTRACE_PARAM(reason))
1704 {
1705   PTRACE(3, "RTP\tSession " << sessionID << ", OnGoodbye: \"" << reason << "\" srcs=" << src);
1706 }
1707 
1708 
OnRxApplDefined(const PString & PTRACE_PARAM (type),unsigned PTRACE_PARAM (subtype),DWORD PTRACE_PARAM (src),const BYTE *,PINDEX PTRACE_PARAM (size))1709 void RTP_Session::OnRxApplDefined(const PString & PTRACE_PARAM(type),
1710           unsigned PTRACE_PARAM(subtype), DWORD PTRACE_PARAM(src),
1711           const BYTE * /*data*/, PINDEX PTRACE_PARAM(size))
1712 {
1713   PTRACE(3, "RTP\tSession " << sessionID << ", OnApplDefined: \""
1714          << type << "\"-" << subtype << " " << src << " [" << size << ']');
1715 }
1716 
1717 
PrintOn(ostream & strm) const1718 void RTP_Session::ReceiverReport::PrintOn(ostream & strm) const
1719 {
1720   strm << "ssrc=" << sourceIdentifier
1721        << " fraction=" << fractionLost
1722        << " lost=" << totalLost
1723        << " last_seq=" << lastSequenceNumber
1724        << " jitter=" << jitter
1725        << " lsr=" << lastTimestamp
1726        << " dlsr=" << delay;
1727 }
1728 
1729 
PrintOn(ostream & strm) const1730 void RTP_Session::SenderReport::PrintOn(ostream & strm) const
1731 {
1732   strm << "ssrc=" << sourceIdentifier
1733        << " ntp=" << realTimestamp.AsString("yyyy/M/d-h:m:s.uuuu")
1734        << " rtp=" << rtpTimestamp
1735        << " psent=" << packetsSent
1736        << " osent=" << octetsSent;
1737 }
1738 
1739 
PrintOn(ostream & strm) const1740 void RTP_Session::SourceDescription::PrintOn(ostream & strm) const
1741 {
1742   static const char * const DescriptionNames[RTP_ControlFrame::NumDescriptionTypes] = {
1743     "END", "CNAME", "NAME", "EMAIL", "PHONE", "LOC", "TOOL", "NOTE", "PRIV"
1744   };
1745 
1746   strm << "ssrc=" << sourceIdentifier;
1747   for (PINDEX i = 0; i < items.GetSize(); i++) {
1748     strm << "\n  item[" << i << "]: type=";
1749     unsigned typeNum = items.GetKeyAt(i);
1750     if (typeNum < PARRAYSIZE(DescriptionNames))
1751       strm << DescriptionNames[typeNum];
1752     else
1753       strm << typeNum;
1754     strm << " data=\""
1755       << items.GetDataAt(i)
1756       << '"';
1757   }
1758 }
1759 
1760 
GetPacketsTooLate() const1761 DWORD RTP_Session::GetPacketsTooLate() const
1762 {
1763   JitterBufferPtr jitter = m_jitterBuffer; // Increase reference count
1764   return jitter != NULL ? jitter->GetPacketsTooLate() : 0;
1765 }
1766 
1767 
GetPacketOverruns() const1768 DWORD RTP_Session::GetPacketOverruns() const
1769 {
1770   JitterBufferPtr jitter = m_jitterBuffer; // Increase reference count
1771   return jitter != NULL ? jitter->GetBufferOverruns() : 0;
1772 }
1773 
1774 
// Base implementation of the out-of-band write: both arguments are ignored,
// nothing is sent and true is always returned.
PBoolean RTP_Session::WriteOOBData(RTP_DataFrame &, bool)
{
  return true;
}
1779 
1780 
AddFilter(const FilterNotifier & filter)1781 void RTP_Session::AddFilter(const FilterNotifier & filter)
1782 {
1783   // ensures that a filter is added only once
1784   if (find(m_filters.begin(), m_filters.end(), filter) == m_filters.end())
1785     m_filters.push_back(filter);
1786 }
1787 
1788 
1789 /////////////////////////////////////////////////////////////////////////////
1790 
SetMinBufferSize(PUDPSocket & sock,int buftype,int bufsz)1791 static void SetMinBufferSize(PUDPSocket & sock, int buftype, int bufsz)
1792 {
1793   int sz = 0;
1794   if (!sock.GetOption(buftype, sz)) {
1795     PTRACE(1, "RTP_UDP\tGetOption(" << sock.GetHandle() << ',' << buftype << ") failed: " << sock.GetErrorText());
1796     return;
1797   }
1798 
1799   // Already big enough
1800   if (sz >= bufsz)
1801     return;
1802 
1803   for (; bufsz >= 1024; bufsz /= 2) {
1804     // Set to new size
1805     if (!sock.SetOption(buftype, bufsz)) {
1806       PTRACE(1, "RTP_UDP\tSetOption(" << sock.GetHandle() << ',' << buftype << ',' << bufsz << ") failed: " << sock.GetErrorText());
1807       continue;
1808     }
1809 
1810     // As some stacks lie about setting the buffer size, we double check.
1811     if (!sock.GetOption(buftype, sz)) {
1812       PTRACE(1, "RTP_UDP\tGetOption(" << sock.GetHandle() << ',' << buftype << ") failed: " << sock.GetErrorText());
1813       return;
1814     }
1815 
1816     if (sz >= bufsz) {
1817       PTRACE(4, "RTP_UDP\tSetOption(" << sock.GetHandle() << ',' << buftype << ',' << bufsz << ") succeeded.");
1818       return;
1819     }
1820 
1821     PTRACE(1, "RTP_UDP\tSetOption(" << sock.GetHandle() << ',' << buftype << ',' << bufsz << ") failed, even though it said it succeeded!");
1822   }
1823 }
1824 
1825 
RTP_UDP(const Params & params)1826 RTP_UDP::RTP_UDP(const Params & params)
1827   : RTP_Session(params),
1828     remoteAddress(0),
1829     remoteTransmitAddress(0),
1830     remoteIsNAT(params.remoteIsNAT)
1831 {
1832   PTRACE(4, "RTP_UDP\tSession " << sessionID << ", created with NAT flag set to " << remoteIsNAT);
1833   remoteDataPort    = 0;
1834   remoteControlPort = 0;
1835   shutdownRead      = false;
1836   shutdownWrite     = false;
1837   dataSocket        = NULL;
1838   controlSocket     = NULL;
1839   appliedQOS        = false;
1840   localHasNAT       = false;
1841   badTransmitCounter = 0;
1842 
1843   timerWriteDataIdle.SetNotifier(PCREATE_NOTIFIER(OnWriteDataIdle));
1844 }
1845 
1846 
// Tears the session down: stops the write idle timer, shuts down the read
// and then the write direction, removes the jitter buffer and finally
// deletes both sockets. The order of these steps matters (see below).
RTP_UDP::~RTP_UDP()
{
  timerWriteDataIdle.Stop();
  Close(true);
  Close(false);

  // We need to do this to make sure that the sockets are not
  // deleted before select decides there is no more data coming
  // over them and exits the reading thread.
  SetJitterBufferSize(0, 0);

  delete dataSocket;
  delete controlSocket;
}
1861 
1862 
ApplyQOS(const PIPSocket::Address & addr)1863 void RTP_UDP::ApplyQOS(const PIPSocket::Address & addr)
1864 {
1865   if (controlSocket != NULL)
1866     controlSocket->SetSendAddress(addr,GetRemoteControlPort());
1867   if (dataSocket != NULL)
1868     dataSocket->SetSendAddress(addr,GetRemoteDataPort());
1869   appliedQOS = true;
1870 }
1871 
1872 
ModifyQOS(RTP_QOS * rtpqos)1873 PBoolean RTP_UDP::ModifyQOS(RTP_QOS * rtpqos)
1874 {
1875   PBoolean retval = false;
1876 
1877   if (rtpqos == NULL)
1878     return retval;
1879 
1880   if (controlSocket != NULL)
1881     retval = controlSocket->ModifyQoSSpec(&(rtpqos->ctrlQoS));
1882 
1883   if (dataSocket != NULL)
1884     retval &= dataSocket->ModifyQoSSpec(&(rtpqos->dataQoS));
1885 
1886   appliedQOS = false;
1887   return retval;
1888 }
1889 
/* Open the session: (re)create the RTP data and RTCP control sockets.

   transportLocalAddress  local address recorded for the session and used
                          for binding unless the NAT method overrides it
   portBase, portMax      port range to search; the data port is forced to
                          an even number and the control port is data+1
   tos                    value set for the IP_TOS option on the data socket
   natMethod              optional NAT traversal method used to create the
                          sockets, may be NULL
   rtpQos                 optional QoS specifications for the two sockets,
                          may be NULL

   Returns false only when no port pair in the range could be bound,
   otherwise true. The whole operation runs under dataMutex. Any previously
   created sockets are deleted first, and the report timer is (re)started on
   success.
 */
PBoolean RTP_UDP::Open(PIPSocket::Address transportLocalAddress,
                   WORD portBase, WORD portMax,
                   BYTE tos,
                   PNatMethod * natMethod,
                   RTP_QOS * rtpQos)
{
  PWaitAndSignal mutex(dataMutex);

  m_firstControl = true;

  // save local address
  localAddress = transportLocalAddress;

  // Data port is rounded down to an even number, control port is the next one
  localDataPort    = (WORD)(portBase&0xfffe);
  localControlPort = (WORD)(localDataPort + 1);

  // Discard sockets left over from a previous Open()
  delete dataSocket;
  delete controlSocket;
  dataSocket = NULL;
  controlSocket = NULL;

  byeSent = false;

  PQoS * dataQos = NULL;
  PQoS * ctrlQos = NULL;
  if (rtpQos != NULL) {
    dataQos = &(rtpQos->dataQoS);
    ctrlQos = &(rtpQos->ctrlQoS);
  }

  // allow for special case of portBase == 0 and portMax == 0 (both zero),
  // which indicates a shared RTP session; no sockets are created in that case
  if ((portBase != 0) || (portMax != 0)) {
    PIPSocket::Address bindingAddress = localAddress;
    if (natMethod != NULL && natMethod->IsAvailable(localAddress)) {
      switch (natMethod->GetRTPSupport()) {
        case PNatMethod::RTPIfSendMedia :
          /* This NAT variant will work if we send something out through the
             NAT port to "open" it so packets can then flow inward. We set
             this flag to make that happen as soon as we get the remote's IP
             address and port to send to.
            */
          localHasNAT = true;
          // Then do case for full cone support and create STUN sockets
          // (deliberate fall through)

        case PNatMethod::RTPSupported :
          if (natMethod->CreateSocketPair(dataSocket, controlSocket, localAddress)) {
            PTRACE(4, "RTP\tSession " << sessionID << ", " << natMethod->GetName() << " created STUN RTP/RTCP socket pair.");
            dataSocket->GetLocalAddress(localAddress, localDataPort);
            controlSocket->GetLocalAddress(localAddress, localControlPort);
          }
          else {
            PTRACE(2, "RTP\tSession " << sessionID << ", " << natMethod->GetName()
                   << " could not create RTP/RTCP socket pair; trying to create individual sockets.");
            if (natMethod->CreateSocket(dataSocket, localAddress) && natMethod->CreateSocket(controlSocket, localAddress)) {
              dataSocket->GetLocalAddress(localAddress, localDataPort);
              controlSocket->GetLocalAddress(localAddress, localControlPort);
            }
            else {
              // Drop whatever was created so the normal socket path below runs
              delete dataSocket;
              delete controlSocket;
              dataSocket = NULL;
              controlSocket = NULL;
              PTRACE(2, "RTP\tSession " << sessionID << ", " << natMethod->GetName()
                     << " could not create RTP/RTCP sockets individually either, using normal sockets.");
            }
          }
          break;

        default :
          /* We cannot use NAT traversal method (e.g. STUN) to create sockets
             in the remaining modes as the NAT router will then not let us
             talk to the real RTP destination. All we can do is bind to the
             local interface the NAT is on and hope the NAT router is doing
             something sneaky like symmetric port forwarding. */
          PTRACE(2, "RTP\tSession " << sessionID << ", " << natMethod->GetName()
                  << " cannot create RTP/RTCP socket pair; creating individual sockets.");
          natMethod->GetInterfaceAddress(bindingAddress);
          break;
      }
    }

    // No NAT created sockets: create plain UDP sockets and search the port
    // range in steps of two for a pair where both ports can be bound.
    if (dataSocket == NULL || controlSocket == NULL) {
      dataSocket = new PUDPSocket(dataQos);
      controlSocket = new PUDPSocket(ctrlQos);
      while (!   dataSocket->Listen(bindingAddress, 1, localDataPort) ||
             !controlSocket->Listen(bindingAddress, 1, localControlPort)) {
        dataSocket->Close();
        controlSocket->Close();
        if ((localDataPort > portMax) || (localDataPort > 0xfffd))
          return false; // If it ever gets to here the OS has some SERIOUS problems!
        localDataPort    += 2;
        localControlPort += 2;
      }
    }

#   ifndef __BEOS__
    // Set the IP Type Of Service field for prioritisation of media UDP packets
    // through some Cisco routers and Linux boxes
    if (!dataSocket->SetOption(IP_TOS, tos, IPPROTO_IP)) {
      PTRACE(1, "RTP_UDP\tSession " << sessionID << ", could not set TOS field in IP header: " << dataSocket->GetErrorText());
    }

    // Increase internal buffer size on media UDP sockets
    SetMinBufferSize(*dataSocket,    SO_RCVBUF, isAudio ? RTP_AUDIO_RX_BUFFER_SIZE : RTP_VIDEO_RX_BUFFER_SIZE);
    SetMinBufferSize(*dataSocket,    SO_SNDBUF, RTP_DATA_TX_BUFFER_SIZE);
    SetMinBufferSize(*controlSocket, SO_RCVBUF, RTP_CTRL_BUFFER_SIZE);
    SetMinBufferSize(*controlSocket, SO_SNDBUF, RTP_CTRL_BUFFER_SIZE);
#   endif
  }

  shutdownRead = false;
  shutdownWrite = false;

  // Append the local host name when the canonical name has no '@' part
  if (canonicalName.Find('@') == P_MAX_INDEX)
    canonicalName += '@' + GetLocalHostName();

  PTRACE(3, "RTP_UDP\tSession " << sessionID << " created: "
         << localAddress << ':' << localDataPort << '-' << localControlPort
         << " ssrc=" << syncSourceOut);

  m_reportTimer.RunContinuous(m_reportTimer.GetResetTime());
  return true;
}
2013 
2014 
Reopen(PBoolean reading)2015 void RTP_UDP::Reopen(PBoolean reading)
2016 {
2017   PWaitAndSignal mutex(dataMutex);
2018 
2019   if (reading) {
2020     if (!shutdownRead)
2021       return;
2022     shutdownRead = false;
2023   }
2024   else {
2025     if (!shutdownWrite)
2026       return;
2027     shutdownWrite = false;
2028   }
2029 
2030   badTransmitCounter = 0;
2031   m_reportTimer.RunContinuous(m_reportTimer.GetResetTime());
2032 
2033   PTRACE(3, "RTP_UDP\tSession " << sessionID << " reopened for " << (reading ? "reading" : "writing"));
2034 }
2035 
2036 
Close(PBoolean reading)2037 bool RTP_UDP::Close(PBoolean reading)
2038 {
2039   if (reading) {
2040     {
2041       PWaitAndSignal mutex(dataMutex);
2042 
2043       if (shutdownRead) {
2044         PTRACE(4, "RTP_UDP\tSession " << sessionID << ", read already shut down .");
2045         return false;
2046       }
2047 
2048       PTRACE(3, "RTP_UDP\tSession " << sessionID << ", Shutting down read.");
2049 
2050       syncSourceIn = 0;
2051       shutdownRead = true;
2052 
2053       if (dataSocket != NULL && controlSocket != NULL) {
2054         PIPSocket::Address addr;
2055         WORD port;
2056         controlSocket->PUDPSocket::GetLocalAddress(addr, port);
2057         if (addr.IsAny())
2058           PIPSocket::GetHostAddress(addr);
2059         dataSocket->WriteTo("", 1, addr, port);
2060       }
2061     }
2062 
2063     SetJitterBufferSize(0, 0); // Kill jitter buffer too, but outside mutex
2064   }
2065   else {
2066     if (shutdownWrite) {
2067       PTRACE(4, "RTP_UDP\tSession " << sessionID << ", write already shut down .");
2068       return false;
2069     }
2070 
2071     PTRACE(3, "RTP_UDP\tSession " << sessionID << ", shutting down write.");
2072     shutdownWrite = true;
2073   }
2074 
2075   if (shutdownRead && shutdownWrite)
2076     m_reportTimer.Stop(false);
2077 
2078   return true;
2079 }
2080 
2081 
// Returns the host name as reported by PIPSocket::GetHostName().
PString RTP_UDP::GetLocalHostName()
{
  return PIPSocket::GetHostName();
}
2086 
2087 
SetRemoteSocketInfo(PIPSocket::Address address,WORD port,PBoolean isDataPort)2088 PBoolean RTP_UDP::SetRemoteSocketInfo(PIPSocket::Address address, WORD port, PBoolean isDataPort)
2089 {
2090   if (remoteIsNAT) {
2091     PTRACE(2, "RTP_UDP\tSession " << sessionID << ", ignoring remote socket info as remote is behind NAT");
2092     return true;
2093   }
2094 
2095   if (!PAssert(address.IsValid() && port != 0,PInvalidParameter))
2096     return false;
2097 
2098   PTRACE(3, "RTP_UDP\tSession " << sessionID << ", SetRemoteSocketInfo: "
2099          << (isDataPort ? "data" : "control") << " channel, "
2100             "new=" << address << ':' << port << ", "
2101             "local=" << localAddress << ':' << localDataPort << '-' << localControlPort << ", "
2102             "remote=" << remoteAddress << ':' << remoteDataPort << '-' << remoteControlPort);
2103 
2104   if (localAddress == address && remoteAddress == address && (isDataPort ? localDataPort : localControlPort) == port)
2105     return true;
2106 
2107   remoteAddress = address;
2108 
2109   allowOneSyncSourceChange = true;
2110   allowRemoteTransmitAddressChange = true;
2111   allowSequenceChange = packetsReceived != 0;
2112 
2113   if (isDataPort) {
2114     remoteDataPort = port;
2115     if (remoteControlPort == 0 || allowRemoteTransmitAddressChange)
2116       remoteControlPort = (WORD)(port + 1);
2117   }
2118   else {
2119     remoteControlPort = port;
2120     if (remoteDataPort == 0 || allowRemoteTransmitAddressChange)
2121       remoteDataPort = (WORD)(port - 1);
2122   }
2123 
2124   if (!appliedQOS)
2125       ApplyQOS(remoteAddress);
2126 
2127   if (localHasNAT) {
2128     // If have Port Restricted NAT on local host then send a datagram
2129     // to remote to open up the port in the firewall for return data.
2130     static const BYTE dummy[1] = { 0 };
2131     WriteDataOrControlPDU(dummy, sizeof(dummy), true);
2132     WriteDataOrControlPDU(dummy, sizeof(dummy), false);
2133     PTRACE(2, "RTP_UDP\tSession " << sessionID << ", sending empty datagrams to open local Port Restricted NAT");
2134   }
2135 
2136   return true;
2137 }
2138 
2139 
ReadData(RTP_DataFrame & frame)2140 PBoolean RTP_UDP::ReadData(RTP_DataFrame & frame)
2141 {
2142   return EncodingLock(*this)->ReadData(frame);
2143 }
2144 
Internal_ReadData(RTP_DataFrame & frame)2145 PBoolean RTP_UDP::Internal_ReadData(RTP_DataFrame & frame)
2146 {
2147   SendReceiveStatus receiveStatus = e_IgnorePacket;
2148   while (receiveStatus == e_IgnorePacket) {
2149     if (shutdownRead || PAssertNULL(dataSocket) == NULL || PAssertNULL(controlSocket) == NULL)
2150       return false;
2151 
2152     int selectStatus = WaitForPDU(*dataSocket, *controlSocket, PMaxTimeInterval);
2153     if (shutdownRead)
2154       return false;
2155 
2156     if (selectStatus > 0) {
2157       PTRACE(1, "RTP_UDP\tSession " << sessionID << ", Select error: "
2158               << PChannel::GetErrorText((PChannel::Errors)selectStatus));
2159       return false;
2160     }
2161 
2162     if (selectStatus == 0)
2163       receiveStatus = OnReadTimeout(frame);
2164 
2165     if ((-selectStatus & 2) != 0) {
2166       if (ReadControlPDU() == e_AbortTransport)
2167         return false;
2168     }
2169 
2170     if ((-selectStatus & 1) != 0)
2171       receiveStatus = ReadDataPDU(frame);
2172   }
2173 
2174   return receiveStatus == e_ProcessPacket;
2175 }
2176 
2177 
FlushData()2178 void RTP_UDP::FlushData()
2179 {
2180   if (shutdownRead || dataSocket == NULL)
2181     return;
2182 
2183   PTimeInterval oldTimeout = dataSocket->GetReadTimeout();
2184   dataSocket->SetReadTimeout(0);
2185 
2186   PINDEX count = 0;
2187   BYTE buffer[2000];
2188   while (dataSocket->Read(buffer, sizeof(buffer)))
2189     ++count;
2190 
2191   dataSocket->SetReadTimeout(oldTimeout);
2192 
2193   PTRACE_IF(3, count > 0, "RTP_UDP\tSession " << sessionID << ", flushed "
2194             << count << " RTP data packets before activating jitter buffer");
2195 }
2196 
2197 
WaitForPDU(PUDPSocket & dataSocket,PUDPSocket & controlSocket,const PTimeInterval & timeout)2198 int RTP_UDP::WaitForPDU(PUDPSocket & dataSocket, PUDPSocket & controlSocket, const PTimeInterval & timeout)
2199 {
2200   return EncodingLock(*this)->WaitForPDU(dataSocket, controlSocket, timeout);
2201 }
2202 
// Waits on both sockets using PSocket::Select() with the given timeout and
// returns its result unchanged.
int RTP_UDP::Internal_WaitForPDU(PUDPSocket & dataSocket, PUDPSocket & controlSocket, const PTimeInterval & timeout)
{
  return PSocket::Select(dataSocket, controlSocket, timeout);
}
2207 
ReadDataOrControlPDU(BYTE * framePtr,PINDEX frameSize,PBoolean fromDataChannel)2208 RTP_Session::SendReceiveStatus RTP_UDP::ReadDataOrControlPDU(BYTE * framePtr,
2209                                                              PINDEX frameSize,
2210                                                              PBoolean fromDataChannel)
2211 {
2212 #if PTRACING
2213   const char * channelName = fromDataChannel ? "Data" : "Control";
2214 #endif
2215   PUDPSocket & socket = *(fromDataChannel ? dataSocket : controlSocket);
2216   PIPSocket::Address addr;
2217   WORD port;
2218 
2219   if (socket.ReadFrom(framePtr, frameSize, addr, port)) {
2220     // If remote address never set from higher levels, then try and figure
2221     // it out from the first packet received.
2222     if (!remoteAddress.IsValid()) {
2223       remoteAddress = addr;
2224       PTRACE(4, "RTP\tSession " << sessionID << ", set remote address from first "
2225              << channelName << " PDU from " << addr << ':' << port);
2226     }
2227     if (fromDataChannel) {
2228       if (remoteDataPort == 0)
2229         remoteDataPort = port;
2230     }
2231     else {
2232       if (remoteControlPort == 0)
2233         remoteControlPort = port;
2234     }
2235 
2236     if (!remoteTransmitAddress.IsValid())
2237       remoteTransmitAddress = addr;
2238     else if (allowRemoteTransmitAddressChange && remoteAddress == addr) {
2239       remoteTransmitAddress = addr;
2240       allowRemoteTransmitAddressChange = false;
2241     }
2242     else if (remoteTransmitAddress != addr && !allowRemoteTransmitAddressChange) {
2243       PTRACE(2, "RTP_UDP\tSession " << sessionID << ", "
2244              << channelName << " PDU from incorrect host, "
2245                 " is " << addr << " should be " << remoteTransmitAddress);
2246       return RTP_Session::e_IgnorePacket;
2247     }
2248 
2249     if (remoteAddress.IsValid() && !appliedQOS)
2250       ApplyQOS(remoteAddress);
2251 
2252     badTransmitCounter = 0;
2253 
2254     return RTP_Session::e_ProcessPacket;
2255   }
2256 
2257   switch (socket.GetErrorNumber(PChannel::LastReadError)) {
2258     case ECONNRESET :
2259     case ECONNREFUSED :
2260       PTRACE(2, "RTP_UDP\tSession " << sessionID << ", " << channelName << " port on remote not ready.");
2261       if (++badTransmitCounter == 1)
2262         badTransmitStart = PTime();
2263       else {
2264         if (badTransmitCounter < 5 || (PTime()- badTransmitStart).GetSeconds() < BAD_TRANSMIT_TIME_MAX)
2265           return RTP_Session::e_IgnorePacket;
2266         PTRACE(2, "RTP_UDP\tSession " << sessionID << ", " << channelName << " " << BAD_TRANSMIT_TIME_MAX << " seconds of transmit fails - informing connection");
2267         userData->SessionFailing(*this);
2268       }
2269       return RTP_Session::e_IgnorePacket;
2270 
2271     case EMSGSIZE :
2272       PTRACE(2, "RTP_UDP\tSession " << sessionID << ", " << channelName
2273              << " read packet too large for buffer of " << frameSize << " bytes.");
2274       return RTP_Session::e_IgnorePacket;
2275 
2276     case EAGAIN :
2277       PTRACE(4, "RTP_UDP\tSession " << sessionID << ", " << channelName
2278              << " read packet interrupted.");
2279       // Shouldn't happen, but it does.
2280       return RTP_Session::e_IgnorePacket;
2281 
2282     case 0 :
2283       PTRACE(4, "RTP_UDP\tSession " << sessionID << ", " << channelName
2284              << " received UDP packet with no payload.");
2285       return e_IgnorePacket;
2286 
2287     default:
2288       PTRACE(1, "RTP_UDP\tSession " << sessionID << ", " << channelName
2289              << " read error (" << socket.GetErrorNumber(PChannel::LastReadError) << "): "
2290              << socket.GetErrorText(PChannel::LastReadError));
2291       return RTP_Session::e_AbortTransport;
2292   }
2293 }
2294 
2295 
ReadDataPDU(RTP_DataFrame & frame)2296 RTP_Session::SendReceiveStatus RTP_UDP::ReadDataPDU(RTP_DataFrame & frame)
2297 {
2298   return EncodingLock(*this)->ReadDataPDU(frame);
2299 }
2300 
Internal_ReadDataPDU(RTP_DataFrame & frame)2301 RTP_Session::SendReceiveStatus RTP_UDP::Internal_ReadDataPDU(RTP_DataFrame & frame)
2302 {
2303   SendReceiveStatus status = ReadDataOrControlPDU(frame.GetPointer(), frame.GetSize(), true);
2304   if (status != e_ProcessPacket)
2305     return status;
2306 
2307   // Check received PDU is big enough
2308   if (frame.SetPacketSize(dataSocket->GetLastReadCount()))
2309     return OnReceiveData(frame);
2310   return e_IgnorePacket;
2311 }
2312 
2313 
WriteDataPDU(RTP_DataFrame & frame)2314 bool RTP_UDP::WriteDataPDU(RTP_DataFrame & frame)
2315 {
2316   if (!EncodingLock(*this)->WriteDataPDU(frame))
2317       return false;
2318 
2319   PWaitAndSignal mutex(dataMutex);
2320   EncodingLock(*this)->SetWriteDataIdleTimer(timerWriteDataIdle);
2321 
2322   return true;
2323 }
2324 
2325 
OnReadTimeout(RTP_DataFrame & frame)2326 RTP_Session::SendReceiveStatus RTP_UDP::OnReadTimeout(RTP_DataFrame & frame)
2327 {
2328   return EncodingLock(*this)->OnReadTimeout(frame);
2329 }
2330 
// Read timeout handling for this class: the frame is not touched and
// e_IgnorePacket is always returned.
RTP_Session::SendReceiveStatus RTP_UDP::Internal_OnReadTimeout(RTP_DataFrame & /*frame*/)
{
  return e_IgnorePacket;
}
2335 
2336 
ReadControlPDU()2337 RTP_Session::SendReceiveStatus RTP_UDP::ReadControlPDU()
2338 {
2339   RTP_ControlFrame frame(2048);
2340 
2341   SendReceiveStatus status = ReadDataOrControlPDU(frame.GetPointer(), frame.GetSize(), false);
2342   if (status != e_ProcessPacket)
2343     return status;
2344 
2345   PINDEX pduSize = controlSocket->GetLastReadCount();
2346   if (pduSize < 4 || pduSize < 4+frame.GetPayloadSize()) {
2347     PTRACE_IF(2, pduSize != 1 || !m_firstControl, "RTP_UDP\tSession " << sessionID
2348               << ", Received control packet too small: " << pduSize << " bytes");
2349     return e_IgnorePacket;
2350   }
2351 
2352   m_firstControl = false;
2353   frame.SetSize(pduSize);
2354   return OnReceiveControl(frame);
2355 }
2356 
2357 
WriteOOBData(RTP_DataFrame & frame,bool rewriteTimeStamp)2358 PBoolean RTP_UDP::WriteOOBData(RTP_DataFrame & frame, bool rewriteTimeStamp)
2359 {
2360   PWaitAndSignal m(dataMutex);
2361 
2362   // set timestamp offset if not already set
2363   // otherwise offset timestamp
2364   if (!oobTimeStampBaseEstablished) {
2365     oobTimeStampBaseEstablished = true;
2366     oobTimeStampBase            = PTimer::Tick();
2367     if (rewriteTimeStamp)
2368       oobTimeStampOutBase = PRandom::Number();
2369     else
2370       oobTimeStampOutBase = frame.GetTimestamp();
2371   }
2372 
2373   // set new timestamp
2374   if (rewriteTimeStamp)
2375     frame.SetTimestamp(oobTimeStampOutBase + ((PTimer::Tick() - oobTimeStampBase).GetInterval() * 8));
2376 
2377   // write the data
2378   return EncodingLock(*this)->WriteData(frame, true);
2379 }
2380 
WriteData(RTP_DataFrame & frame)2381 PBoolean RTP_UDP::WriteData(RTP_DataFrame & frame)
2382 {
2383   return EncodingLock(*this)->WriteData(frame, false);
2384 }
2385 
2386 
Internal_WriteData(RTP_DataFrame & frame)2387 PBoolean RTP_UDP::Internal_WriteData(RTP_DataFrame & frame)
2388 {
2389   if (shutdownWrite || dataSocket == NULL) {
2390     PTRACE(3, "RTP_UDP\tSession " << sessionID << ", write shutdown.");
2391     return false;
2392   }
2393 
2394   // Trying to send a PDU before we are set up!
2395   if (!remoteAddress.IsValid() || remoteDataPort == 0)
2396     return true;
2397 
2398   switch (OnSendData(frame)) {
2399     case e_ProcessPacket :
2400       break;
2401     case e_IgnorePacket :
2402       return true;
2403     case e_AbortTransport :
2404       return false;
2405   }
2406 
2407   return WriteDataPDU(frame);
2408 }
2409 
2410 
// Notifier for timerWriteDataIdle (installed in the constructor).
// Does nothing when writing is shut down or the remote address/data port is
// not known yet. Otherwise the encoding handler's OnWriteDataIdle() is
// invoked and the handler is then asked, under dataMutex, to set up the idle
// timer again.
void RTP_UDP::OnWriteDataIdle(PTimer &, INT)
{
  {
    // The mutex is held only while checking the shutdown flag; it is
    // released before the handler is called below.
    PWaitAndSignal mutex(dataMutex);
    if (shutdownWrite) {
      PTRACE(3, "RTP_UDP\tSession " << sessionID << ", write shutdown.");
      return;
    }
  }

  // Trying to send a PDU before we are set up!
  if (!remoteAddress.IsValid() || remoteDataPort == 0)
    return;

  EncodingLock(*this)->OnWriteDataIdle();

  PWaitAndSignal mutex(dataMutex);
  EncodingLock(*this)->SetWriteDataIdleTimer(timerWriteDataIdle);
}
2430 
2431 
SetEncoding(const PString & newEncoding)2432 void RTP_UDP::SetEncoding(const PString & newEncoding)
2433 {
2434   dataMutex.Wait();
2435   timerWriteDataIdle.Stop(false);
2436   dataMutex.Signal();
2437 
2438   RTP_Session::SetEncoding(newEncoding);
2439 }
2440 
2441 
WriteControl(RTP_ControlFrame & frame)2442 PBoolean RTP_UDP::WriteControl(RTP_ControlFrame & frame)
2443 {
2444   // Trying to send a PDU before we are set up!
2445   if (!remoteAddress.IsValid() || remoteControlPort == 0 || controlSocket == NULL)
2446     return true;
2447 
2448   PINDEX len = frame.GetCompoundSize();
2449   switch (OnSendControl(frame, len)) {
2450     case e_ProcessPacket :
2451       break;
2452     case e_IgnorePacket :
2453       return true;
2454     case e_AbortTransport :
2455       return false;
2456   }
2457 
2458   return WriteDataOrControlPDU(frame.GetPointer(), len, false);
2459 }
2460 
2461 
WriteDataOrControlPDU(const BYTE * framePtr,PINDEX frameSize,bool toDataChannel)2462 bool RTP_UDP::WriteDataOrControlPDU(const BYTE * framePtr, PINDEX frameSize, bool toDataChannel)
2463 {
2464   PUDPSocket & socket = *(toDataChannel ? dataSocket : controlSocket);
2465   WORD port = toDataChannel ? remoteDataPort : remoteControlPort;
2466   int retry = 0;
2467 
2468   while (!socket.WriteTo(framePtr, frameSize, remoteAddress, port)) {
2469     switch (socket.GetErrorNumber()) {
2470       case ECONNRESET :
2471       case ECONNREFUSED :
2472         break;
2473 
2474       default:
2475         PTRACE(1, "RTP_UDP\tSession " << sessionID
2476                << ", write (" << frameSize << " bytes) error on "
2477                << (toDataChannel ? "data" : "control") << " port ("
2478                << socket.GetErrorNumber(PChannel::LastWriteError) << "): "
2479                << socket.GetErrorText(PChannel::LastWriteError));
2480         return false;
2481     }
2482 
2483     if (++retry >= 10)
2484       break;
2485   }
2486 
2487   PTRACE_IF(2, retry > 0, "RTP_UDP\tSession " << sessionID << ", " << (toDataChannel ? "data" : "control")
2488             << " port on remote not ready " << retry << " time" << (retry > 1 ? "s" : "")
2489             << (retry < 10 ? "" : ", data never sent"));
2490   return true;
2491 }
2492 
2493 
SendIntraFrameRequest(bool rfc2032,bool pictureLoss)2494 void RTP_Session::SendIntraFrameRequest(bool rfc2032, bool pictureLoss)
2495 {
2496   PTRACE(3, "RTP\tSession " << sessionID << ", SendIntraFrameRequest using "
2497          << (rfc2032 ? "RFC2032" : (pictureLoss ? "RFC4585 PLI" : "RFC5104 FIR")));
2498 
2499   // Create packet
2500   RTP_ControlFrame request;
2501   InsertReportPacket(request);
2502 
2503   request.StartNewPacket();
2504 
2505   if (rfc2032) {
2506     // Create packet
2507     request.SetPayloadType(RTP_ControlFrame::e_IntraFrameRequest);
2508     request.SetPayloadSize(4);
2509     // Insert SSRC
2510     request.SetCount(1);
2511     BYTE * payload = request.GetPayloadPtr();
2512     *(PUInt32b *)payload = syncSourceOut;
2513   }
2514   else {
2515     request.SetPayloadType(RTP_ControlFrame::e_PayloadSpecificFeedBack);
2516     if (pictureLoss)
2517       request.SetFbType(RTP_ControlFrame::e_PictureLossIndication, 0);
2518     else {
2519       request.SetFbType(RTP_ControlFrame::e_FullIntraRequest, sizeof(RTP_ControlFrame::FbFIR));
2520       RTP_ControlFrame::FbFIR * fir = (RTP_ControlFrame::FbFIR *)request.GetPayloadPtr();
2521       fir->requestSSRC = syncSourceIn;
2522     }
2523     RTP_ControlFrame::FbFCI * fci = (RTP_ControlFrame::FbFCI *)request.GetPayloadPtr();
2524     fci->senderSSRC = syncSourceOut;
2525   }
2526 
2527   // Send it
2528   request.EndPacket();
2529   WriteControl(request);
2530 }
2531 
2532 
SendTemporalSpatialTradeOff(unsigned tradeOff)2533 void RTP_Session::SendTemporalSpatialTradeOff(unsigned tradeOff)
2534 {
2535   PTRACE(3, "RTP\tSession " << sessionID << ", SendTemporalSpatialTradeOff " << tradeOff);
2536 
2537   RTP_ControlFrame request;
2538   InsertReportPacket(request);
2539 
2540   request.StartNewPacket();
2541   request.SetPayloadType(RTP_ControlFrame::e_PayloadSpecificFeedBack);
2542   request.SetFbType(RTP_ControlFrame::e_TemporalSpatialTradeOffRequest, sizeof(RTP_ControlFrame::FbTSTO));
2543   RTP_ControlFrame::FbTSTO * tsto = (RTP_ControlFrame::FbTSTO *)request.GetPayloadPtr();
2544   tsto->requestSSRC = syncSourceIn;
2545   tsto->tradeOff = (BYTE)tradeOff;
2546 
2547   // Send it
2548   request.EndPacket();
2549   WriteControl(request);
2550 }
2551 
2552 
SetEncoding(const PString & newEncoding)2553 void RTP_Session::SetEncoding(const PString & newEncoding)
2554 {
2555   {
2556     PWaitAndSignal m(m_encodingMutex);
2557 
2558     if (newEncoding == m_encoding)
2559       return;
2560 
2561     RTP_Encoding * newHandler = PFactory<RTP_Encoding>::CreateInstance(newEncoding);
2562     if (newHandler == NULL) {
2563       PTRACE(2, "RTP\tUnable to identify new RTP format '" << newEncoding << "' - retaining old format '" << m_encoding << "'");
2564       return;
2565     }
2566 
2567     if (m_encodingHandler != NULL) {
2568       --m_encodingHandler->refCount;
2569       if (m_encodingHandler->refCount == 0)
2570         delete m_encodingHandler;
2571       m_encodingHandler = NULL;
2572     }
2573 
2574     PTRACE_IF(2, !m_encoding.IsEmpty(), "RTP\tChanged RTP session format from '" << m_encoding << "' to '" << newEncoding << "'");
2575 
2576     m_encoding  = newEncoding;
2577     m_encodingHandler = newHandler;
2578   }
2579 
2580   ClearStatistics();
2581 
2582   EncodingLock(*this)->OnStart(*this);
2583 }
2584 
2585 /////////////////////////////////////////////////////////////////////////////
2586 
// Captures the session's current encoding handler and increments its
// reference count, both while holding the session's encoding mutex.
// NOTE: the handler pointer is dereferenced without a NULL check, so the
// session must already have an encoding handler when a lock is created.
RTP_Session::EncodingLock::EncodingLock(RTP_Session & _session)
  : session(_session)
{
  PWaitAndSignal m(session.m_encodingMutex);

  m_encodingHandler = session.m_encodingHandler;
  ++m_encodingHandler->refCount;
}
2595 
// Releases the reference taken by the constructor while holding the
// session's encoding mutex; the handler is deleted when its reference count
// drops to zero.
RTP_Session::EncodingLock::~EncodingLock()
{
  PWaitAndSignal m(session.m_encodingMutex);

  --m_encodingHandler->refCount;
  if (m_encodingHandler->refCount == 0)
    delete m_encodingHandler;
}
2604 
2605 
2606 /////////////////////////////////////////////////////////////////////////////
2607 
RTP_Encoding()2608 RTP_Encoding::RTP_Encoding()
2609 {
2610   refCount = 1;
2611 }
2612 
~RTP_Encoding()2613 RTP_Encoding::~RTP_Encoding()
2614 {
2615   OnFinish();
2616 }
2617 
2618 
OnStart(RTP_Session & _rtpSession)2619 void RTP_Encoding::OnStart(RTP_Session & _rtpSession)
2620 {
2621   //rtpSession = &_rtpSession;
2622   rtpUDP = (RTP_UDP *)&_rtpSession;
2623 }
2624 
OnFinish()2625 void RTP_Encoding::OnFinish()
2626 {
2627 }
2628 
OnSendData(RTP_DataFrame & frame)2629 RTP_Session::SendReceiveStatus RTP_Encoding::OnSendData(RTP_DataFrame & frame)
2630 {
2631   return rtpUDP->Internal_OnSendData(frame);
2632 }
2633 
WriteData(RTP_DataFrame & frame,bool)2634 PBoolean RTP_Encoding::WriteData(RTP_DataFrame & frame, bool)
2635 {
2636   return rtpUDP->Internal_WriteData(frame);
2637 }
2638 
OnSendControl(RTP_ControlFrame & frame,PINDEX & len)2639 RTP_Session::SendReceiveStatus RTP_Encoding::OnSendControl(RTP_ControlFrame & frame, PINDEX & len)
2640 {
2641   return rtpUDP->Internal_OnSendControl(frame, len);
2642 }
2643 
WriteDataPDU(RTP_DataFrame & frame)2644 bool RTP_Encoding::WriteDataPDU(RTP_DataFrame & frame)
2645 {
2646   return rtpUDP->WriteDataOrControlPDU(frame.GetPointer(), frame.GetHeaderSize()+frame.GetPayloadSize(), true);
2647 }
2648 
ReadDataPDU(RTP_DataFrame & frame)2649 RTP_Session::SendReceiveStatus RTP_Encoding::ReadDataPDU(RTP_DataFrame & frame)
2650 {
2651   return rtpUDP->Internal_ReadDataPDU(frame);
2652 }
2653 
OnReceiveData(RTP_DataFrame & frame)2654 RTP_Session::SendReceiveStatus RTP_Encoding::OnReceiveData(RTP_DataFrame & frame)
2655 {
2656   return rtpUDP->Internal_OnReceiveData(frame);
2657 }
2658 
OnReadTimeout(RTP_DataFrame & frame)2659 RTP_Session::SendReceiveStatus RTP_Encoding::OnReadTimeout(RTP_DataFrame & frame)
2660 {
2661   return rtpUDP->Internal_OnReadTimeout(frame);
2662 }
2663 
ReadData(RTP_DataFrame & frame)2664 PBoolean RTP_Encoding::ReadData(RTP_DataFrame & frame)
2665 {
2666   return rtpUDP->Internal_ReadData(frame);
2667 }
2668 
WaitForPDU(PUDPSocket & dataSocket,PUDPSocket & controlSocket,const PTimeInterval & t)2669 int RTP_Encoding::WaitForPDU(PUDPSocket & dataSocket, PUDPSocket & controlSocket, const PTimeInterval & t)
2670 {
2671   return rtpUDP->Internal_WaitForPDU(dataSocket, controlSocket, t);
2672 }
2673 
2674 /////////////////////////////////////////////////////////////////////////////
2675 
SecureRTP_UDP(const Params & params)2676 SecureRTP_UDP::SecureRTP_UDP(const Params & params)
2677   : RTP_UDP(params)
2678 {
2679   securityParms = NULL;
2680 }
2681 
~SecureRTP_UDP()2682 SecureRTP_UDP::~SecureRTP_UDP()
2683 {
2684   delete securityParms;
2685 }
2686 
SetSecurityMode(OpalSecurityMode * newParms)2687 void SecureRTP_UDP::SetSecurityMode(OpalSecurityMode * newParms)
2688 {
2689   if (securityParms != NULL)
2690     delete securityParms;
2691   securityParms = newParms;
2692 }
2693 
GetSecurityParms() const2694 OpalSecurityMode * SecureRTP_UDP::GetSecurityParms() const
2695 {
2696   return securityParms;
2697 }
2698 
2699 /////////////////////////////////////////////////////////////////////////////
2700