1 /* 2 * SRT - Secure, Reliable, Transport 3 * Copyright (c) 2018 Haivision Systems Inc. 4 * 5 * This Source Code Form is subject to the terms of the Mozilla Public 6 * License, v. 2.0. If a copy of the MPL was not distributed with this 7 * file, You can obtain one at http://mozilla.org/MPL/2.0/. 8 * 9 */ 10 11 /***************************************************************************** 12 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois. 13 All rights reserved. 14 15 Redistribution and use in source and binary forms, with or without 16 modification, are permitted provided that the following conditions are 17 met: 18 19 * Redistributions of source code must retain the above 20 copyright notice, this list of conditions and the 21 following disclaimer. 22 23 * Redistributions in binary form must reproduce the 24 above copyright notice, this list of conditions 25 and the following disclaimer in the documentation 26 and/or other materials provided with the distribution. 27 28 * Neither the name of the University of Illinois 29 nor the names of its contributors may be used to 30 endorse or promote products derived from this 31 software without specific prior written permission. 32 33 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 34 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 35 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 36 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 37 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 38 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 39 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 40 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 41 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 42 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 43 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 44 *****************************************************************************/ 45 46 /***************************************************************************** 47 written by 48 Yunhong Gu, last updated 01/12/2011 49 modified by 50 Haivision Systems Inc. 51 *****************************************************************************/ 52 53 #ifndef INC_SRT_QUEUE_H 54 #define INC_SRT_QUEUE_H 55 56 #include "common.h" 57 #include "packet.h" 58 #include "socketconfig.h" 59 #include "netinet_any.h" 60 #include "utilities.h" 61 #include <list> 62 #include <map> 63 #include <queue> 64 #include <vector> 65 66 namespace srt 67 { 68 class CChannel; 69 class CUDT; 70 71 struct CUnit 72 { 73 CPacket m_Packet; // packet 74 enum Flag 75 { 76 FREE = 0, 77 GOOD = 1, 78 PASSACK = 2, 79 DROPPED = 3 80 }; 81 Flag m_iFlag; // 0: free, 1: occupied, 2: msg read but not freed (out-of-order), 3: msg dropped 82 }; 83 84 class CUnitQueue 85 { 86 87 public: 88 CUnitQueue(); 89 ~CUnitQueue(); 90 91 public: // Storage size operations 92 /// Initialize the unit queue. 93 /// @param [in] size queue size 94 /// @param [in] mss maximum segment size 95 /// @param [in] version IP version 96 /// @return 0: success, -1: failure. 97 int init(int size, int mss, int version); 98 99 /// Increase (double) the unit queue size. 100 /// @return 0: success, -1: failure. 101 102 int increase(); 103 104 /// Decrease (halve) the unit queue size. 105 /// @return 0: success, -1: failure. 106 107 int shrink(); 108 109 public: size()110 int size() const { return m_iSize - m_iCount; } capacity()111 int capacity() const { return m_iSize; } 112 113 public: // Operations on units 114 /// find an available unit for incoming packet. 115 /// @return Pointer to the available unit, NULL if not found. 116 CUnit* getNextAvailUnit(); 117 118 void makeUnitFree(CUnit* unit); 119 120 void makeUnitGood(CUnit* unit); 121 122 public: getIPversion()123 inline int getIPversion() const { return m_iIPversion; } 124 125 private: 126 struct CQEntry 127 { 128 CUnit* m_pUnit; // unit queue 129 char* m_pBuffer; // data buffer 130 int m_iSize; // size of each queue 131 132 CQEntry* m_pNext; 133 } * m_pQEntry, // pointer to the first unit queue 134 *m_pCurrQueue, // pointer to the current available queue 135 *m_pLastQueue; // pointer to the last unit queue 136 137 CUnit* m_pAvailUnit; // recent available unit 138 139 int m_iSize; // total size of the unit queue, in number of packets 140 srt::sync::atomic<int> m_iCount; // total number of valid (occupied) packets in the queue 141 142 int m_iMSS; // unit buffer size 143 int m_iIPversion; // IP version 144 145 private: 146 CUnitQueue(const CUnitQueue&); 147 CUnitQueue& operator=(const CUnitQueue&); 148 }; 149 150 struct CSNode 151 { 152 CUDT* m_pUDT; // Pointer to the instance of CUDT socket 153 sync::steady_clock::time_point m_tsTimeStamp; 154 155 srt::sync::atomic<int> m_iHeapLoc; // location on the heap, -1 means not on the heap 156 }; 157 158 class CSndUList 159 { 160 public: 161 CSndUList(sync::CTimer* pTimer); 162 ~CSndUList(); 163 164 public: 165 enum EReschedule 166 { 167 DONT_RESCHEDULE = 0, 168 DO_RESCHEDULE = 1 169 }; 170 rescheduleIf(bool cond)171 static EReschedule rescheduleIf(bool cond) { return cond ? DO_RESCHEDULE : DONT_RESCHEDULE; } 172 173 /// Update the timestamp of the UDT instance on the list. 174 /// @param [in] u pointer to the UDT instance 175 /// @param [in] reschedule if the timestamp should be rescheduled 176 /// @param [in] ts the next time to trigger sending logic on the CUDT 177 void update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts = sync::steady_clock::now()); 178 179 /// Retrieve the next (in time) socket from the heap to process its sending request. 180 /// @return a pointer to CUDT instance to process next. 181 CUDT* pop(); 182 183 /// Remove UDT instance from the list. 184 /// @param [in] u pointer to the UDT instance 185 void remove(const CUDT* u);// EXCLUDES(m_ListLock); 186 187 /// Retrieve the next scheduled processing time. 188 /// @return Scheduled processing time of the first UDT socket in the list. 189 sync::steady_clock::time_point getNextProcTime(); 190 191 /// Wait for the list to become non empty. 192 void waitNonEmpty() const; 193 194 /// Signal to stop waiting in waitNonEmpty(). 195 void signalInterrupt() const; 196 197 private: 198 /// Doubles the size of the list. 199 /// 200 void realloc_();// REQUIRES(m_ListLock); 201 202 /// Insert a new UDT instance into the list with realloc if required. 203 /// 204 /// @param [in] ts time stamp: next processing time 205 /// @param [in] u pointer to the UDT instance 206 void insert_(const sync::steady_clock::time_point& ts, const CUDT* u); 207 208 /// Insert a new UDT instance into the list without realloc. 209 /// Should be called if there is a gauranteed space for the element. 210 /// 211 /// @param [in] ts time stamp: next processing time 212 /// @param [in] u pointer to the UDT instance 213 void insert_norealloc_(const sync::steady_clock::time_point& ts, const CUDT* u);// REQUIRES(m_ListLock); 214 215 /// Removes CUDT entry from the list. 216 /// If the last entry is removed, calls sync::CTimer::interrupt(). 217 void remove_(const CUDT* u); 218 219 private: 220 CSNode** m_pHeap; // The heap array 221 int m_iArrayLength; // physical length of the array 222 int m_iLastEntry; // position of last entry on the heap array or -1 if empty. 223 224 mutable sync::Mutex m_ListLock; // Protects the list (m_pHeap, m_iArrayLength, m_iLastEntry). 225 mutable sync::Condition m_ListCond; 226 227 sync::CTimer* const m_pTimer; 228 229 private: 230 CSndUList(const CSndUList&); 231 CSndUList& operator=(const CSndUList&); 232 }; 233 234 struct CRNode 235 { 236 CUDT* m_pUDT; // Pointer to the instance of CUDT socket 237 sync::steady_clock::time_point m_tsTimeStamp; // Time Stamp 238 239 CRNode* m_pPrev; // previous link 240 CRNode* m_pNext; // next link 241 242 srt::sync::atomic<bool> m_bOnList; // if the node is already on the list 243 }; 244 245 class CRcvUList 246 { 247 public: 248 CRcvUList(); 249 ~CRcvUList(); 250 251 public: 252 /// Insert a new UDT instance to the list. 253 /// @param [in] u pointer to the UDT instance 254 255 void insert(const CUDT* u); 256 257 /// Remove the UDT instance from the list. 258 /// @param [in] u pointer to the UDT instance 259 260 void remove(const CUDT* u); 261 262 /// Move the UDT instance to the end of the list, if it already exists; otherwise, do nothing. 263 /// @param [in] u pointer to the UDT instance 264 265 void update(const CUDT* u); 266 267 public: 268 CRNode* m_pUList; // the head node 269 270 private: 271 CRNode* m_pLast; // the last node 272 273 private: 274 CRcvUList(const CRcvUList&); 275 CRcvUList& operator=(const CRcvUList&); 276 }; 277 278 class CHash 279 { 280 public: 281 CHash(); 282 ~CHash(); 283 284 public: 285 /// Initialize the hash table. 286 /// @param [in] size hash table size 287 288 void init(int size); 289 290 /// Look for a UDT instance from the hash table. 291 /// @param [in] id socket ID 292 /// @return Pointer to a UDT instance, or NULL if not found. 293 294 CUDT* lookup(int32_t id); 295 296 /// Insert an entry to the hash table. 297 /// @param [in] id socket ID 298 /// @param [in] u pointer to the UDT instance 299 300 void insert(int32_t id, CUDT* u); 301 302 /// Remove an entry from the hash table. 303 /// @param [in] id socket ID 304 305 void remove(int32_t id); 306 307 private: 308 struct CBucket 309 { 310 int32_t m_iID; // Socket ID 311 CUDT* m_pUDT; // Socket instance 312 313 CBucket* m_pNext; // next bucket 314 } * *m_pBucket; // list of buckets (the hash table) 315 316 int m_iHashSize; // size of hash table 317 318 private: 319 CHash(const CHash&); 320 CHash& operator=(const CHash&); 321 }; 322 323 /// @brief A queue of sockets pending for connection. 324 /// It can be either a caller socket in a non-blocking mode 325 /// (the connection has to be handled in background), 326 /// or a socket in rendezvous connection mode. 327 class CRendezvousQueue 328 { 329 public: 330 CRendezvousQueue(); 331 ~CRendezvousQueue(); 332 333 public: 334 /// @brief Insert a new socket pending for connection (non-blocking caller or rendezvous). 335 /// @param id socket ID. 336 /// @param u pointer to a corresponding CUDT instance. 337 /// @param addr remote address to connect to. 338 /// @param ttl timepoint for connection attempt to expire. 339 void insert(const SRTSOCKET& id, CUDT* u, const sockaddr_any& addr, const srt::sync::steady_clock::time_point& ttl); 340 341 /// @brief Remove a socket from the connection pending list. 342 /// @param id socket ID. 343 void remove(const SRTSOCKET& id); 344 345 /// @brief Locate a socket in the connection pending queue. 346 /// @param addr source address of the packet received over UDP (peer address). 347 /// @param id socket ID. 348 /// @return a pointer to CUDT instance retrieved, or NULL if nothing was found. 349 CUDT* retrieve(const sockaddr_any& addr, SRTSOCKET& id) const; 350 351 /// @brief Update status of connections in the pending queue. 352 /// Stop connecting if TTL expires. Resend handshake request every 250 ms if no response from the peer. 353 /// @param rst result of reading from a UDP socket: received packet / nothin read / read error. 354 /// @param cst target status for pending connection: reject or proceed. 355 /// @param pktIn packet received from the UDP socket. 356 void updateConnStatus(EReadStatus rst, EConnectStatus cst, CUnit* unit); 357 358 private: 359 struct LinkStatusInfo 360 { 361 CUDT* u; 362 SRTSOCKET id; 363 int errorcode; 364 sockaddr_any peeraddr; 365 int token; 366 367 struct HasID 368 { 369 SRTSOCKET id; HasIDLinkStatusInfo::HasID370 HasID(SRTSOCKET p) 371 : id(p) 372 { 373 } operatorLinkStatusInfo::HasID374 bool operator()(const LinkStatusInfo& i) { return i.id == id; } 375 }; 376 }; 377 378 /// @brief Qualify pending connections: 379 /// - Sockets with expired TTL go to the 'to_remove' list and removed from the queue straight away. 380 /// - If HS request is to be resent (resend 250 ms if no response from the peer) go to the 'to_process' list. 381 /// 382 /// @param rst result of reading from a UDP socket: received packet / nothin read / read error. 383 /// @param cst target status for pending connection: reject or proceed. 384 /// @param iDstSockID destination socket ID of the received packet. 385 /// @param[in,out] toRemove stores sockets with expired TTL. 386 /// @param[in,out] toProcess stores sockets which should repeat (resend) HS connection request. 387 bool qualifyToHandle(EReadStatus rst, 388 EConnectStatus cst, 389 int iDstSockID, 390 std::vector<LinkStatusInfo>& toRemove, 391 std::vector<LinkStatusInfo>& toProcess); 392 393 private: 394 struct CRL 395 { 396 SRTSOCKET m_iID; // SRT socket ID (self) 397 CUDT* m_pUDT; // CUDT instance 398 sockaddr_any m_PeerAddr; // SRT sonnection peer address 399 srt::sync::steady_clock::time_point m_tsTTL; // the time that this request expires 400 }; 401 std::list<CRL> m_lRendezvousID; // The sockets currently in rendezvous mode 402 403 mutable sync::Mutex m_RIDListLock; 404 }; 405 406 class CSndQueue 407 { 408 friend class CUDT; 409 friend class CUDTUnited; 410 411 public: 412 CSndQueue(); 413 ~CSndQueue(); 414 415 public: 416 // XXX There's currently no way to access the socket ID set for 417 // whatever the queue is currently working for. Required to find 418 // some way to do this, possibly by having a "reverse pointer". 419 // Currently just "unimplemented". CONID()420 std::string CONID() const { return ""; } 421 422 /// Initialize the sending queue. 423 /// @param [in] c UDP channel to be associated to the queue 424 /// @param [in] t Timer 425 426 void init(CChannel* c, srt::sync::CTimer* t); 427 428 /// Send out a packet to a given address. 429 /// @param [in] addr destination address 430 /// @param [in] packet packet to be sent out 431 /// @return Size of data sent out. 432 433 int sendto(const sockaddr_any& addr, CPacket& packet); 434 435 /// Get the IP TTL. 436 /// @param [in] ttl IP Time To Live. 437 /// @return TTL. 438 439 int getIpTTL() const; 440 441 /// Get the IP Type of Service. 442 /// @return ToS. 443 444 int getIpToS() const; 445 446 #ifdef SRT_ENABLE_BINDTODEVICE 447 bool getBind(char* dst, size_t len) const; 448 #endif 449 450 int ioctlQuery(int type) const; 451 int sockoptQuery(int level, int type) const; 452 setClosing()453 void setClosing() { m_bClosing = true; } 454 455 private: 456 static void* worker(void* param); 457 srt::sync::CThread m_WorkerThread; 458 459 private: 460 CSndUList* m_pSndUList; // List of UDT instances for data sending 461 CChannel* m_pChannel; // The UDP channel for data sending 462 srt::sync::CTimer* m_pTimer; // Timing facility 463 464 srt::sync::atomic<bool> m_bClosing; // closing the worker 465 466 #if defined(SRT_DEBUG_SNDQ_HIGHRATE) //>>debug high freq worker 467 uint64_t m_ullDbgPeriod; 468 uint64_t m_ullDbgTime; 469 struct 470 { 471 unsigned long lIteration; // 472 unsigned long lSleepTo; // SleepTo 473 unsigned long lNotReadyPop; // Continue 474 unsigned long lSendTo; 475 unsigned long lNotReadyTs; 476 unsigned long lCondWait; // block on m_WindowCond 477 } m_WorkerStats; 478 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */ 479 480 #if ENABLE_LOGGING 481 static int m_counter; 482 #endif 483 484 private: 485 CSndQueue(const CSndQueue&); 486 CSndQueue& operator=(const CSndQueue&); 487 }; 488 489 class CRcvQueue 490 { 491 friend class CUDT; 492 friend class CUDTUnited; 493 494 public: 495 CRcvQueue(); 496 ~CRcvQueue(); 497 498 public: 499 // XXX There's currently no way to access the socket ID set for 500 // whatever the queue is currently working. Required to find 501 // some way to do this, possibly by having a "reverse pointer". 502 // Currently just "unimplemented". CONID()503 std::string CONID() const { return ""; } 504 505 /// Initialize the receiving queue. 506 /// @param [in] size queue size 507 /// @param [in] mss maximum packet size 508 /// @param [in] version IP version 509 /// @param [in] hsize hash table size 510 /// @param [in] c UDP channel to be associated to the queue 511 /// @param [in] t timer 512 513 void init(int size, size_t payload, int version, int hsize, CChannel* c, sync::CTimer* t); 514 515 /// Read a packet for a specific UDT socket id. 516 /// @param [in] id Socket ID 517 /// @param [out] packet received packet 518 /// @return Data size of the packet 519 520 int recvfrom(int32_t id, CPacket& to_packet); 521 522 void stopWorker(); 523 setClosing()524 void setClosing() { m_bClosing = true; } 525 526 private: 527 static void* worker(void* param); 528 sync::CThread m_WorkerThread; 529 // Subroutines of worker 530 EReadStatus worker_RetrieveUnit(int32_t& id, CUnit*& unit, sockaddr_any& sa); 531 EConnectStatus worker_ProcessConnectionRequest(CUnit* unit, const sockaddr_any& sa); 532 EConnectStatus worker_TryAsyncRend_OrStore(int32_t id, CUnit* unit, const sockaddr_any& sa); 533 EConnectStatus worker_ProcessAddressedPacket(int32_t id, CUnit* unit, const sockaddr_any& sa); 534 535 private: 536 CUnitQueue m_UnitQueue; // The received packet queue 537 CRcvUList* m_pRcvUList; // List of UDT instances that will read packets from the queue 538 CHash* m_pHash; // Hash table for UDT socket looking up 539 CChannel* m_pChannel; // UDP channel for receving packets 540 sync::CTimer* m_pTimer; // shared timer with the snd queue 541 542 size_t m_szPayloadSize; // packet payload size 543 544 srt::sync::atomic<bool> m_bClosing; // closing the worker 545 #if ENABLE_LOGGING 546 static int m_counter; 547 #endif 548 549 private: 550 int setListener(CUDT* u); 551 void removeListener(const CUDT* u); 552 553 void registerConnector(const SRTSOCKET& id, 554 CUDT* u, 555 const sockaddr_any& addr, 556 const sync::steady_clock::time_point& ttl); 557 void removeConnector(const SRTSOCKET& id); 558 559 void setNewEntry(CUDT* u); 560 bool ifNewEntry(); 561 CUDT* getNewEntry(); 562 563 void storePkt(int32_t id, CPacket* pkt); 564 565 private: 566 sync::Mutex m_LSLock; 567 CUDT* m_pListener; // pointer to the (unique, if any) listening UDT entity 568 CRendezvousQueue* m_pRendezvousQueue; // The list of sockets in rendezvous mode 569 570 std::vector<CUDT*> m_vNewEntry; // newly added entries, to be inserted 571 sync::Mutex m_IDLock; 572 573 std::map<int32_t, std::queue<CPacket*> > m_mBuffer; // temporary buffer for rendezvous connection request 574 sync::Mutex m_BufferLock; 575 sync::Condition m_BufferCond; 576 577 private: 578 CRcvQueue(const CRcvQueue&); 579 CRcvQueue& operator=(const CRcvQueue&); 580 }; 581 582 struct CMultiplexer 583 { 584 CSndQueue* m_pSndQueue; // The sending queue 585 CRcvQueue* m_pRcvQueue; // The receiving queue 586 CChannel* m_pChannel; // The UDP channel for sending and receiving 587 sync::CTimer* m_pTimer; // The timer 588 589 int m_iPort; // The UDP port number of this multiplexer 590 int m_iIPversion; // Address family (AF_INET or AF_INET6) 591 int m_iRefCount; // number of UDT instances that are associated with this multiplexer 592 593 CSrtMuxerConfig m_mcfg; 594 595 int m_iID; // multiplexer ID 596 597 // Constructor should reset all pointers to NULL 598 // to prevent dangling pointer when checking for memory alloc fails CMultiplexerCMultiplexer599 CMultiplexer() 600 : m_pSndQueue(NULL) 601 , m_pRcvQueue(NULL) 602 , m_pChannel(NULL) 603 , m_pTimer(NULL) 604 { 605 } 606 607 void destroy(); 608 }; 609 610 } // namespace srt 611 612 #endif 613