1 // Copyright (C) 2001-2015 Federico Montesino Pouzols <fedemp@altern.org>. 2 // 3 // This program is free software; you can redistribute it and/or modify 4 // it under the terms of the GNU General Public License as published by 5 // the Free Software Foundation; either version 2 of the License, or 6 // (at your option) any later version. 7 // 8 // This program is distributed in the hope that it will be useful, 9 // but WITHOUT ANY WARRANTY; without even the implied warranty of 10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 // GNU General Public License for more details. 12 // 13 // You should have received a copy of the GNU Lesser General Public License 14 // along with GNU ccRTP. If not, see <http://www.gnu.org/licenses/>. 15 // 16 // As a special exception, you may use this file as part of a free software 17 // library without restriction. Specifically, if other files instantiate 18 // templates or use macros or inline functions from this file, or you compile 19 // this file and link it with other files to produce an executable, this 20 // file does not by itself cause the resulting executable to be covered by 21 // the GNU General Public License. This exception does not however 22 // invalidate any other reasons why the executable file might be covered by 23 // the GNU General Public License. 24 // 25 // This exception applies only to the code released under the name GNU 26 // ccRTP. If you copy code from other releases into a copy of GNU 27 // ccRTP, as the General Public License permits, the exception does 28 // not apply to the code that you add in this way. To avoid misleading 29 // anyone as to the status of such modified files, you must delete 30 // this exception notice from them. 31 // 32 // If you write modifications of your own for GNU ccRTP, it is your choice 33 // whether to permit this exception to apply to your modifications. 34 // If you do not wish that, delete this exception notice. 35 // 36 37 /** 38 * @file iqueue.h 39 * 40 * @short Generic RTP input queues. 41 **/ 42 43 #ifndef CCXX_RTP_IQUEUE_H_ 44 #define CCXX_RTP_IQUEUE_H_ 45 46 #include <ccrtp/queuebase.h> 47 #include <ccrtp/CryptoContext.h> 48 49 #include <list> 50 51 NAMESPACE_COMMONCPP 52 53 /** 54 * @defgroup iqueue Generic RTP input queues. 55 * @{ 56 **/ 57 58 /** 59 * @class Members rtp.h 60 * @short members and senders accounting 61 * 62 * Records the number of members as well as active senders. For now, 63 * it is too simple. 64 * 65 * @author Federico Montesino Pouzols <fedemp@altern.org> 66 **/ 67 class __EXPORT Members 68 { 69 public: 70 inline void setMembersCount(uint32 n)71 setMembersCount(uint32 n) 72 { members = n; } 73 74 inline void increaseMembersCount()75 increaseMembersCount() 76 { members++; } 77 78 inline void decreaseMembersCount()79 decreaseMembersCount() 80 { members--; } 81 82 inline uint32 getMembersCount()83 getMembersCount() const 84 { return members; } 85 86 inline void setSendersCount(uint32 n)87 setSendersCount(uint32 n) 88 { activeSenders = n; } 89 90 inline void increaseSendersCount()91 increaseSendersCount() 92 { activeSenders++; } 93 94 inline void decreaseSendersCount()95 decreaseSendersCount() 96 { activeSenders--; } 97 98 inline uint32 getSendersCount()99 getSendersCount() const 100 { return activeSenders; } 101 102 protected: Members()103 Members() : 104 members(0), 105 activeSenders(0) 106 { } 107 ~Members()108 inline virtual ~Members() 109 { } 110 111 private: 112 /// number of identified members 113 uint32 members; 114 /// number of identified members that currently are active senders 115 uint32 activeSenders; 116 }; 117 118 /** 119 * @class SyncSourceHandler 120 * @short SyncSource objects modification methods. 121 * 122 * @author Federico Montesino Pouzols <fedemp@altern.org> 123 **/ 124 class __EXPORT SyncSourceHandler 125 { 126 public: 127 /** 128 * This requires SyncSource - SyncSourceHandler friendship. 129 * 130 * Get the SyncSourceLink corresponding to a SyncSource 131 * object. 132 **/ 133 inline void* getLink(const SyncSource & source)134 getLink(const SyncSource& source) const 135 { return source.getLink(); } 136 137 inline void setLink(SyncSource & source,void * link)138 setLink(SyncSource& source, void* link) 139 { source.setLink(link); } 140 141 inline void setParticipant(SyncSource & source,Participant & p)142 setParticipant(SyncSource& source, Participant& p) 143 { source.setParticipant(p); } 144 145 inline void setState(SyncSource & source,SyncSource::State ns)146 setState(SyncSource& source, SyncSource::State ns) 147 { source.setState(ns); } 148 149 inline void setSender(SyncSource & source,bool active)150 setSender(SyncSource& source, bool active) 151 { source.setSender(active); } 152 153 inline void setDataTransportPort(SyncSource & source,tpport_t p)154 setDataTransportPort(SyncSource& source, tpport_t p) 155 { source.setDataTransportPort(p); } 156 157 inline void setControlTransportPort(SyncSource & source,tpport_t p)158 setControlTransportPort(SyncSource& source, tpport_t p) 159 { source.setControlTransportPort(p); } 160 161 inline void setNetworkAddress(SyncSource & source,InetAddress addr)162 setNetworkAddress(SyncSource& source, InetAddress addr) 163 { source.setNetworkAddress(addr); } 164 165 protected: SyncSourceHandler()166 SyncSourceHandler() 167 { } 168 ~SyncSourceHandler()169 inline virtual ~SyncSourceHandler() 170 { } 171 }; 172 173 /** 174 * @class ParticipantHandler 175 * @short Participant objects modification methods. 176 * 177 * @author Federico Montesino Pouzols <fedemp@altern.org> 178 **/ 179 class __EXPORT ParticipantHandler 180 { 181 public: 182 inline void setSDESItem(Participant * part,SDESItemType item,const std::string & val)183 setSDESItem(Participant* part, SDESItemType item, 184 const std::string& val) 185 { part->setSDESItem(item,val); } 186 187 inline void setPRIVPrefix(Participant * part,const std::string val)188 setPRIVPrefix(Participant* part, const std::string val) 189 { part->setPRIVPrefix(val); } 190 191 protected: ParticipantHandler()192 ParticipantHandler() 193 { } 194 ~ParticipantHandler()195 inline virtual ~ParticipantHandler() 196 { } 197 }; 198 199 /** 200 * @class ApplicationHandler 201 * @short Application objects modification methods. 202 * 203 * @author Federico Montesino Pouzols <fedemp@altern.org> 204 **/ 205 class __EXPORT ApplicationHandler 206 { 207 public: 208 inline void addParticipant(RTPApplication & app,Participant & part)209 addParticipant(RTPApplication& app, Participant& part) 210 { app.addParticipant(part); } 211 212 inline void removeParticipant(RTPApplication & app,RTPApplication::ParticipantLink * pl)213 removeParticipant(RTPApplication& app, 214 RTPApplication::ParticipantLink* pl) 215 { app.removeParticipant(pl); } 216 217 protected: ApplicationHandler()218 ApplicationHandler() 219 { } 220 ~ApplicationHandler()221 inline virtual ~ApplicationHandler() 222 { } 223 }; 224 225 /** 226 * @class ConflictHandler 227 * @short To track addresses of sources conflicting with the local 228 * one. 229 * 230 * @author Federico Montesino Pouzols <fedemp@altern.org> 231 **/ 232 class __EXPORT ConflictHandler 233 { 234 public: 235 struct ConflictingTransportAddress 236 { 237 ConflictingTransportAddress(InetAddress na, 238 tpport_t dtp, tpport_t ctp); 239 setNextConflictingTransportAddress240 void setNext(ConflictingTransportAddress* nc) 241 { next = nc; } 242 getNetworkAddressConflictingTransportAddress243 inline const InetAddress& getNetworkAddress( ) const 244 { return networkAddress; } 245 getDataTransportPortConflictingTransportAddress246 inline tpport_t getDataTransportPort() const 247 { return dataTransportPort; } 248 getControlTransportPortConflictingTransportAddress249 inline tpport_t getControlTransportPort() const 250 { return controlTransportPort; } 251 252 InetAddress networkAddress; 253 tpport_t dataTransportPort; 254 tpport_t controlTransportPort; 255 ConflictingTransportAddress* next; 256 // arrival time of last data or control packet. 257 timeval lastPacketTime; 258 }; 259 260 /** 261 * @param na Inet network address. 262 * @param dtp Data transport port. 263 **/ 264 ConflictingTransportAddress* searchDataConflict(InetAddress na, 265 tpport_t dtp); 266 /** 267 * @param na Inet network address. 268 * @param ctp Data transport port. 269 **/ 270 ConflictingTransportAddress* searchControlConflict(InetAddress na, 271 tpport_t ctp); 272 updateConflict(ConflictingTransportAddress & ca)273 void updateConflict(ConflictingTransportAddress& ca) 274 { SysTime::gettimeofday(&(ca.lastPacketTime),NULL); } 275 276 void addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp); 277 278 protected: ConflictHandler()279 ConflictHandler() 280 { firstConflict = lastConflict = NULL; } 281 ~ConflictHandler()282 inline virtual ~ConflictHandler() 283 { } 284 285 ConflictingTransportAddress* firstConflict, * lastConflict; 286 }; 287 288 /** 289 * @class MembershipBookkeeping 290 * @short Controls the group membership in the current session. 291 * 292 * For now, this class implements only a hash table of members, but 293 * its design and relation with other classes is intented to support 294 * group membership sampling in case scalability problems arise. 295 * 296 * @author Federico Montesino Pouzols <fedemp@altern.org> 297 */ 298 class __EXPORT MembershipBookkeeping : 299 public SyncSourceHandler, 300 public ParticipantHandler, 301 public ApplicationHandler, 302 public ConflictHandler, 303 private Members 304 { 305 public: getDefaultMembersHashSize()306 inline size_t getDefaultMembersHashSize() 307 { return defaultMembersHashSize; } 308 309 protected: 310 311 /** 312 * @short The initial size is a hint to allocate the resources 313 * needed in order to keep the members' identifiers and 314 * associated information. 315 * 316 * Although ccRTP will reallocate resources when it becomes 317 * necessary, a good hint may save a lot of unpredictable time 318 * penalties. 319 * 320 * @param initialSize an estimation of how many participants 321 * the session will consist of. 322 * 323 */ 324 MembershipBookkeeping(uint32 initialSize = defaultMembersHashSize); 325 326 /** 327 * Purges all RTPSource structures created during the session, 328 * as well as the hash table and the list of sources. 329 **/ 330 inline virtual ~MembershipBookkeeping()331 ~MembershipBookkeeping() 332 { endMembers(); } 333 334 struct SyncSourceLink; 335 getLink(const SyncSource & source)336 inline SyncSourceLink* getLink(const SyncSource& source) const 337 { return static_cast<SyncSourceLink*>(SyncSourceHandler::getLink(source)); } 338 /** 339 * Get whether a synchronization source is recorded in this 340 * membership controller. 341 **/ isMine(const SyncSource & source)342 inline bool isMine(const SyncSource& source) const 343 { return getLink(source)->getMembership() == this; } 344 345 /** 346 * @struct IncomingRTPPktLink 347 * 348 * @short Incoming RTP data packets control structure within 349 * the incoming packet queue class. 350 **/ 351 struct IncomingRTPPktLink 352 { IncomingRTPPktLinkIncomingRTPPktLink353 IncomingRTPPktLink(IncomingRTPPkt* pkt, SyncSourceLink* sLink, 354 struct timeval& recv_ts, 355 uint32 shifted_ts, 356 IncomingRTPPktLink* sp, 357 IncomingRTPPktLink* sn, 358 IncomingRTPPktLink* p, 359 IncomingRTPPktLink* n) : 360 packet(pkt), 361 sourceLink(sLink), 362 prev(p), next(n), 363 srcPrev(sp), srcNext(sn), 364 receptionTime(recv_ts), 365 shiftedTimestamp(shifted_ts) 366 { } 367 ~IncomingRTPPktLinkIncomingRTPPktLink368 ~IncomingRTPPktLink() 369 { } 370 getSourceLinkIncomingRTPPktLink371 inline SyncSourceLink* getSourceLink() const 372 { return sourceLink; } 373 setSourceLinkIncomingRTPPktLink374 inline void setSourceLink(SyncSourceLink* src) 375 { sourceLink = src; } 376 getNextIncomingRTPPktLink377 inline IncomingRTPPktLink* getNext() const 378 { return next; } 379 setNextIncomingRTPPktLink380 inline void setNext(IncomingRTPPktLink* nl) 381 { next = nl; } 382 getPrevIncomingRTPPktLink383 inline IncomingRTPPktLink* getPrev() const 384 { return prev; } 385 setPrevIncomingRTPPktLink386 inline void setPrev(IncomingRTPPktLink* pl) 387 { prev = pl; } 388 getSrcNextIncomingRTPPktLink389 inline IncomingRTPPktLink* getSrcNext() const 390 { return srcNext; } 391 setSrcNextIncomingRTPPktLink392 inline void setSrcNext(IncomingRTPPktLink* sn) 393 { srcNext = sn; } 394 getSrcPrevIncomingRTPPktLink395 inline IncomingRTPPktLink* getSrcPrev() const 396 { return srcPrev; } 397 setSrcPrevIncomingRTPPktLink398 inline void setSrcPrev(IncomingRTPPktLink* sp) 399 { srcPrev = sp; } 400 getPacketIncomingRTPPktLink401 inline IncomingRTPPkt* getPacket() const 402 { return packet; } 403 setPacketIncomingRTPPktLink404 inline void setPacket(IncomingRTPPkt* pkt) 405 { packet = pkt; } 406 407 /** 408 * Set the time this packet was received at. 409 * 410 * @param t time of reception. 411 * @note this has almost nothing to do with the 32-bit 412 * timestamp contained in the packet header. 413 **/ setRecvTimeIncomingRTPPktLink414 inline void setRecvTime(const timeval &t) 415 { receptionTime = t; } 416 417 /** 418 * Get the time this packet was received at. 419 **/ getRecvTimeIncomingRTPPktLink420 inline timeval getRecvTime() const 421 { return receptionTime; } 422 423 /** 424 * Get timestamp of this packet. The timestamp of 425 * incoming packets is filtered so that the timestamp 426 * this method provides for the first packet received 427 * from every source starts from 0. 428 * 429 * @return 32 bit timestamp starting from 0 for each source. 430 */ getTimestampIncomingRTPPktLink431 inline uint32 getTimestamp() const 432 { return shiftedTimestamp; } 433 setTimestampIncomingRTPPktLink434 inline void setTimestamp(uint32 ts) 435 { shiftedTimestamp = ts;} 436 437 // the packet this link refers to. 438 IncomingRTPPkt* packet; 439 // the synchronization source this packet comes from. 440 SyncSourceLink* sourceLink; 441 // global incoming packet queue links. 442 IncomingRTPPktLink* prev, * next; 443 // source specific incoming packet queue links. 444 IncomingRTPPktLink* srcPrev, * srcNext; 445 // time this packet was received at 446 struct timeval receptionTime; 447 // timestamp of the packet in host order and after 448 // substracting the initial timestamp for its source 449 // (it is an increment from the initial timestamp). 450 uint32 shiftedTimestamp; 451 }; 452 453 /** 454 * @struct SyncSourceLink 455 * 456 * @short Synchronization Source internal handler within the 457 * incoming packets queue. 458 * 459 * Incoming packets queue objects hold a hash table and a 460 * linked list of synchronization sources. For each of these 461 * sources, there is also a linked list of incoming rtp 462 * packets, which are linked in an "all incoming packets" list 463 * as well. SyncSourceLink objects hold the necessary data to 464 * maintain these data estructures, as well as source specific 465 * information and statistics for RTCP, 466 * 467 * @author Federico Montesino Pouzols <fedemp@altern.org> 468 **/ 469 struct SyncSourceLink 470 { 471 // 2^16 472 static const uint32 SEQNUMMOD; 473 474 SyncSourceLink(MembershipBookkeeping* m, 475 SyncSource* s, 476 IncomingRTPPktLink* fp = NULL, 477 IncomingRTPPktLink* lp = NULL, 478 SyncSourceLink* ps = NULL, 479 SyncSourceLink* ns = NULL, 480 SyncSourceLink* ncollis = NULL) : membershipSyncSourceLink481 membership(m), source(s), first(fp), last(lp), 482 prev(ps), next(ns), nextCollis(ncollis), 483 prevConflict(NULL) 484 { m->setLink(*s,this); // record that the source is associated 485 initStats(); // to this link. 486 } 487 488 /** 489 * Note it deletes the source. 490 **/ 491 ~SyncSourceLink(); 492 getMembershipSyncSourceLink493 inline MembershipBookkeeping* getMembership() 494 { return membership; } 495 496 /** 497 * Get the synchronization source object this link 498 * objet holds information for. 499 **/ getSourceSyncSourceLink500 inline SyncSource* getSource() { return source; } 501 502 /** 503 * Get first RTP (data) packet in the queue of packets 504 * received from this socket. 505 **/ getFirstSyncSourceLink506 inline IncomingRTPPktLink* getFirst() 507 { return first; } 508 setFirstSyncSourceLink509 inline void setFirst(IncomingRTPPktLink* fp) 510 { first = fp; } 511 512 /** 513 * Get last RTP (data) packet in the queue of packets 514 * received from this socket. 515 **/ getLastSyncSourceLink516 inline IncomingRTPPktLink* getLast() 517 { return last; } 518 setLastSyncSourceLink519 inline void setLast(IncomingRTPPktLink* lp) 520 { last = lp; } 521 522 /** 523 * Get the link object for the previous RTP source. 524 **/ getPrevSyncSourceLink525 inline SyncSourceLink* getPrev() 526 { return prev; } 527 setPrevSyncSourceLink528 inline void setPrev(SyncSourceLink* ps) 529 { prev = ps; } 530 531 /** 532 * Get the link object for the next RTP source. 533 **/ getNextSyncSourceLink534 inline SyncSourceLink* getNext() 535 { return next; } 536 setNextSyncSourceLink537 inline void setNext(SyncSourceLink *ns) 538 { next = ns; } 539 540 /** 541 * Get the link object for the next RTP source in the 542 * hash table entry collision list. Note that 543 * collision does not refer to SSRC collision, but 544 * hash table collision. 545 **/ getNextCollisSyncSourceLink546 inline SyncSourceLink* getNextCollis() 547 { return nextCollis; } 548 setNextCollisSyncSourceLink549 inline void setNextCollis(SyncSourceLink* ns) 550 { nextCollis = ns; } 551 getPrevConflictSyncSourceLink552 inline ConflictingTransportAddress* getPrevConflict() const 553 { return prevConflict; } 554 555 /** 556 * Get conflicting address. 557 **/ 558 void setPrevConflict(InetAddress& addr, tpport_t dataPort, 559 tpport_t controlPort); 560 getSenderInfoSyncSourceLink561 unsigned char* getSenderInfo() 562 { return senderInfo; } 563 564 void setSenderInfo(unsigned char* si); 565 getReceiverInfoSyncSourceLink566 unsigned char* getReceiverInfo() 567 { return receiverInfo; } 568 569 void setReceiverInfo(unsigned char* ri); 570 getLastPacketTimeSyncSourceLink571 inline timeval getLastPacketTime() const 572 { return lastPacketTime; } 573 getLastRTCPPacketTimeSyncSourceLink574 inline timeval getLastRTCPPacketTime() const 575 { return lastRTCPPacketTime; } 576 getLastRTCPSRTimeSyncSourceLink577 inline timeval getLastRTCPSRTime() const 578 { return lastRTCPSRTime; } 579 580 /** 581 * Get the total number of RTP packets received from this 582 * source. 583 */ getObservedPacketCountSyncSourceLink584 inline uint32 getObservedPacketCount() const 585 { return obsPacketCount; } 586 incObservedPacketCountSyncSourceLink587 inline void incObservedPacketCount() 588 { obsPacketCount++; } 589 590 /** 591 * Get the total number of payload octets received from this 592 * source. 593 **/ getObservedOctetCountSyncSourceLink594 inline uint32 getObservedOctetCount() const 595 { return obsOctetCount; } 596 incObservedOctetCountSyncSourceLink597 inline void incObservedOctetCount(uint32 n) 598 { obsOctetCount += n; } 599 600 /** 601 * Get the highest valid sequence number received. 602 **/ 603 uint16 getMaxSeqNumSyncSourceLink604 getMaxSeqNum() const 605 { return maxSeqNum; } 606 607 /** 608 * Set the highest valid sequence number recived. 609 * @param max Sequence number. 610 **/ 611 void setMaxSeqNumSyncSourceLink612 setMaxSeqNum(uint16 max) 613 { maxSeqNum = max; } 614 615 inline uint32 getExtendedMaxSeqNumSyncSourceLink616 getExtendedMaxSeqNum() const 617 { return extendedMaxSeqNum; } 618 619 inline void setExtendedMaxSeqNumSyncSourceLink620 setExtendedMaxSeqNum(uint32 seq) 621 { extendedMaxSeqNum = seq; } 622 getCumulativePacketLostSyncSourceLink623 inline uint32 getCumulativePacketLost() const 624 { return cumulativePacketLost; } 625 setCumulativePacketLostSyncSourceLink626 inline void setCumulativePacketLost(uint32 pl) 627 { cumulativePacketLost = pl; } 628 getFractionLostSyncSourceLink629 inline uint8 getFractionLost() const 630 { return fractionLost; } 631 setFractionLostSyncSourceLink632 inline void setFractionLost(uint8 fl) 633 { fractionLost = fl; } 634 getLastPacketTransitTimeSyncSourceLink635 inline uint32 getLastPacketTransitTime() 636 { return lastPacketTransitTime; } 637 setLastPacketTransitTimeSyncSourceLink638 inline void setLastPacketTransitTime(uint32 time) 639 { lastPacketTransitTime = time; } 640 getJitterSyncSourceLink641 inline float getJitter() const 642 { return jitter; } 643 setJitterSyncSourceLink644 inline void setJitter(float j) 645 { jitter = j; } 646 getInitialDataTimestampSyncSourceLink647 inline uint32 getInitialDataTimestamp() const 648 { return initialDataTimestamp; } 649 setInitialDataTimestampSyncSourceLink650 inline void setInitialDataTimestamp(uint32 ts) 651 { initialDataTimestamp = ts; } 652 getInitialDataTimeSyncSourceLink653 inline timeval getInitialDataTime() const 654 { return initialDataTime; } 655 setInitialDataTimeSyncSourceLink656 inline void setInitialDataTime(timeval it) 657 { initialDataTime = it; } 658 659 /** 660 * Mark this source as having sent a BYE control packet. 661 * 662 * @return whether some packet from this source had 663 * been received before (getHello() has been called at 664 * least once) 665 **/ getGoodbyeSyncSourceLink666 bool getGoodbye() 667 { 668 if(!flag) 669 return false; 670 flag = false; 671 return true; 672 } 673 674 /** 675 * Mark this source as having sent some packet. 676 * 677 * @return whether no packet from this source had been 678 * received before 679 **/ getHelloSyncSourceLink680 bool getHello() { 681 if(flag) 682 return false; 683 flag = true; 684 return true; 685 } 686 getBadSeqNumSyncSourceLink687 inline uint32 getBadSeqNum() const 688 { return badSeqNum; } 689 setBadSeqNumSyncSourceLink690 inline void setBadSeqNum(uint32 seq) 691 { badSeqNum = seq; } 692 getProbationSyncSourceLink693 uint8 getProbation() const 694 { return probation; } 695 setProbationSyncSourceLink696 inline void setProbation(uint8 p) 697 { probation = p; } 698 decProbationSyncSourceLink699 inline void decProbation() 700 { --probation; } 701 isValidSyncSourceLink702 bool isValid() const 703 { return 0 == probation; } 704 getBaseSeqNumSyncSourceLink705 inline uint16 getBaseSeqNum() const 706 { return baseSeqNum; } 707 setBaseSeqNumSyncSourceLink708 inline void setBaseSeqNum(uint16 seqnum) 709 { baseSeqNum = seqnum; } 710 getSeqNumAccumSyncSourceLink711 inline uint32 getSeqNumAccum() const 712 { return seqNumAccum; } 713 incSeqNumAccumSyncSourceLink714 inline void incSeqNumAccum() 715 { seqNumAccum += SEQNUMMOD; } 716 717 /** 718 * Start a new sequence of received packets. 719 **/ initSequenceSyncSourceLink720 inline void initSequence(uint16 seqnum) 721 { maxSeqNum = seqNumAccum = seqnum; } 722 723 /** 724 * Record the insertion of an RTP packet from this 725 * source into the scheduled reception queue. All 726 * received packets should be registered with 727 * recordReception(), but only those actually inserted 728 * into the queue should be registered via this 729 * method. 730 * 731 * @param pl Link structure for packet inserted into the queue. 732 **/ 733 void recordInsertion(const IncomingRTPPktLink& pl); 734 735 void initStats(); 736 737 /** 738 * Compute cumulative packet lost and fraction of 739 * packets lost during the last reporting interval. 740 **/ 741 void computeStats(); 742 743 MembershipBookkeeping* membership; 744 // The source this link object refers to. 745 SyncSource* source; 746 // first/last packets from this source in the queue. 747 IncomingRTPPktLink* first, * last; 748 // Links for synchronization sources located before 749 // and after this one in the list of sources. 750 SyncSourceLink* prev, * next; 751 // Prev and next inside the hash table collision list. 752 SyncSourceLink* nextCollis; 753 ConflictingTransportAddress* prevConflict; 754 unsigned char* senderInfo; 755 unsigned char* receiverInfo; 756 // time the last RTP packet from this source was 757 // received at. 758 timeval lastPacketTime; 759 // time the last RTCP packet was received. 760 timeval lastRTCPPacketTime; 761 // time the lasrt RTCP SR was received. Required for 762 // DLSR computation. 763 timeval lastRTCPSRTime; 764 765 // for outgoing RR reports. 766 // number of packets received from this source. 767 uint32 obsPacketCount; 768 // number of octets received from this source. 769 uint32 obsOctetCount; 770 // the higher sequence number seen from this source 771 uint16 maxSeqNum; 772 uint32 extendedMaxSeqNum; 773 uint32 cumulativePacketLost; 774 uint8 fractionLost; 775 // for interarrivel jitter computation 776 uint32 lastPacketTransitTime; 777 // interarrival jitter of packets from this source. 778 float jitter; 779 uint32 initialDataTimestamp; 780 timeval initialDataTime; 781 782 // this flag assures we only call one gotHello and one 783 // gotGoodbye for this src. 784 bool flag; 785 786 // for source validation: 787 uint32 badSeqNum; 788 uint8 probation; // packets in sequence before valid. 789 uint16 baseSeqNum; 790 uint32 expectedPrior; 791 uint32 receivedPrior; 792 uint32 seqNumAccum; 793 }; 794 795 /** 796 * Returns whether there is already a synchronizacion source 797 * with "ssrc" SSRC identifier. 798 **/ 799 bool 800 isRegistered(uint32 ssrc); 801 802 /** 803 * Get the description of a source by its <code>ssrc</code> identifier. 804 * 805 * @param ssrc SSRC identifier, in host order. 806 * @param created whether a new source has been created. 807 * @return Pointer to the SyncSource object identified by 808 * <code>ssrc</code>. 809 */ 810 SyncSourceLink* 811 getSourceBySSRC(uint32 ssrc, bool& created); 812 813 /** 814 * Mark the source identified by <code>ssrc</code> as having 815 * sent a BYE packet. It is not deleted until a timeout 816 * expires, so that in case some packets from this source 817 * arrive a bit later the source is not inserted again in the 818 * table of known sources. 819 * 820 * @return true if the source had been previously identified. 821 * false if it was not in the table of known sources. 822 **/ 823 bool 824 BYESource(uint32 ssrc); 825 826 /** 827 * Remove the description of the source identified by 828 * <code>ssrc</code> 829 * 830 * @return whether the source has been actually removed or it 831 * did not exist. 832 */ 833 bool 834 removeSource(uint32 ssrc); 835 getFirst()836 inline SyncSourceLink* getFirst() 837 { return first; } 838 getLast()839 inline SyncSourceLink* getLast() 840 { return last; } 841 842 inline uint32 getMembersCount()843 getMembersCount() 844 { return Members::getMembersCount(); } 845 846 inline void setMembersCount(uint32 n)847 setMembersCount(uint32 n) 848 { Members::setMembersCount(n); } 849 850 inline uint32 getSendersCount()851 getSendersCount() 852 { return Members::getSendersCount(); } 853 854 static const size_t defaultMembersHashSize; 855 static const uint32 SEQNUMMOD; 856 857 private: 858 MembershipBookkeeping(const MembershipBookkeeping &o); 859 860 MembershipBookkeeping& 861 operator=(const MembershipBookkeeping &o); 862 863 /** 864 * Purge all RTPSource structures, the hash table and the list 865 * of sources. 866 **/ 867 void 868 endMembers(); 869 870 // Hash table with sources of RTP and RTCP packets 871 uint32 sourceBucketsNum; 872 SyncSourceLink** sourceLinks; 873 // List of sources, ordered from older to newer 874 SyncSourceLink* first, * last; 875 }; 876 877 /** 878 * @class IncomingDataQueue 879 * @short Queue for incoming RTP data packets in an RTP session. 880 * 881 * @author Federico Montesino Pouzols <fedemp@altern.org> 882 **/ 883 class __EXPORT IncomingDataQueue: public IncomingDataQueueBase, 884 protected MembershipBookkeeping 885 { 886 public: 887 /** 888 * @class SyncSourcesIterator 889 * @short iterator through the list of synchronizations 890 * sources in this session 891 **/ 892 class SyncSourcesIterator 893 { 894 public: 895 typedef std::forward_iterator_tag iterator_category; 896 typedef SyncSource value_type; 897 typedef std::ptrdiff_t difference_type; 898 typedef const SyncSource* pointer; 899 typedef const SyncSource& reference; 900 901 SyncSourcesIterator(SyncSourceLink* l = NULL) : link(l)902 link(l) 903 { } 904 SyncSourcesIterator(const SyncSourcesIterator & si)905 SyncSourcesIterator(const SyncSourcesIterator& si) : 906 link(si.link) 907 { } 908 909 reference operator*() const 910 { return *(link->getSource()); } 911 912 pointer operator->() const 913 { return link->getSource(); } 914 915 SyncSourcesIterator& operator++() { 916 link = link->getNext(); 917 return *this; 918 } 919 920 SyncSourcesIterator operator++(int) { 921 SyncSourcesIterator result(*this); 922 ++(*this); 923 return result; 924 } 925 926 friend bool operator==(const SyncSourcesIterator& l, 927 const SyncSourcesIterator& r) 928 { return l.link == r.link; } 929 930 friend bool operator!=(const SyncSourcesIterator& l, 931 const SyncSourcesIterator& r) 932 { return l.link != r.link; } 933 934 private: 935 SyncSourceLink *link; 936 }; 937 begin()938 SyncSourcesIterator begin() 939 { return SyncSourcesIterator(MembershipBookkeeping::getFirst()); } 940 end()941 SyncSourcesIterator end() 942 { return SyncSourcesIterator(NULL); } 943 944 /** 945 * Retreive data from a specific timestamped packet if such a 946 * packet is currently available in the receive buffer. 947 * 948 * @param stamp Data unit timestamp. 949 * @param src Optional synchronization source selector. 950 * @return data retrieved from the reception buffer. 951 * @retval null pointer if no packet with such timestamp is available. 952 **/ 953 const AppDataUnit* 954 getData(uint32 stamp, const SyncSource* src = NULL); 955 956 957 /** 958 * Determine if packets are waiting in the reception queue. 959 * 960 * @param src Optional synchronization source selector. 961 * @return True if packets are waiting. 962 */ 963 bool 964 isWaiting(const SyncSource* src = NULL) const; 965 966 /** 967 * Get timestamp of first packet waiting in the queue. 968 * 969 * @param src optional source selector. 970 * @return timestamp of first arrival packet. 971 **/ 972 uint32 973 getFirstTimestamp(const SyncSource* src = NULL) const; 974 975 /** 976 * When receiving packets from a new source, it may be 977 * convenient to reject a first few packets before we are 978 * really sure the source is valid. This method sets how many 979 * data packets must be received in sequence before the source 980 * is considered valid and the stack starts to accept its 981 * packets. 982 * 983 * @note the default (see defaultMinValidPacketSequence()) 984 * value for this parameter is 0, so that no packets are 985 * rejected (data packets are accepted from the first one). 986 * 987 * @note this validation is performed after the generic header 988 * validation and the additional validation done in 989 * onRTPPacketRecv(). 990 * 991 * @note if any valid RTCP packet is received from this 992 * source, it will be immediatly considered valid regardless 993 * of the number of sequential data packets received. 994 * 995 * @param packets number of sequential packet required 996 **/ 997 void setMinValidPacketSequence(uint8 packets)998 setMinValidPacketSequence(uint8 packets) 999 { minValidPacketSequence = packets; } 1000 1001 uint8 getDefaultMinValidPacketSequence()1002 getDefaultMinValidPacketSequence() const 1003 { return defaultMinValidPacketSequence; } 1004 1005 /** 1006 * Get the minimun number of consecutive packets that must be 1007 * received from a source before accepting its data packets. 1008 **/ 1009 uint8 getMinValidPacketSequence()1010 getMinValidPacketSequence() const 1011 { return minValidPacketSequence; } 1012 1013 void setMaxPacketMisorder(uint16 packets)1014 setMaxPacketMisorder(uint16 packets) 1015 { maxPacketMisorder = packets; } 1016 1017 uint16 getDefaultMaxPacketMisorder()1018 getDefaultMaxPacketMisorder() const 1019 { return defaultMaxPacketMisorder; } 1020 1021 uint16 getMaxPacketMisorder()1022 getMaxPacketMisorder() const 1023 { return maxPacketMisorder; } 1024 1025 /** 1026 * 1027 * It also prevents packets sent after a restart of the source 1028 * being immediately accepted. 1029 **/ 1030 void setMaxPacketDropout(uint16 packets)1031 setMaxPacketDropout(uint16 packets) // default: 3000. 1032 { maxPacketDropout = packets; } 1033 1034 uint16 getDefaultMaxPacketDropout()1035 getDefaultMaxPacketDropout() const 1036 { return defaultMaxPacketDropout; } 1037 1038 uint16 getMaxPacketDropout()1039 getMaxPacketDropout() const 1040 { return maxPacketDropout; } 1041 1042 // default value for constructors that allow to specify 1043 // members table s\ize 1044 inline static size_t getDefaultMembersSize()1045 getDefaultMembersSize() 1046 { return defaultMembersSize; } 1047 1048 /** 1049 * Set input queue CryptoContext. 1050 * 1051 * The endQueue method (provided by RTPQueue) deletes all 1052 * registered CryptoContexts. 1053 * 1054 * @param cc Pointer to initialized CryptoContext. 1055 */ 1056 void 1057 setInQueueCryptoContext(CryptoContext* cc); 1058 1059 /** 1060 * Remove input queue CryptoContext. 1061 * 1062 * The endQueue method (provided by RTPQueue) also deletes all 1063 * registered CryptoContexts. 1064 * 1065 * @param cc 1066 * Pointer to initialized CryptoContext to remove. If pointer 1067 * if <code>NULL</code> then delete the whole queue 1068 */ 1069 void 1070 removeInQueueCryptoContext(CryptoContext* cc); 1071 1072 /** 1073 * Get an input queue CryptoContext identified by SSRC 1074 * 1075 * @param ssrc Request CryptoContext for this incoming SSRC 1076 * @return Pointer to CryptoContext of the SSRC of NULL if no context 1077 * available for this SSRC. 1078 */ 1079 CryptoContext* 1080 getInQueueCryptoContext(uint32 ssrc); 1081 1082 protected: 1083 /** 1084 * @param size initial size of the membership table. 1085 **/ 1086 IncomingDataQueue(uint32 size); 1087 ~IncomingDataQueue()1088 virtual ~IncomingDataQueue() 1089 { } 1090 1091 /** 1092 * Apply collision and loop detection and correction algorithm 1093 * when receiving RTP data packets. Follows section 8.2 in 1094 * draft-ietf-avt-rtp-new. 1095 * 1096 * @param sourceLink link to the source object. 1097 * @param is_new whether the source has been just recorded. 1098 * @param na data packet network address. 1099 * @param tp data packet source transport port. 1100 * 1101 * @return whether the packet must not be discarded. 1102 **/ 1103 bool checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink, 1104 bool is_new, InetAddress& na, 1105 tpport_t tp); 1106 1107 /** 1108 * Set the number of RTCP intervals that the stack will wait 1109 * to change the state of a source from stateActive to 1110 * stateInactive, or to delete the source after being in 1111 * stateInactive. 1112 * 1113 * Note that this value should be uniform accross all 1114 * participants and SHOULD be fixed for a particular profile. 1115 * 1116 * @param intervals number of RTCP report intervals 1117 * 1118 * @note If RTCP is not being used, the RTCP interval is 1119 * assumed to be the default: 5 seconds. 1120 * @note The default for this value is, as RECOMMENDED, 5. 1121 **/ setSourceExpirationPeriod(uint8 intervals)1122 void setSourceExpirationPeriod(uint8 intervals) 1123 { sourceExpirationPeriod = intervals; } 1124 1125 /** 1126 * This function is used by the service thread to process 1127 * the next incoming packet and place it in the receive list. 1128 * 1129 * @return number of payload bytes received. <0 if error. 1130 */ 1131 virtual size_t 1132 takeInDataPacket(); 1133 1134 void renewLocalSSRC(); 1135 1136 /** 1137 * This is used to fetch a packet in the receive queue and to 1138 * expire packets older than the current timestamp. 1139 * 1140 * @return packet buffer object for current timestamp if found. 1141 * @param timestamp timestamp requested. 1142 * @param src optional source selector 1143 * @note if found, the packet is removed from the reception queue 1144 **/ 1145 IncomingDataQueue::IncomingRTPPktLink* 1146 getWaiting(uint32 timestamp, const SyncSource *src = NULL); 1147 1148 /** 1149 * Log reception of a new RTP packet from this source. Usually 1150 * updates data such as the packet counter, the expected 1151 * sequence number for the next packet and the time the last 1152 * packet was received at. 1153 * 1154 * @param srcLink Link structure for the synchronization 1155 * source of this packet. 1156 * @param pkt Packet just created and to be logged. 1157 * @param recvtime Reception time. 1158 * 1159 * @return whether, according to the source state and 1160 * statistics, the packet is considered valid and must be 1161 * inserted in the incoming packets queue. 1162 **/ 1163 bool 1164 recordReception(SyncSourceLink& srcLink, const IncomingRTPPkt& pkt, 1165 const timeval recvtime); 1166 1167 /** 1168 * Log extraction of a packet from this source from the 1169 * scheduled reception queue. 1170 * 1171 * @param pkt Packet extracted from the queue. 1172 **/ 1173 void 1174 recordExtraction(const IncomingRTPPkt& pkt); 1175 1176 void purgeIncomingQueue(); 1177 1178 /** 1179 * Virtual called when a new synchronization source has joined 1180 * the session. 1181 * 1182 * @param - new synchronization source 1183 **/ 1184 inline virtual void onNewSyncSource(const SyncSource &)1185 onNewSyncSource(const SyncSource&) 1186 { } 1187 1188 protected: 1189 /** 1190 * A virtual function to support parsing of arriving packets 1191 * to determine if they should be kept in the queue and to 1192 * dispatch events. 1193 * 1194 * A generic header validity check (as specified in RFC 1889) 1195 * is performed on every incoming packet. If the generic check 1196 * completes succesfully, this method is called before the 1197 * packet is actually inserted into the reception queue. 1198 * 1199 * May be used to perform additional validity checks or to do 1200 * some application specific processing. 1201 * 1202 * @param - packet just received. 1203 * @return true if packet is kept in the incoming packets queue. 1204 **/ 1205 inline virtual bool onRTPPacketRecv(IncomingRTPPkt &)1206 onRTPPacketRecv(IncomingRTPPkt&) 1207 { return true; } 1208 1209 /** 1210 * A hook to filter packets in the receive queue that are being 1211 * expired. This hook may be used to do some application 1212 * specific processing on expired packets before they are 1213 * deleted. 1214 * 1215 * @param - packet expired from the recv queue. 1216 **/ onExpireRecv(IncomingRTPPkt &)1217 inline virtual void onExpireRecv(IncomingRTPPkt&) 1218 { return; } 1219 1220 /** 1221 * A hook that gets called if the decoding of an incoming SRTP was erroneous 1222 * 1223 * @param pkt 1224 * The SRTP packet with error. 1225 * @param errorCode 1226 * The error code: -1 - SRTP authentication failure, -2 - replay 1227 * check failed 1228 * @return 1229 * True: put the packet in incoming queue for further processing 1230 * by the applications; false: dismiss packet. The default 1231 * implementation returns false. 1232 **/ 1233 inline virtual bool onSRTPPacketError(IncomingRTPPkt & pkt,int32 errorCode)1234 onSRTPPacketError(IncomingRTPPkt& pkt, int32 errorCode) 1235 { return false; } 1236 1237 inline virtual bool end2EndDelayed(IncomingRTPPktLink &)1238 end2EndDelayed(IncomingRTPPktLink&) 1239 { return false; } 1240 1241 /** 1242 * Insert a just received packet in the queue (both general 1243 * and source specific queues). If the packet was already in 1244 * the queue (same SSRC and sequence number), it is not 1245 * inserted but deleted. 1246 * 1247 * @param packetLink link to a packet just received and 1248 * generally validated and processed by onRTPPacketRecv. 1249 * 1250 * @return whether the packet was successfully inserted. 1251 * @retval false when the packet is duplicated (there is 1252 * already a packet from the same source with the same 1253 * timestamp). 1254 * @retval true when the packet is not duplicated. 1255 **/ 1256 bool 1257 insertRecvPacket(IncomingRTPPktLink* packetLink); 1258 1259 /** 1260 * This function performs the physical I/O for reading a 1261 * packet from the source. It is a virtual that is 1262 * overriden in the derived class. 1263 * 1264 * @return number of bytes read. 1265 * @param buffer of read packet. 1266 * @param length of data to read. 1267 * @param host address of source. 1268 * @param port number of source. 1269 **/ 1270 virtual size_t 1271 recvData(unsigned char* buffer, size_t length, 1272 InetHostAddress& host, tpport_t& port) = 0; 1273 1274 virtual size_t 1275 getNextDataPacketSize() const = 0; 1276 1277 mutable ThreadLock recvLock; 1278 // reception queue 1279 IncomingRTPPktLink* recvFirst, * recvLast; 1280 // values for packet validation. 1281 static const uint8 defaultMinValidPacketSequence; 1282 static const uint16 defaultMaxPacketMisorder; 1283 static const uint16 defaultMaxPacketDropout; 1284 uint8 minValidPacketSequence; 1285 uint16 maxPacketMisorder; 1286 uint16 maxPacketDropout; 1287 static const size_t defaultMembersSize; 1288 uint8 sourceExpirationPeriod; 1289 mutable Mutex cryptoMutex; 1290 std::list<CryptoContext *> cryptoContexts; 1291 }; 1292 1293 /** @}*/ // iqueue 1294 1295 END_NAMESPACE 1296 1297 #endif //CCXX_RTP_IQUEUE_H_ 1298 1299 /** EMACS ** 1300 * Local variables: 1301 * mode: c++ 1302 * c-basic-offset: 8 1303 * End: 1304 */ 1305