1 /*
2  * SRT - Secure, Reliable, Transport
3  * Copyright (c) 2018 Haivision Systems Inc.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  *
9  */
10 
11 /*****************************************************************************
12 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
13 All rights reserved.
14 
15 Redistribution and use in source and binary forms, with or without
16 modification, are permitted provided that the following conditions are
17 met:
18 
19 * Redistributions of source code must retain the above
20   copyright notice, this list of conditions and the
21   following disclaimer.
22 
23 * Redistributions in binary form must reproduce the
24   above copyright notice, this list of conditions
25   and the following disclaimer in the documentation
26   and/or other materials provided with the distribution.
27 
28 * Neither the name of the University of Illinois
29   nor the names of its contributors may be used to
30   endorse or promote products derived from this
31   software without specific prior written permission.
32 
33 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
34 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
35 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
36 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
37 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
38 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
39 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
40 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
41 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
42 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
43 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
44 *****************************************************************************/
45 
46 /*****************************************************************************
47 written by
48    Yunhong Gu, last updated 01/12/2011
49 modified by
50    Haivision Systems Inc.
51 *****************************************************************************/
52 
53 #ifndef INC_SRT_QUEUE_H
54 #define INC_SRT_QUEUE_H
55 
56 #include "common.h"
57 #include "packet.h"
58 #include "socketconfig.h"
59 #include "netinet_any.h"
60 #include "utilities.h"
61 #include <list>
62 #include <map>
63 #include <queue>
64 #include <vector>
65 
66 namespace srt
67 {
68 class CChannel;
69 class CUDT;
70 
71 struct CUnit
72 {
73     CPacket m_Packet; // packet
74     enum Flag
75     {
76         FREE    = 0,
77         GOOD    = 1,
78         PASSACK = 2,
79         DROPPED = 3
80     };
81     Flag m_iFlag; // 0: free, 1: occupied, 2: msg read but not freed (out-of-order), 3: msg dropped
82 };
83 
84 class CUnitQueue
85 {
86 
87 public:
88     CUnitQueue();
89     ~CUnitQueue();
90 
91 public: // Storage size operations
92         /// Initialize the unit queue.
93         /// @param [in] size queue size
94         /// @param [in] mss maximum segment size
95         /// @param [in] version IP version
96         /// @return 0: success, -1: failure.
97     int init(int size, int mss, int version);
98 
99     /// Increase (double) the unit queue size.
100     /// @return 0: success, -1: failure.
101 
102     int increase();
103 
104     /// Decrease (halve) the unit queue size.
105     /// @return 0: success, -1: failure.
106 
107     int shrink();
108 
109 public:
size()110     int size() const { return m_iSize - m_iCount; }
capacity()111     int capacity() const { return m_iSize; }
112 
113 public: // Operations on units
114         /// find an available unit for incoming packet.
115         /// @return Pointer to the available unit, NULL if not found.
116     CUnit* getNextAvailUnit();
117 
118     void makeUnitFree(CUnit* unit);
119 
120     void makeUnitGood(CUnit* unit);
121 
122 public:
getIPversion()123     inline int getIPversion() const { return m_iIPversion; }
124 
125 private:
126     struct CQEntry
127     {
128         CUnit* m_pUnit;   // unit queue
129         char*  m_pBuffer; // data buffer
130         int    m_iSize;   // size of each queue
131 
132         CQEntry* m_pNext;
133     } * m_pQEntry,     // pointer to the first unit queue
134         *m_pCurrQueue, // pointer to the current available queue
135         *m_pLastQueue; // pointer to the last unit queue
136 
137     CUnit* m_pAvailUnit; // recent available unit
138 
139     int m_iSize;  // total size of the unit queue, in number of packets
140     srt::sync::atomic<int> m_iCount;        // total number of valid (occupied) packets in the queue
141 
142     int m_iMSS;       // unit buffer size
143     int m_iIPversion; // IP version
144 
145 private:
146     CUnitQueue(const CUnitQueue&);
147     CUnitQueue& operator=(const CUnitQueue&);
148 };
149 
150 struct CSNode
151 {
152     CUDT*                          m_pUDT; // Pointer to the instance of CUDT socket
153     sync::steady_clock::time_point m_tsTimeStamp;
154 
155     srt::sync::atomic<int> m_iHeapLoc; // location on the heap, -1 means not on the heap
156 };
157 
158 class CSndUList
159 {
160 public:
161     CSndUList(sync::CTimer* pTimer);
162     ~CSndUList();
163 
164 public:
165     enum EReschedule
166     {
167         DONT_RESCHEDULE = 0,
168         DO_RESCHEDULE   = 1
169     };
170 
rescheduleIf(bool cond)171     static EReschedule rescheduleIf(bool cond) { return cond ? DO_RESCHEDULE : DONT_RESCHEDULE; }
172 
173     /// Update the timestamp of the UDT instance on the list.
174     /// @param [in] u pointer to the UDT instance
175     /// @param [in] reschedule if the timestamp should be rescheduled
176     /// @param [in] ts the next time to trigger sending logic on the CUDT
177     void update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts = sync::steady_clock::now());
178 
179     /// Retrieve the next (in time) socket from the heap to process its sending request.
180     /// @return a pointer to CUDT instance to process next.
181     CUDT* pop();
182 
183     /// Remove UDT instance from the list.
184     /// @param [in] u pointer to the UDT instance
185     void remove(const CUDT* u);// EXCLUDES(m_ListLock);
186 
187     /// Retrieve the next scheduled processing time.
188     /// @return Scheduled processing time of the first UDT socket in the list.
189     sync::steady_clock::time_point getNextProcTime();
190 
191     /// Wait for the list to become non empty.
192     void waitNonEmpty() const;
193 
194     /// Signal to stop waiting in waitNonEmpty().
195     void signalInterrupt() const;
196 
197 private:
198     /// Doubles the size of the list.
199     ///
200     void realloc_();// REQUIRES(m_ListLock);
201 
202     /// Insert a new UDT instance into the list with realloc if required.
203     ///
204     /// @param [in] ts time stamp: next processing time
205     /// @param [in] u pointer to the UDT instance
206     void insert_(const sync::steady_clock::time_point& ts, const CUDT* u);
207 
208     /// Insert a new UDT instance into the list without realloc.
209     /// Should be called if there is a gauranteed space for the element.
210     ///
211     /// @param [in] ts time stamp: next processing time
212     /// @param [in] u pointer to the UDT instance
213     void insert_norealloc_(const sync::steady_clock::time_point& ts, const CUDT* u);// REQUIRES(m_ListLock);
214 
215     /// Removes CUDT entry from the list.
216     /// If the last entry is removed, calls sync::CTimer::interrupt().
217     void remove_(const CUDT* u);
218 
219 private:
220     CSNode** m_pHeap;        // The heap array
221     int      m_iArrayLength; // physical length of the array
222     int      m_iLastEntry;   // position of last entry on the heap array or -1 if empty.
223 
224     mutable sync::Mutex     m_ListLock; // Protects the list (m_pHeap, m_iArrayLength, m_iLastEntry).
225     mutable sync::Condition m_ListCond;
226 
227     sync::CTimer* const m_pTimer;
228 
229 private:
230     CSndUList(const CSndUList&);
231     CSndUList& operator=(const CSndUList&);
232 };
233 
234 struct CRNode
235 {
236     CUDT*                          m_pUDT;        // Pointer to the instance of CUDT socket
237     sync::steady_clock::time_point m_tsTimeStamp; // Time Stamp
238 
239     CRNode* m_pPrev; // previous link
240     CRNode* m_pNext; // next link
241 
242     srt::sync::atomic<bool> m_bOnList;              // if the node is already on the list
243 };
244 
245 class CRcvUList
246 {
247 public:
248     CRcvUList();
249     ~CRcvUList();
250 
251 public:
252     /// Insert a new UDT instance to the list.
253     /// @param [in] u pointer to the UDT instance
254 
255     void insert(const CUDT* u);
256 
257     /// Remove the UDT instance from the list.
258     /// @param [in] u pointer to the UDT instance
259 
260     void remove(const CUDT* u);
261 
262     /// Move the UDT instance to the end of the list, if it already exists; otherwise, do nothing.
263     /// @param [in] u pointer to the UDT instance
264 
265     void update(const CUDT* u);
266 
267 public:
268     CRNode* m_pUList; // the head node
269 
270 private:
271     CRNode* m_pLast; // the last node
272 
273 private:
274     CRcvUList(const CRcvUList&);
275     CRcvUList& operator=(const CRcvUList&);
276 };
277 
278 class CHash
279 {
280 public:
281     CHash();
282     ~CHash();
283 
284 public:
285     /// Initialize the hash table.
286     /// @param [in] size hash table size
287 
288     void init(int size);
289 
290     /// Look for a UDT instance from the hash table.
291     /// @param [in] id socket ID
292     /// @return Pointer to a UDT instance, or NULL if not found.
293 
294     CUDT* lookup(int32_t id);
295 
296     /// Insert an entry to the hash table.
297     /// @param [in] id socket ID
298     /// @param [in] u pointer to the UDT instance
299 
300     void insert(int32_t id, CUDT* u);
301 
302     /// Remove an entry from the hash table.
303     /// @param [in] id socket ID
304 
305     void remove(int32_t id);
306 
307 private:
308     struct CBucket
309     {
310         int32_t m_iID;  // Socket ID
311         CUDT*   m_pUDT; // Socket instance
312 
313         CBucket* m_pNext; // next bucket
314     } * *m_pBucket;       // list of buckets (the hash table)
315 
316     int m_iHashSize; // size of hash table
317 
318 private:
319     CHash(const CHash&);
320     CHash& operator=(const CHash&);
321 };
322 
323 /// @brief A queue of sockets pending for connection.
324 /// It can be either a caller socket in a non-blocking mode
325 /// (the connection has to be handled in background),
326 /// or a socket in rendezvous connection mode.
327 class CRendezvousQueue
328 {
329 public:
330     CRendezvousQueue();
331     ~CRendezvousQueue();
332 
333 public:
334     /// @brief Insert a new socket pending for connection (non-blocking caller or rendezvous).
335     /// @param id socket ID.
336     /// @param u pointer to a corresponding CUDT instance.
337     /// @param addr remote address to connect to.
338     /// @param ttl timepoint for connection attempt to expire.
339     void insert(const SRTSOCKET& id, CUDT* u, const sockaddr_any& addr, const srt::sync::steady_clock::time_point& ttl);
340 
341     /// @brief Remove a socket from the connection pending list.
342     /// @param id socket ID.
343     void remove(const SRTSOCKET& id);
344 
345     /// @brief Locate a socket in the connection pending queue.
346     /// @param addr source address of the packet received over UDP (peer address).
347     /// @param id socket ID.
348     /// @return a pointer to CUDT instance retrieved, or NULL if nothing was found.
349     CUDT* retrieve(const sockaddr_any& addr, SRTSOCKET& id) const;
350 
351     /// @brief Update status of connections in the pending queue.
352     /// Stop connecting if TTL expires. Resend handshake request every 250 ms if no response from the peer.
353     /// @param rst result of reading from a UDP socket: received packet / nothin read / read error.
354     /// @param cst target status for pending connection: reject or proceed.
355     /// @param pktIn packet received from the UDP socket.
356     void updateConnStatus(EReadStatus rst, EConnectStatus cst, CUnit* unit);
357 
358 private:
359     struct LinkStatusInfo
360     {
361         CUDT*        u;
362         SRTSOCKET    id;
363         int          errorcode;
364         sockaddr_any peeraddr;
365         int          token;
366 
367         struct HasID
368         {
369             SRTSOCKET id;
HasIDLinkStatusInfo::HasID370             HasID(SRTSOCKET p)
371                 : id(p)
372             {
373             }
operatorLinkStatusInfo::HasID374             bool operator()(const LinkStatusInfo& i) { return i.id == id; }
375         };
376     };
377 
378     /// @brief Qualify pending connections:
379     /// - Sockets with expired TTL go to the 'to_remove' list and removed from the queue straight away.
380     /// - If HS request is to be resent (resend 250 ms if no response from the peer) go to the 'to_process' list.
381     ///
382     /// @param rst result of reading from a UDP socket: received packet / nothin read / read error.
383     /// @param cst target status for pending connection: reject or proceed.
384     /// @param iDstSockID destination socket ID of the received packet.
385     /// @param[in,out] toRemove stores sockets with expired TTL.
386     /// @param[in,out] toProcess stores sockets which should repeat (resend) HS connection request.
387     bool qualifyToHandle(EReadStatus                  rst,
388                          EConnectStatus               cst,
389                          int                          iDstSockID,
390                          std::vector<LinkStatusInfo>& toRemove,
391                          std::vector<LinkStatusInfo>& toProcess);
392 
393 private:
394     struct CRL
395     {
396         SRTSOCKET                           m_iID;      // SRT socket ID (self)
397         CUDT*                               m_pUDT;     // CUDT instance
398         sockaddr_any                        m_PeerAddr; // SRT sonnection peer address
399         srt::sync::steady_clock::time_point m_tsTTL;    // the time that this request expires
400     };
401     std::list<CRL> m_lRendezvousID; // The sockets currently in rendezvous mode
402 
403     mutable sync::Mutex m_RIDListLock;
404 };
405 
406 class CSndQueue
407 {
408     friend class CUDT;
409     friend class CUDTUnited;
410 
411 public:
412     CSndQueue();
413     ~CSndQueue();
414 
415 public:
416     // XXX There's currently no way to access the socket ID set for
417     // whatever the queue is currently working for. Required to find
418     // some way to do this, possibly by having a "reverse pointer".
419     // Currently just "unimplemented".
CONID()420     std::string CONID() const { return ""; }
421 
422     /// Initialize the sending queue.
423     /// @param [in] c UDP channel to be associated to the queue
424     /// @param [in] t Timer
425 
426     void init(CChannel* c, srt::sync::CTimer* t);
427 
428     /// Send out a packet to a given address.
429     /// @param [in] addr destination address
430     /// @param [in] packet packet to be sent out
431     /// @return Size of data sent out.
432 
433     int sendto(const sockaddr_any& addr, CPacket& packet);
434 
435     /// Get the IP TTL.
436     /// @param [in] ttl IP Time To Live.
437     /// @return TTL.
438 
439     int getIpTTL() const;
440 
441     /// Get the IP Type of Service.
442     /// @return ToS.
443 
444     int getIpToS() const;
445 
446 #ifdef SRT_ENABLE_BINDTODEVICE
447     bool getBind(char* dst, size_t len) const;
448 #endif
449 
450     int ioctlQuery(int type) const;
451     int sockoptQuery(int level, int type) const;
452 
setClosing()453     void setClosing() { m_bClosing = true; }
454 
455 private:
456     static void*       worker(void* param);
457     srt::sync::CThread m_WorkerThread;
458 
459 private:
460     CSndUList*         m_pSndUList; // List of UDT instances for data sending
461     CChannel*          m_pChannel;  // The UDP channel for data sending
462     srt::sync::CTimer* m_pTimer;    // Timing facility
463 
464     srt::sync::atomic<bool> m_bClosing;            // closing the worker
465 
466 #if defined(SRT_DEBUG_SNDQ_HIGHRATE) //>>debug high freq worker
467     uint64_t m_ullDbgPeriod;
468     uint64_t m_ullDbgTime;
469     struct
470     {
471         unsigned long lIteration;   //
472         unsigned long lSleepTo;     // SleepTo
473         unsigned long lNotReadyPop; // Continue
474         unsigned long lSendTo;
475         unsigned long lNotReadyTs;
476         unsigned long lCondWait; // block on m_WindowCond
477     } m_WorkerStats;
478 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
479 
480 #if ENABLE_LOGGING
481     static int m_counter;
482 #endif
483 
484 private:
485     CSndQueue(const CSndQueue&);
486     CSndQueue& operator=(const CSndQueue&);
487 };
488 
489 class CRcvQueue
490 {
491     friend class CUDT;
492     friend class CUDTUnited;
493 
494 public:
495     CRcvQueue();
496     ~CRcvQueue();
497 
498 public:
499     // XXX There's currently no way to access the socket ID set for
500     // whatever the queue is currently working. Required to find
501     // some way to do this, possibly by having a "reverse pointer".
502     // Currently just "unimplemented".
CONID()503     std::string CONID() const { return ""; }
504 
505     /// Initialize the receiving queue.
506     /// @param [in] size queue size
507     /// @param [in] mss maximum packet size
508     /// @param [in] version IP version
509     /// @param [in] hsize hash table size
510     /// @param [in] c UDP channel to be associated to the queue
511     /// @param [in] t timer
512 
513     void init(int size, size_t payload, int version, int hsize, CChannel* c, sync::CTimer* t);
514 
515     /// Read a packet for a specific UDT socket id.
516     /// @param [in] id Socket ID
517     /// @param [out] packet received packet
518     /// @return Data size of the packet
519 
520     int recvfrom(int32_t id, CPacket& to_packet);
521 
522     void stopWorker();
523 
setClosing()524     void setClosing() { m_bClosing = true; }
525 
526 private:
527     static void*  worker(void* param);
528     sync::CThread m_WorkerThread;
529     // Subroutines of worker
530     EReadStatus    worker_RetrieveUnit(int32_t& id, CUnit*& unit, sockaddr_any& sa);
531     EConnectStatus worker_ProcessConnectionRequest(CUnit* unit, const sockaddr_any& sa);
532     EConnectStatus worker_TryAsyncRend_OrStore(int32_t id, CUnit* unit, const sockaddr_any& sa);
533     EConnectStatus worker_ProcessAddressedPacket(int32_t id, CUnit* unit, const sockaddr_any& sa);
534 
535 private:
536     CUnitQueue    m_UnitQueue; // The received packet queue
537     CRcvUList*    m_pRcvUList; // List of UDT instances that will read packets from the queue
538     CHash*        m_pHash;     // Hash table for UDT socket looking up
539     CChannel*     m_pChannel;  // UDP channel for receving packets
540     sync::CTimer* m_pTimer;    // shared timer with the snd queue
541 
542     size_t m_szPayloadSize; // packet payload size
543 
544     srt::sync::atomic<bool> m_bClosing;            // closing the worker
545 #if ENABLE_LOGGING
546     static int m_counter;
547 #endif
548 
549 private:
550     int  setListener(CUDT* u);
551     void removeListener(const CUDT* u);
552 
553     void registerConnector(const SRTSOCKET&                      id,
554                            CUDT*                                 u,
555                            const sockaddr_any&                   addr,
556                            const sync::steady_clock::time_point& ttl);
557     void removeConnector(const SRTSOCKET& id);
558 
559     void  setNewEntry(CUDT* u);
560     bool  ifNewEntry();
561     CUDT* getNewEntry();
562 
563     void storePkt(int32_t id, CPacket* pkt);
564 
565 private:
566     sync::Mutex       m_LSLock;
567     CUDT*             m_pListener;        // pointer to the (unique, if any) listening UDT entity
568     CRendezvousQueue* m_pRendezvousQueue; // The list of sockets in rendezvous mode
569 
570     std::vector<CUDT*> m_vNewEntry; // newly added entries, to be inserted
571     sync::Mutex        m_IDLock;
572 
573     std::map<int32_t, std::queue<CPacket*> > m_mBuffer; // temporary buffer for rendezvous connection request
574     sync::Mutex                              m_BufferLock;
575     sync::Condition                          m_BufferCond;
576 
577 private:
578     CRcvQueue(const CRcvQueue&);
579     CRcvQueue& operator=(const CRcvQueue&);
580 };
581 
582 struct CMultiplexer
583 {
584     CSndQueue*    m_pSndQueue; // The sending queue
585     CRcvQueue*    m_pRcvQueue; // The receiving queue
586     CChannel*     m_pChannel;  // The UDP channel for sending and receiving
587     sync::CTimer* m_pTimer;    // The timer
588 
589     int m_iPort;      // The UDP port number of this multiplexer
590     int m_iIPversion; // Address family (AF_INET or AF_INET6)
591     int m_iRefCount;  // number of UDT instances that are associated with this multiplexer
592 
593     CSrtMuxerConfig m_mcfg;
594 
595     int m_iID; // multiplexer ID
596 
597     // Constructor should reset all pointers to NULL
598     // to prevent dangling pointer when checking for memory alloc fails
CMultiplexerCMultiplexer599     CMultiplexer()
600         : m_pSndQueue(NULL)
601         , m_pRcvQueue(NULL)
602         , m_pChannel(NULL)
603         , m_pTimer(NULL)
604     {
605     }
606 
607     void destroy();
608 };
609 
610 } // namespace srt
611 
612 #endif
613