1 // Copyright (C) 2001-2015 Federico Montesino Pouzols <fedemp@altern.org>.
2 //
3 // This program is free software; you can redistribute it and/or modify
4 // it under the terms of the GNU General Public License as published by
5 // the Free Software Foundation; either version 2 of the License, or
6 // (at your option) any later version.
7 //
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 // GNU General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public License
14 // along with GNU ccRTP.  If not, see <http://www.gnu.org/licenses/>.
15 //
16 // As a special exception, you may use this file as part of a free software
17 // library without restriction.  Specifically, if other files instantiate
18 // templates or use macros or inline functions from this file, or you compile
19 // this file and link it with other files to produce an executable, this
20 // file does not by itself cause the resulting executable to be covered by
21 // the GNU General Public License.  This exception does not however
22 // invalidate any other reasons why the executable file might be covered by
23 // the GNU General Public License.
24 //
25 // This exception applies only to the code released under the name GNU
26 // ccRTP.  If you copy code from other releases into a copy of GNU
27 // ccRTP, as the General Public License permits, the exception does
28 // not apply to the code that you add in this way.  To avoid misleading
29 // anyone as to the status of such modified files, you must delete
30 // this exception notice from them.
31 //
32 // If you write modifications of your own for GNU ccRTP, it is your choice
33 // whether to permit this exception to apply to your modifications.
34 // If you do not wish that, delete this exception notice.
35 //
36 
37 /**
38  * @file iqueue.h
39  *
40  * @short Generic RTP input queues.
41  **/
42 
43 #ifndef CCXX_RTP_IQUEUE_H_
44 #define CCXX_RTP_IQUEUE_H_
45 
46 #include <ccrtp/queuebase.h>
47 #include <ccrtp/CryptoContext.h>
48 
49 #include <list>
50 
51 NAMESPACE_COMMONCPP
52 
53 /**
54  * @defgroup iqueue Generic RTP input queues.
55  * @{
56  **/
57 
58 /**
59  * @class Members rtp.h
60  * @short members and senders accounting
61  *
62  * Records the number of members as well as active senders. For now,
63  * it is too simple.
64  *
65  * @author Federico Montesino Pouzols <fedemp@altern.org>
66  **/
67 class __EXPORT Members
68 {
69 public:
70     inline void
setMembersCount(uint32 n)71     setMembersCount(uint32 n)
72     { members = n; }
73 
74     inline void
increaseMembersCount()75     increaseMembersCount()
76     { members++; }
77 
78     inline void
decreaseMembersCount()79     decreaseMembersCount()
80     { members--; }
81 
82     inline uint32
getMembersCount()83     getMembersCount() const
84     { return members; }
85 
86     inline void
setSendersCount(uint32 n)87     setSendersCount(uint32 n)
88     { activeSenders = n; }
89 
90     inline void
increaseSendersCount()91     increaseSendersCount()
92     { activeSenders++; }
93 
94     inline void
decreaseSendersCount()95     decreaseSendersCount()
96     { activeSenders--; }
97 
98     inline uint32
getSendersCount()99     getSendersCount() const
100     { return activeSenders; }
101 
102 protected:
Members()103     Members() :
104         members(0),
105         activeSenders(0)
106     { }
107 
~Members()108     inline virtual ~Members()
109     { }
110 
111 private:
112     /// number of identified members
113     uint32 members;
114     /// number of identified members that currently are active senders
115     uint32 activeSenders;
116 };
117 
118 /**
119  * @class SyncSourceHandler
120  * @short SyncSource objects modification methods.
121  *
122  * @author Federico Montesino Pouzols <fedemp@altern.org>
123  **/
124 class __EXPORT SyncSourceHandler
125 {
126 public:
127     /**
128      * This requires SyncSource - SyncSourceHandler friendship.
129      *
130      * Get the SyncSourceLink corresponding to a SyncSource
131      * object.
132      **/
133     inline void*
getLink(const SyncSource & source)134     getLink(const SyncSource& source) const
135     { return source.getLink(); }
136 
137     inline void
setLink(SyncSource & source,void * link)138     setLink(SyncSource& source, void* link)
139     { source.setLink(link); }
140 
141     inline void
setParticipant(SyncSource & source,Participant & p)142     setParticipant(SyncSource& source, Participant& p)
143     { source.setParticipant(p); }
144 
145     inline void
setState(SyncSource & source,SyncSource::State ns)146     setState(SyncSource& source, SyncSource::State ns)
147     { source.setState(ns); }
148 
149     inline void
setSender(SyncSource & source,bool active)150     setSender(SyncSource& source, bool active)
151     { source.setSender(active); }
152 
153     inline void
setDataTransportPort(SyncSource & source,tpport_t p)154     setDataTransportPort(SyncSource& source, tpport_t p)
155     { source.setDataTransportPort(p); }
156 
157     inline void
setControlTransportPort(SyncSource & source,tpport_t p)158     setControlTransportPort(SyncSource& source, tpport_t p)
159     { source.setControlTransportPort(p); }
160 
161     inline void
setNetworkAddress(SyncSource & source,InetAddress addr)162     setNetworkAddress(SyncSource& source, InetAddress addr)
163     { source.setNetworkAddress(addr); }
164 
165 protected:
SyncSourceHandler()166     SyncSourceHandler()
167     { }
168 
~SyncSourceHandler()169     inline virtual ~SyncSourceHandler()
170     { }
171 };
172 
173 /**
174  * @class ParticipantHandler
175  * @short Participant objects modification methods.
176  *
177  * @author Federico Montesino Pouzols <fedemp@altern.org>
178  **/
179 class __EXPORT ParticipantHandler
180 {
181 public:
182     inline void
setSDESItem(Participant * part,SDESItemType item,const std::string & val)183     setSDESItem(Participant* part, SDESItemType item,
184             const std::string& val)
185     { part->setSDESItem(item,val); }
186 
187     inline void
setPRIVPrefix(Participant * part,const std::string val)188     setPRIVPrefix(Participant* part, const std::string val)
189     { part->setPRIVPrefix(val); }
190 
191 protected:
ParticipantHandler()192     ParticipantHandler()
193     { }
194 
~ParticipantHandler()195     inline virtual ~ParticipantHandler()
196     { }
197 };
198 
199 /**
200  * @class ApplicationHandler
201  * @short Application objects modification methods.
202  *
203  * @author Federico Montesino Pouzols <fedemp@altern.org>
204  **/
205 class __EXPORT ApplicationHandler
206 {
207 public:
208     inline void
addParticipant(RTPApplication & app,Participant & part)209     addParticipant(RTPApplication& app, Participant& part)
210     { app.addParticipant(part); }
211 
212     inline void
removeParticipant(RTPApplication & app,RTPApplication::ParticipantLink * pl)213     removeParticipant(RTPApplication& app,
214               RTPApplication::ParticipantLink* pl)
215     { app.removeParticipant(pl); }
216 
217 protected:
ApplicationHandler()218     ApplicationHandler()
219     { }
220 
~ApplicationHandler()221     inline virtual ~ApplicationHandler()
222     { }
223 };
224 
225 /**
226  * @class ConflictHandler
227  * @short To track addresses of sources conflicting with the local
228  * one.
229  *
230  * @author Federico Montesino Pouzols <fedemp@altern.org>
231  **/
232 class __EXPORT ConflictHandler
233 {
234 public:
235     struct ConflictingTransportAddress
236     {
237         ConflictingTransportAddress(InetAddress na,
238                         tpport_t dtp, tpport_t ctp);
239 
setNextConflictingTransportAddress240         void setNext(ConflictingTransportAddress* nc)
241         { next = nc; }
242 
getNetworkAddressConflictingTransportAddress243         inline const InetAddress& getNetworkAddress( ) const
244         { return networkAddress; }
245 
getDataTransportPortConflictingTransportAddress246         inline tpport_t getDataTransportPort() const
247         { return dataTransportPort; }
248 
getControlTransportPortConflictingTransportAddress249         inline tpport_t getControlTransportPort() const
250         { return controlTransportPort; }
251 
252         InetAddress networkAddress;
253         tpport_t dataTransportPort;
254         tpport_t controlTransportPort;
255         ConflictingTransportAddress* next;
256         // arrival time of last data or control packet.
257         timeval lastPacketTime;
258     };
259 
260     /**
261      * @param na Inet network address.
262      * @param dtp Data transport port.
263      **/
264     ConflictingTransportAddress* searchDataConflict(InetAddress na,
265                             tpport_t dtp);
266     /**
267      * @param na Inet network address.
268      * @param ctp Data transport port.
269      **/
270     ConflictingTransportAddress* searchControlConflict(InetAddress na,
271                                tpport_t ctp);
272 
updateConflict(ConflictingTransportAddress & ca)273     void updateConflict(ConflictingTransportAddress& ca)
274     { SysTime::gettimeofday(&(ca.lastPacketTime),NULL); }
275 
276     void addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp);
277 
278 protected:
ConflictHandler()279     ConflictHandler()
280     { firstConflict = lastConflict = NULL; }
281 
~ConflictHandler()282     inline virtual ~ConflictHandler()
283     { }
284 
285     ConflictingTransportAddress* firstConflict, * lastConflict;
286 };
287 
288 /**
289  * @class MembershipBookkeeping
290  * @short Controls the group membership in the current session.
291  *
292  * For now, this class implements only a hash table of members, but
293  * its design and relation with other classes is intented to support
294  * group membership sampling in case scalability problems arise.
295  *
296  * @author Federico Montesino Pouzols <fedemp@altern.org>
297  */
298 class __EXPORT MembershipBookkeeping :
299     public SyncSourceHandler,
300     public ParticipantHandler,
301     public ApplicationHandler,
302     public ConflictHandler,
303     private Members
304 {
305 public:
getDefaultMembersHashSize()306     inline size_t getDefaultMembersHashSize()
307     { return defaultMembersHashSize; }
308 
309 protected:
310 
311     /**
312      * @short The initial size is a hint to allocate the resources
313      * needed in order to keep the members' identifiers and
314      * associated information.
315      *
316      * Although ccRTP will reallocate resources when it becomes
317      * necessary, a good hint may save a lot of unpredictable time
318      * penalties.
319      *
320      * @param initialSize an estimation of how many participants
321      * the session will consist of.
322      *
323      */
324     MembershipBookkeeping(uint32 initialSize = defaultMembersHashSize);
325 
326     /**
327      * Purges all RTPSource structures created during the session,
328      * as well as the hash table and the list of sources.
329      **/
330     inline virtual
~MembershipBookkeeping()331     ~MembershipBookkeeping()
332     { endMembers(); }
333 
334     struct SyncSourceLink;
335 
getLink(const SyncSource & source)336     inline SyncSourceLink* getLink(const SyncSource& source) const
337     { return static_cast<SyncSourceLink*>(SyncSourceHandler::getLink(source)); }
338     /**
339      * Get whether a synchronization source is recorded in this
340      * membership controller.
341      **/
isMine(const SyncSource & source)342     inline bool isMine(const SyncSource& source) const
343     { return getLink(source)->getMembership() == this; }
344 
345     /**
346      * @struct IncomingRTPPktLink
347      *
348      * @short Incoming RTP data packets control structure within
349      * the incoming packet queue class.
350      **/
351     struct IncomingRTPPktLink
352     {
IncomingRTPPktLinkIncomingRTPPktLink353         IncomingRTPPktLink(IncomingRTPPkt* pkt, SyncSourceLink* sLink,
354                    struct timeval& recv_ts,
355                    uint32 shifted_ts,
356                    IncomingRTPPktLink* sp,
357                    IncomingRTPPktLink* sn,
358                    IncomingRTPPktLink* p,
359                    IncomingRTPPktLink* n) :
360             packet(pkt),
361             sourceLink(sLink),
362             prev(p), next(n),
363             srcPrev(sp), srcNext(sn),
364             receptionTime(recv_ts),
365             shiftedTimestamp(shifted_ts)
366         { }
367 
~IncomingRTPPktLinkIncomingRTPPktLink368         ~IncomingRTPPktLink()
369         { }
370 
getSourceLinkIncomingRTPPktLink371         inline SyncSourceLink* getSourceLink() const
372         { return sourceLink; }
373 
setSourceLinkIncomingRTPPktLink374         inline void setSourceLink(SyncSourceLink* src)
375         { sourceLink = src; }
376 
getNextIncomingRTPPktLink377         inline IncomingRTPPktLink* getNext() const
378         { return next; }
379 
setNextIncomingRTPPktLink380         inline void setNext(IncomingRTPPktLink* nl)
381         { next = nl; }
382 
getPrevIncomingRTPPktLink383         inline IncomingRTPPktLink* getPrev() const
384         { return prev; }
385 
setPrevIncomingRTPPktLink386         inline void setPrev(IncomingRTPPktLink* pl)
387         { prev = pl; }
388 
getSrcNextIncomingRTPPktLink389         inline IncomingRTPPktLink* getSrcNext() const
390         { return srcNext; }
391 
setSrcNextIncomingRTPPktLink392         inline void setSrcNext(IncomingRTPPktLink* sn)
393         { srcNext = sn; }
394 
getSrcPrevIncomingRTPPktLink395         inline IncomingRTPPktLink* getSrcPrev() const
396         { return srcPrev; }
397 
setSrcPrevIncomingRTPPktLink398         inline void setSrcPrev(IncomingRTPPktLink* sp)
399         { srcPrev = sp; }
400 
getPacketIncomingRTPPktLink401         inline IncomingRTPPkt* getPacket() const
402         { return packet; }
403 
setPacketIncomingRTPPktLink404         inline void setPacket(IncomingRTPPkt* pkt)
405         { packet = pkt; }
406 
407         /**
408          * Set the time this packet was received at.
409          *
410          * @param t time of reception.
411          * @note this has almost nothing to do with the 32-bit
412          * timestamp contained in the packet header.
413          **/
setRecvTimeIncomingRTPPktLink414         inline void setRecvTime(const timeval &t)
415         { receptionTime = t; }
416 
417         /**
418          * Get the time this packet was received at.
419          **/
getRecvTimeIncomingRTPPktLink420         inline timeval getRecvTime() const
421         { return receptionTime; }
422 
423         /**
424          * Get timestamp of this packet. The timestamp of
425          * incoming packets is filtered so that the timestamp
426          * this method provides for the first packet received
427          * from every source starts from 0.
428          *
429          * @return 32 bit timestamp starting from 0 for each source.
430          */
getTimestampIncomingRTPPktLink431         inline uint32 getTimestamp() const
432         { return shiftedTimestamp; }
433 
setTimestampIncomingRTPPktLink434         inline void setTimestamp(uint32 ts)
435         { shiftedTimestamp = ts;}
436 
437         // the packet this link refers to.
438         IncomingRTPPkt* packet;
439         // the synchronization source this packet comes from.
440         SyncSourceLink* sourceLink;
441         // global incoming packet queue links.
442         IncomingRTPPktLink* prev, * next;
443         // source specific incoming packet queue links.
444         IncomingRTPPktLink* srcPrev, * srcNext;
445         // time this packet was received at
446         struct timeval receptionTime;
447         // timestamp of the packet in host order and after
448         // substracting the initial timestamp for its source
449         // (it is an increment from the initial timestamp).
450         uint32 shiftedTimestamp;
451     };
452 
453     /**
454      * @struct SyncSourceLink
455      *
456      * @short Synchronization Source internal handler within the
457      * incoming packets queue.
458      *
459      * Incoming packets queue objects hold a hash table and a
460      * linked list of synchronization sources. For each of these
461      * sources, there is also a linked list of incoming rtp
462      * packets, which are linked in an "all incoming packets" list
463      * as well. SyncSourceLink objects hold the necessary data to
464      * maintain these data estructures, as well as source specific
465      * information and statistics for RTCP,
466      *
467      * @author Federico Montesino Pouzols <fedemp@altern.org>
468      **/
469     struct SyncSourceLink
470     {
471         // 2^16
472         static const uint32 SEQNUMMOD;
473 
474         SyncSourceLink(MembershipBookkeeping* m,
475                    SyncSource* s,
476                    IncomingRTPPktLink* fp = NULL,
477                    IncomingRTPPktLink* lp = NULL,
478                    SyncSourceLink* ps = NULL,
479                    SyncSourceLink* ns = NULL,
480                    SyncSourceLink* ncollis = NULL) :
membershipSyncSourceLink481             membership(m), source(s), first(fp), last(lp),
482             prev(ps), next(ns), nextCollis(ncollis),
483             prevConflict(NULL)
484         { m->setLink(*s,this); // record that the source is associated
485           initStats();         // to this link.
486         }
487 
488         /**
489          * Note it deletes the source.
490          **/
491         ~SyncSourceLink();
492 
getMembershipSyncSourceLink493         inline MembershipBookkeeping* getMembership()
494         { return membership; }
495 
496         /**
497          * Get the synchronization source object this link
498          * objet holds information for.
499          **/
getSourceSyncSourceLink500         inline SyncSource* getSource() { return source; }
501 
502         /**
503          * Get first RTP (data) packet in the queue of packets
504          * received from this socket.
505          **/
getFirstSyncSourceLink506         inline IncomingRTPPktLink* getFirst()
507         { return first; }
508 
setFirstSyncSourceLink509         inline void setFirst(IncomingRTPPktLink* fp)
510         { first = fp; }
511 
512         /**
513          * Get last RTP (data) packet in the queue of packets
514          * received from this socket.
515          **/
getLastSyncSourceLink516         inline IncomingRTPPktLink* getLast()
517         { return last; }
518 
setLastSyncSourceLink519         inline void setLast(IncomingRTPPktLink* lp)
520         { last = lp; }
521 
522         /**
523          * Get the link object for the previous RTP source.
524          **/
getPrevSyncSourceLink525         inline SyncSourceLink* getPrev()
526         { return prev; }
527 
setPrevSyncSourceLink528         inline void setPrev(SyncSourceLink* ps)
529         { prev = ps; }
530 
531         /**
532          * Get the link object for the next RTP source.
533          **/
getNextSyncSourceLink534         inline SyncSourceLink* getNext()
535         { return next; }
536 
setNextSyncSourceLink537         inline void setNext(SyncSourceLink *ns)
538         { next = ns; }
539 
540         /**
541          * Get the link object for the next RTP source in the
542          * hash table entry collision list.  Note that
543          * collision does not refer to SSRC collision, but
544          * hash table collision.
545          **/
getNextCollisSyncSourceLink546         inline SyncSourceLink* getNextCollis()
547         { return nextCollis; }
548 
setNextCollisSyncSourceLink549         inline void setNextCollis(SyncSourceLink* ns)
550         { nextCollis = ns; }
551 
getPrevConflictSyncSourceLink552         inline ConflictingTransportAddress* getPrevConflict() const
553         { return prevConflict; }
554 
555         /**
556          * Get conflicting address.
557          **/
558         void setPrevConflict(InetAddress& addr, tpport_t dataPort,
559                      tpport_t controlPort);
560 
getSenderInfoSyncSourceLink561         unsigned char* getSenderInfo()
562         { return senderInfo; }
563 
564         void setSenderInfo(unsigned char* si);
565 
getReceiverInfoSyncSourceLink566         unsigned char* getReceiverInfo()
567         { return receiverInfo; }
568 
569         void setReceiverInfo(unsigned char* ri);
570 
getLastPacketTimeSyncSourceLink571         inline timeval getLastPacketTime() const
572         { return lastPacketTime; }
573 
getLastRTCPPacketTimeSyncSourceLink574         inline timeval getLastRTCPPacketTime() const
575         { return lastRTCPPacketTime; }
576 
getLastRTCPSRTimeSyncSourceLink577         inline timeval getLastRTCPSRTime() const
578         { return lastRTCPSRTime; }
579 
580         /**
581          * Get the total number of RTP packets received from this
582          * source.
583          */
getObservedPacketCountSyncSourceLink584         inline uint32 getObservedPacketCount() const
585         { return obsPacketCount; }
586 
incObservedPacketCountSyncSourceLink587         inline void incObservedPacketCount()
588         { obsPacketCount++; }
589 
590         /**
591          * Get the total number of payload octets received from this
592          * source.
593          **/
getObservedOctetCountSyncSourceLink594         inline uint32 getObservedOctetCount() const
595         { return obsOctetCount; }
596 
incObservedOctetCountSyncSourceLink597         inline void incObservedOctetCount(uint32 n)
598         { obsOctetCount += n; }
599 
600         /**
601          * Get the highest valid sequence number received.
602          **/
603         uint16
getMaxSeqNumSyncSourceLink604         getMaxSeqNum() const
605         { return maxSeqNum; }
606 
607         /**
608          * Set the highest valid sequence number recived.
609          * @param max Sequence number.
610          **/
611         void
setMaxSeqNumSyncSourceLink612         setMaxSeqNum(uint16 max)
613         { maxSeqNum = max; }
614 
615         inline uint32
getExtendedMaxSeqNumSyncSourceLink616         getExtendedMaxSeqNum() const
617         { return extendedMaxSeqNum; }
618 
619         inline void
setExtendedMaxSeqNumSyncSourceLink620         setExtendedMaxSeqNum(uint32 seq)
621         { extendedMaxSeqNum = seq; }
622 
getCumulativePacketLostSyncSourceLink623         inline uint32 getCumulativePacketLost() const
624         { return cumulativePacketLost; }
625 
setCumulativePacketLostSyncSourceLink626         inline void setCumulativePacketLost(uint32 pl)
627         { cumulativePacketLost = pl; }
628 
getFractionLostSyncSourceLink629         inline uint8 getFractionLost() const
630         { return fractionLost; }
631 
setFractionLostSyncSourceLink632         inline void setFractionLost(uint8 fl)
633         { fractionLost = fl; }
634 
getLastPacketTransitTimeSyncSourceLink635         inline uint32 getLastPacketTransitTime()
636         { return lastPacketTransitTime; }
637 
setLastPacketTransitTimeSyncSourceLink638         inline void setLastPacketTransitTime(uint32 time)
639         { lastPacketTransitTime = time; }
640 
getJitterSyncSourceLink641         inline float getJitter() const
642         { return jitter; }
643 
setJitterSyncSourceLink644         inline void setJitter(float j)
645         { jitter = j; }
646 
getInitialDataTimestampSyncSourceLink647         inline uint32 getInitialDataTimestamp() const
648         { return initialDataTimestamp; }
649 
setInitialDataTimestampSyncSourceLink650         inline void setInitialDataTimestamp(uint32 ts)
651         { initialDataTimestamp = ts; }
652 
getInitialDataTimeSyncSourceLink653         inline timeval getInitialDataTime() const
654         { return initialDataTime; }
655 
setInitialDataTimeSyncSourceLink656         inline void setInitialDataTime(timeval it)
657         { initialDataTime = it; }
658 
659         /**
660          * Mark this source as having sent a BYE control packet.
661          *
662          * @return whether some packet from this source had
663          * been received before (getHello() has been called at
664          * least once)
665          **/
getGoodbyeSyncSourceLink666         bool getGoodbye()
667         {
668             if(!flag)
669                 return false;
670             flag = false;
671             return true;
672         }
673 
674         /**
675          * Mark this source as having sent some packet.
676          *
677          * @return whether no packet from this source had been
678          * received before
679          **/
getHelloSyncSourceLink680         bool getHello() {
681             if(flag)
682                 return false;
683             flag = true;
684             return true;
685         }
686 
getBadSeqNumSyncSourceLink687         inline uint32 getBadSeqNum() const
688         { return badSeqNum; }
689 
setBadSeqNumSyncSourceLink690         inline void setBadSeqNum(uint32 seq)
691         { badSeqNum = seq; }
692 
getProbationSyncSourceLink693         uint8 getProbation() const
694         { return probation; }
695 
setProbationSyncSourceLink696         inline void setProbation(uint8 p)
697         { probation = p; }
698 
decProbationSyncSourceLink699         inline void decProbation()
700         { --probation; }
701 
isValidSyncSourceLink702         bool isValid() const
703         { return 0 == probation; }
704 
getBaseSeqNumSyncSourceLink705         inline uint16 getBaseSeqNum() const
706         { return baseSeqNum; }
707 
setBaseSeqNumSyncSourceLink708         inline void setBaseSeqNum(uint16 seqnum)
709         { baseSeqNum = seqnum; }
710 
getSeqNumAccumSyncSourceLink711         inline uint32 getSeqNumAccum() const
712         { return seqNumAccum; }
713 
incSeqNumAccumSyncSourceLink714         inline void incSeqNumAccum()
715         { seqNumAccum += SEQNUMMOD; }
716 
717         /**
718          * Start a new sequence of received packets.
719          **/
initSequenceSyncSourceLink720         inline void initSequence(uint16 seqnum)
721         { maxSeqNum = seqNumAccum = seqnum; }
722 
723         /**
724          * Record the insertion of an RTP packet from this
725          * source into the scheduled reception queue. All
726          * received packets should be registered with
727          * recordReception(), but only those actually inserted
728          * into the queue should be registered via this
729          * method.
730          *
731          * @param pl Link structure for packet inserted into the queue.
732          **/
733         void recordInsertion(const IncomingRTPPktLink& pl);
734 
735         void initStats();
736 
737         /**
738          * Compute cumulative packet lost and fraction of
739          * packets lost during the last reporting interval.
740          **/
741         void computeStats();
742 
743         MembershipBookkeeping* membership;
744         // The source this link object refers to.
745         SyncSource* source;
746         // first/last packets from this source in the queue.
747         IncomingRTPPktLink* first, * last;
748         // Links for synchronization sources located before
749         // and after this one in the list of sources.
750         SyncSourceLink* prev, * next;
751         // Prev and next inside the hash table collision list.
752         SyncSourceLink* nextCollis;
753         ConflictingTransportAddress* prevConflict;
754         unsigned char* senderInfo;
755         unsigned char* receiverInfo;
756         // time the last RTP packet from this source was
757         // received at.
758         timeval lastPacketTime;
759         // time the last RTCP packet was received.
760         timeval lastRTCPPacketTime;
761         // time the lasrt RTCP SR was received. Required for
762         // DLSR computation.
763         timeval lastRTCPSRTime;
764 
765         // for outgoing RR reports.
766         // number of packets received from this source.
767         uint32 obsPacketCount;
768         // number of octets received from this source.
769         uint32 obsOctetCount;
770         // the higher sequence number seen from this source
771         uint16 maxSeqNum;
772         uint32 extendedMaxSeqNum;
773         uint32 cumulativePacketLost;
774         uint8 fractionLost;
775         // for interarrivel jitter computation
776         uint32 lastPacketTransitTime;
777         // interarrival jitter of packets from this source.
778         float jitter;
779         uint32 initialDataTimestamp;
780         timeval initialDataTime;
781 
782         // this flag assures we only call one gotHello and one
783         // gotGoodbye for this src.
784         bool flag;
785 
786         // for source validation:
787         uint32 badSeqNum;
788         uint8 probation;  // packets in sequence before valid.
789         uint16 baseSeqNum;
790         uint32 expectedPrior;
791         uint32 receivedPrior;
792         uint32 seqNumAccum;
793     };
794 
795     /**
796      * Returns whether there is already a synchronizacion source
797      * with "ssrc" SSRC identifier.
798      **/
799     bool
800     isRegistered(uint32 ssrc);
801 
802     /**
803      * Get the description of a source by its <code>ssrc</code> identifier.
804      *
805      * @param ssrc SSRC identifier, in host order.
806      * @param created whether a new source has been created.
807      * @return Pointer to the SyncSource object identified by
808      * <code>ssrc</code>.
809      */
810     SyncSourceLink*
811     getSourceBySSRC(uint32 ssrc, bool& created);
812 
813     /**
814      * Mark the source identified by <code>ssrc</code> as having
815      * sent a BYE packet. It is not deleted until a timeout
816      * expires, so that in case some packets from this source
817      * arrive a bit later the source is not inserted again in the
818      * table of known sources.
819      *
820      * @return true if the source had been previously identified.
821      * false if it was not in the table of known sources.
822      **/
823     bool
824     BYESource(uint32 ssrc);
825 
826     /**
827      * Remove the description of the source identified by
828      * <code>ssrc</code>
829      *
830      * @return whether the source has been actually removed or it
831      * did not exist.
832      */
833     bool
834     removeSource(uint32 ssrc);
835 
getFirst()836     inline SyncSourceLink* getFirst()
837     { return first; }
838 
getLast()839     inline SyncSourceLink* getLast()
840     { return last; }
841 
842     inline uint32
getMembersCount()843     getMembersCount()
844     { return Members::getMembersCount(); }
845 
846     inline void
setMembersCount(uint32 n)847     setMembersCount(uint32 n)
848     { Members::setMembersCount(n); }
849 
850     inline uint32
getSendersCount()851     getSendersCount()
852     { return Members::getSendersCount(); }
853 
854     static const size_t defaultMembersHashSize;
855     static const uint32 SEQNUMMOD;
856 
857 private:
858     MembershipBookkeeping(const MembershipBookkeeping &o);
859 
860     MembershipBookkeeping&
861     operator=(const MembershipBookkeeping &o);
862 
863     /**
864      * Purge all RTPSource structures, the hash table and the list
865      * of sources.
866      **/
867     void
868     endMembers();
869 
870     // Hash table with sources of RTP and RTCP packets
871     uint32 sourceBucketsNum;
872     SyncSourceLink** sourceLinks;
873     // List of sources, ordered from older to newer
874     SyncSourceLink* first, * last;
875 };
876 
877 /**
878  * @class IncomingDataQueue
879  * @short Queue for incoming RTP data packets in an RTP session.
880  *
881  * @author Federico Montesino Pouzols <fedemp@altern.org>
882  **/
883 class __EXPORT IncomingDataQueue: public IncomingDataQueueBase,
884     protected MembershipBookkeeping
885 {
886 public:
887     /**
888      * @class SyncSourcesIterator
889      * @short iterator through the list of synchronizations
890      * sources in this session
891      **/
892     class SyncSourcesIterator
893     {
894     public:
895         typedef std::forward_iterator_tag iterator_category;
896         typedef SyncSource value_type;
897         typedef std::ptrdiff_t difference_type;
898         typedef const SyncSource* pointer;
899         typedef const SyncSource& reference;
900 
901         SyncSourcesIterator(SyncSourceLink* l = NULL) :
link(l)902             link(l)
903         { }
904 
SyncSourcesIterator(const SyncSourcesIterator & si)905         SyncSourcesIterator(const SyncSourcesIterator& si) :
906             link(si.link)
907         { }
908 
909         reference operator*() const
910         { return *(link->getSource()); }
911 
912         pointer operator->() const
913         { return link->getSource(); }
914 
915         SyncSourcesIterator& operator++() {
916             link = link->getNext();
917             return *this;
918         }
919 
920         SyncSourcesIterator operator++(int) {
921             SyncSourcesIterator result(*this);
922             ++(*this);
923             return result;
924         }
925 
926         friend bool operator==(const SyncSourcesIterator& l,
927                        const SyncSourcesIterator& r)
928         { return l.link == r.link; }
929 
930         friend bool operator!=(const SyncSourcesIterator& l,
931                        const SyncSourcesIterator& r)
932         { return l.link != r.link; }
933 
934     private:
935         SyncSourceLink *link;
936     };
937 
begin()938     SyncSourcesIterator begin()
939     { return SyncSourcesIterator(MembershipBookkeeping::getFirst()); }
940 
end()941     SyncSourcesIterator end()
942     { return SyncSourcesIterator(NULL); }
943 
944     /**
945      * Retreive data from a specific timestamped packet if such a
946      * packet is currently available in the receive buffer.
947      *
948      * @param stamp Data unit timestamp.
949      * @param src Optional synchronization source selector.
950      * @return data retrieved from the reception buffer.
951      * @retval null pointer if no packet with such timestamp is available.
952      **/
953     const AppDataUnit*
954     getData(uint32 stamp, const SyncSource* src = NULL);
955 
956 
957     /**
958      * Determine if packets are waiting in the reception queue.
959      *
960      * @param src Optional synchronization source selector.
961      * @return True if packets are waiting.
962      */
963     bool
964     isWaiting(const SyncSource* src = NULL) const;
965 
966     /**
967      * Get timestamp of first packet waiting in the queue.
968      *
969      * @param src optional source selector.
970      * @return timestamp of first arrival packet.
971      **/
972     uint32
973     getFirstTimestamp(const SyncSource* src = NULL) const;
974 
975     /**
976      * When receiving packets from a new source, it may be
977      * convenient to reject a first few packets before we are
978      * really sure the source is valid. This method sets how many
979      * data packets must be received in sequence before the source
980      * is considered valid and the stack starts to accept its
981      * packets.
982      *
983      * @note the default (see defaultMinValidPacketSequence())
984      * value for this parameter is 0, so that no packets are
985      * rejected (data packets are accepted from the first one).
986      *
987      * @note this validation is performed after the generic header
988      * validation and the additional validation done in
989      * onRTPPacketRecv().
990      *
991      * @note if any valid RTCP packet is received from this
992      * source, it will be immediatly considered valid regardless
993      * of the number of sequential data packets received.
994      *
995      * @param packets number of sequential packet required
996      **/
997     void
setMinValidPacketSequence(uint8 packets)998     setMinValidPacketSequence(uint8 packets)
999     { minValidPacketSequence = packets; }
1000 
1001     uint8
getDefaultMinValidPacketSequence()1002     getDefaultMinValidPacketSequence() const
1003     { return defaultMinValidPacketSequence; }
1004 
1005     /**
1006      * Get the minimun number of consecutive packets that must be
1007      * received from a source before accepting its data packets.
1008      **/
1009     uint8
getMinValidPacketSequence()1010     getMinValidPacketSequence() const
1011     { return minValidPacketSequence; }
1012 
1013     void
setMaxPacketMisorder(uint16 packets)1014     setMaxPacketMisorder(uint16 packets)
1015     { maxPacketMisorder = packets; }
1016 
1017     uint16
getDefaultMaxPacketMisorder()1018     getDefaultMaxPacketMisorder() const
1019     { return defaultMaxPacketMisorder; }
1020 
1021     uint16
getMaxPacketMisorder()1022     getMaxPacketMisorder() const
1023     { return maxPacketMisorder; }
1024 
1025     /**
1026      *
1027      * It also prevents packets sent after a restart of the source
1028      * being immediately accepted.
1029      **/
1030     void
setMaxPacketDropout(uint16 packets)1031     setMaxPacketDropout(uint16 packets) // default: 3000.
1032     { maxPacketDropout = packets; }
1033 
1034     uint16
getDefaultMaxPacketDropout()1035     getDefaultMaxPacketDropout() const
1036     { return defaultMaxPacketDropout; }
1037 
1038     uint16
getMaxPacketDropout()1039     getMaxPacketDropout() const
1040     { return maxPacketDropout; }
1041 
1042     // default value for constructors that allow to specify
1043     // members table s\ize
1044         inline static size_t
getDefaultMembersSize()1045         getDefaultMembersSize()
1046         { return defaultMembersSize; }
1047 
1048         /**
1049          * Set input queue CryptoContext.
1050          *
1051          * The endQueue method (provided by RTPQueue) deletes all
1052          * registered CryptoContexts.
1053          *
1054          * @param cc Pointer to initialized CryptoContext.
1055          */
1056         void
1057         setInQueueCryptoContext(CryptoContext* cc);
1058 
1059         /**
1060          * Remove input queue CryptoContext.
1061          *
1062          * The endQueue method (provided by RTPQueue) also deletes all
1063          * registered CryptoContexts.
1064          *
1065          * @param cc
1066          *     Pointer to initialized CryptoContext to remove. If pointer
1067          *     if <code>NULL</code> then delete the whole queue
1068          */
1069         void
1070         removeInQueueCryptoContext(CryptoContext* cc);
1071 
1072         /**
1073          * Get an input queue CryptoContext identified by SSRC
1074          *
1075          * @param ssrc Request CryptoContext for this incoming SSRC
1076          * @return Pointer to CryptoContext of the SSRC of NULL if no context
1077          * available for this SSRC.
1078          */
1079         CryptoContext*
1080         getInQueueCryptoContext(uint32 ssrc);
1081 
1082 protected:
1083     /**
1084      * @param size initial size of the membership table.
1085      **/
1086     IncomingDataQueue(uint32 size);
1087 
~IncomingDataQueue()1088     virtual ~IncomingDataQueue()
1089     { }
1090 
1091     /**
1092      * Apply collision and loop detection and correction algorithm
1093      * when receiving RTP data packets. Follows section 8.2 in
1094      * draft-ietf-avt-rtp-new.
1095      *
1096      * @param sourceLink link to the source object.
1097      * @param is_new whether the source has been just recorded.
1098      * @param na data packet network address.
1099      * @param tp data packet source transport port.
1100      *
1101      * @return whether the packet must not be discarded.
1102      **/
1103     bool checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink,
1104                        bool is_new, InetAddress& na,
1105                        tpport_t tp);
1106 
1107     /**
1108      * Set the number of RTCP intervals that the stack will wait
1109      * to change the state of a source from stateActive to
1110      * stateInactive, or to delete the source after being in
1111      * stateInactive.
1112      *
1113      * Note that this value should be uniform accross all
1114      * participants and SHOULD be fixed for a particular profile.
1115      *
1116      * @param intervals number of RTCP report intervals
1117      *
1118      * @note If RTCP is not being used, the RTCP interval is
1119      * assumed to be the default: 5 seconds.
1120      * @note The default for this value is, as RECOMMENDED, 5.
1121      **/
setSourceExpirationPeriod(uint8 intervals)1122     void setSourceExpirationPeriod(uint8 intervals)
1123     { sourceExpirationPeriod = intervals; }
1124 
1125     /**
1126      * This function is used by the service thread to process
1127      * the next incoming packet and place it in the receive list.
1128      *
1129      * @return number of payload bytes received.  <0 if error.
1130      */
1131     virtual size_t
1132     takeInDataPacket();
1133 
1134     void renewLocalSSRC();
1135 
1136     /**
1137      * This is used to fetch a packet in the receive queue and to
1138      * expire packets older than the current timestamp.
1139      *
1140      * @return packet buffer object for current timestamp if found.
1141      * @param timestamp timestamp requested.
1142      * @param src optional source selector
1143      * @note if found, the packet is removed from the reception queue
1144      **/
1145     IncomingDataQueue::IncomingRTPPktLink*
1146     getWaiting(uint32 timestamp, const SyncSource *src = NULL);
1147 
1148     /**
1149      * Log reception of a new RTP packet from this source. Usually
1150      * updates data such as the packet counter, the expected
1151      * sequence number for the next packet and the time the last
1152      * packet was received at.
1153      *
1154      * @param srcLink Link structure for the synchronization
1155      * source of this packet.
1156      * @param pkt Packet just created and to be logged.
1157      * @param recvtime Reception time.
1158      *
1159      * @return whether, according to the source state and
1160      * statistics, the packet is considered valid and must be
1161      * inserted in the incoming packets queue.
1162      **/
1163     bool
1164     recordReception(SyncSourceLink& srcLink, const IncomingRTPPkt& pkt,
1165             const timeval recvtime);
1166 
1167     /**
1168      * Log extraction of a packet from this source from the
1169      * scheduled reception queue.
1170      *
1171      * @param pkt Packet extracted from the queue.
1172      **/
1173     void
1174     recordExtraction(const IncomingRTPPkt& pkt);
1175 
1176     void purgeIncomingQueue();
1177 
1178     /**
1179      * Virtual called when a new synchronization source has joined
1180      * the session.
1181      *
1182      * @param - new synchronization source
1183      **/
1184     inline virtual void
onNewSyncSource(const SyncSource &)1185     onNewSyncSource(const SyncSource&)
1186     { }
1187 
1188 protected:
1189     /**
1190      * A virtual function to support parsing of arriving packets
1191      * to determine if they should be kept in the queue and to
1192      * dispatch events.
1193      *
1194      * A generic header validity check (as specified in RFC 1889)
1195      * is performed on every incoming packet. If the generic check
1196      * completes succesfully, this method is called before the
1197      * packet is actually inserted into the reception queue.
1198      *
1199      * May be used to perform additional validity checks or to do
1200      * some application specific processing.
1201      *
1202      * @param - packet just received.
1203      * @return true if packet is kept in the incoming packets queue.
1204      **/
1205     inline virtual bool
onRTPPacketRecv(IncomingRTPPkt &)1206     onRTPPacketRecv(IncomingRTPPkt&)
1207     { return true; }
1208 
1209     /**
1210      * A hook to filter packets in the receive queue that are being
1211      * expired. This hook may be used to do some application
1212      * specific processing on expired packets before they are
1213      * deleted.
1214      *
1215      * @param - packet expired from the recv queue.
1216      **/
onExpireRecv(IncomingRTPPkt &)1217     inline virtual void onExpireRecv(IncomingRTPPkt&)
1218     { return; }
1219 
1220         /**
1221          * A hook that gets called if the decoding of an incoming SRTP was erroneous
1222          *
1223          * @param pkt
1224          *     The SRTP packet with error.
1225          * @param errorCode
1226          *     The error code: -1 - SRTP authentication failure, -2 - replay
1227          *     check failed
1228          * @return
1229          *     True: put the packet in incoming queue for further processing
1230          *     by the applications; false: dismiss packet. The default
1231          *     implementation returns false.
1232          **/
1233         inline virtual bool
onSRTPPacketError(IncomingRTPPkt & pkt,int32 errorCode)1234         onSRTPPacketError(IncomingRTPPkt& pkt, int32 errorCode)
1235         { return false; }
1236 
1237         inline virtual bool
end2EndDelayed(IncomingRTPPktLink &)1238     end2EndDelayed(IncomingRTPPktLink&)
1239     { return false; }
1240 
1241         /**
1242      * Insert a just received packet in the queue (both general
1243      * and source specific queues). If the packet was already in
1244      * the queue (same SSRC and sequence number), it is not
1245      * inserted but deleted.
1246      *
1247      * @param packetLink link to a packet just received and
1248      * generally validated and processed by onRTPPacketRecv.
1249      *
1250      * @return whether the packet was successfully inserted.
1251      * @retval false when the packet is duplicated (there is
1252      * already a packet from the same source with the same
1253      * timestamp).
1254      * @retval true when the packet is not duplicated.
1255      **/
1256     bool
1257     insertRecvPacket(IncomingRTPPktLink* packetLink);
1258 
1259     /**
1260      * This function performs the physical I/O for reading a
1261      * packet from the source.  It is a virtual that is
1262      * overriden in the derived class.
1263      *
1264      * @return number of bytes read.
1265      * @param buffer of read packet.
1266      * @param length of data to read.
1267      * @param host address of source.
1268      * @param port number of source.
1269      **/
1270     virtual size_t
1271     recvData(unsigned char* buffer, size_t length,
1272          InetHostAddress& host, tpport_t& port) = 0;
1273 
1274     virtual size_t
1275     getNextDataPacketSize() const = 0;
1276 
1277     mutable ThreadLock recvLock;
1278     // reception queue
1279     IncomingRTPPktLink* recvFirst, * recvLast;
1280     // values for packet validation.
1281     static const uint8 defaultMinValidPacketSequence;
1282     static const uint16 defaultMaxPacketMisorder;
1283     static const uint16 defaultMaxPacketDropout;
1284     uint8 minValidPacketSequence;
1285     uint16 maxPacketMisorder;
1286     uint16 maxPacketDropout;
1287     static const size_t defaultMembersSize;
1288     uint8 sourceExpirationPeriod;
1289     mutable Mutex cryptoMutex;
1290         std::list<CryptoContext *> cryptoContexts;
1291 };
1292 
1293 /** @}*/ // iqueue
1294 
1295 END_NAMESPACE
1296 
1297 #endif  //CCXX_RTP_IQUEUE_H_
1298 
1299 /** EMACS **
1300  * Local variables:
1301  * mode: c++
1302  * c-basic-offset: 8
1303  * End:
1304  */
1305