1 /*
2  * SRT - Secure, Reliable, Transport
3  * Copyright (c) 2018 Haivision Systems Inc.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  *
9  */
10 
11 /*****************************************************************************
12 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
13 All rights reserved.
14 
15 Redistribution and use in source and binary forms, with or without
16 modification, are permitted provided that the following conditions are
17 met:
18 
19 * Redistributions of source code must retain the above
20   copyright notice, this list of conditions and the
21   following disclaimer.
22 
23 * Redistributions in binary form must reproduce the
24   above copyright notice, this list of conditions
25   and the following disclaimer in the documentation
26   and/or other materials provided with the distribution.
27 
28 * Neither the name of the University of Illinois
29   nor the names of its contributors may be used to
30   endorse or promote products derived from this
31   software without specific prior written permission.
32 
33 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
34 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
35 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
36 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
37 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
38 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
39 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
40 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
41 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
42 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
43 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
44 *****************************************************************************/
45 
46 /*****************************************************************************
47 written by
48    Yunhong Gu, last updated 05/05/2011
49 modified by
50    Haivision Systems Inc.
51 *****************************************************************************/
52 
53 #include "platform_sys.h"
54 
55 #include <cstring>
56 
57 #include "common.h"
58 #include "api.h"
59 #include "netinet_any.h"
60 #include "threadname.h"
61 #include "logging.h"
62 #include "queue.h"
63 
64 using namespace std;
65 using namespace srt::sync;
66 using namespace srt_logging;
67 
CUnitQueue()68 srt::CUnitQueue::CUnitQueue()
69     : m_pQEntry(NULL)
70     , m_pCurrQueue(NULL)
71     , m_pLastQueue(NULL)
72     , m_iSize(0)
73     , m_iCount(0)
74     , m_iMSS()
75     , m_iIPversion()
76 {
77 }
78 
~CUnitQueue()79 srt::CUnitQueue::~CUnitQueue()
80 {
81     CQEntry* p = m_pQEntry;
82 
83     while (p != NULL)
84     {
85         delete[] p->m_pUnit;
86         delete[] p->m_pBuffer;
87 
88         CQEntry* q = p;
89         if (p == m_pLastQueue)
90             p = NULL;
91         else
92             p = p->m_pNext;
93         delete q;
94     }
95 }
96 
init(int size,int mss,int version)97 int srt::CUnitQueue::init(int size, int mss, int version)
98 {
99     CQEntry* tempq = NULL;
100     CUnit*   tempu = NULL;
101     char*    tempb = NULL;
102 
103     try
104     {
105         tempq = new CQEntry;
106         tempu = new CUnit[size];
107         tempb = new char[size * mss];
108     }
109     catch (...)
110     {
111         delete tempq;
112         delete[] tempu;
113         delete[] tempb;
114 
115         return -1;
116     }
117 
118     for (int i = 0; i < size; ++i)
119     {
120         tempu[i].m_iFlag           = CUnit::FREE;
121         tempu[i].m_Packet.m_pcData = tempb + i * mss;
122     }
123     tempq->m_pUnit   = tempu;
124     tempq->m_pBuffer = tempb;
125     tempq->m_iSize   = size;
126 
127     m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq;
128     m_pQEntry->m_pNext                      = m_pQEntry;
129 
130     m_pAvailUnit = m_pCurrQueue->m_pUnit;
131 
132     m_iSize      = size;
133     m_iMSS       = mss;
134     m_iIPversion = version;
135 
136     return 0;
137 }
138 
139 // XXX Lots of common code with CUnitQueue:init.
140 // Consider merging.
increase()141 int srt::CUnitQueue::increase()
142 {
143     // adjust/correct m_iCount
144     int      real_count = 0;
145     CQEntry* p          = m_pQEntry;
146     while (p != NULL)
147     {
148         CUnit* u = p->m_pUnit;
149         for (CUnit* end = u + p->m_iSize; u != end; ++u)
150             if (u->m_iFlag != CUnit::FREE)
151                 ++real_count;
152 
153         if (p == m_pLastQueue)
154             p = NULL;
155         else
156             p = p->m_pNext;
157     }
158     m_iCount = real_count;
159     if (double(m_iCount) / m_iSize < 0.9)
160         return -1;
161 
162     CQEntry* tempq = NULL;
163     CUnit*   tempu = NULL;
164     char*    tempb = NULL;
165 
166     // all queues have the same size
167     const int size = m_pQEntry->m_iSize;
168 
169     try
170     {
171         tempq = new CQEntry;
172         tempu = new CUnit[size];
173         tempb = new char[size * m_iMSS];
174     }
175     catch (...)
176     {
177         delete tempq;
178         delete[] tempu;
179         delete[] tempb;
180 
181         LOGC(rslog.Error,
182              log << "CUnitQueue:increase: failed to allocate " << size << " new units."
183                  << " Current size=" << m_iSize);
184         return -1;
185     }
186 
187     for (int i = 0; i < size; ++i)
188     {
189         tempu[i].m_iFlag           = CUnit::FREE;
190         tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
191     }
192     tempq->m_pUnit   = tempu;
193     tempq->m_pBuffer = tempb;
194     tempq->m_iSize   = size;
195 
196     m_pLastQueue->m_pNext = tempq;
197     m_pLastQueue          = tempq;
198     m_pLastQueue->m_pNext = m_pQEntry;
199 
200     m_iSize += size;
201 
202     return 0;
203 }
204 
shrink()205 int srt::CUnitQueue::shrink()
206 {
207     // currently queue cannot be shrunk.
208     return -1;
209 }
210 
getNextAvailUnit()211 srt::CUnit* srt::CUnitQueue::getNextAvailUnit()
212 {
213     if (m_iCount * 10 > m_iSize * 9)
214         increase();
215 
216     if (m_iCount >= m_iSize)
217         return NULL;
218 
219     int units_checked = 0;
220     do
221     {
222         const CUnit* end = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize;
223         for (; m_pAvailUnit != end; ++m_pAvailUnit, ++units_checked)
224         {
225             if (m_pAvailUnit->m_iFlag == CUnit::FREE)
226             {
227                 return m_pAvailUnit;
228             }
229         }
230 
231         m_pCurrQueue = m_pCurrQueue->m_pNext;
232         m_pAvailUnit = m_pCurrQueue->m_pUnit;
233     } while (units_checked < m_iSize);
234 
235     increase();
236 
237     return NULL;
238 }
239 
// Marks `unit` as FREE and decrements the occupied-unit counter.
// SRT_ASSERT checks that `unit` is not NULL and is not already FREE.
void srt::CUnitQueue::makeUnitFree(CUnit* unit)
{
    SRT_ASSERT(unit != NULL);
    SRT_ASSERT(unit->m_iFlag != CUnit::FREE);
    unit->m_iFlag = CUnit::FREE;

    --m_iCount;
}
248 
// Marks a FREE `unit` as GOOD (occupied) and increments the occupied-unit
// counter. SRT_ASSERT checks that `unit` is not NULL and is currently FREE.
void srt::CUnitQueue::makeUnitGood(CUnit* unit)
{
    // Note: the counter is incremented before the flag is changed.
    ++m_iCount;

    SRT_ASSERT(unit != NULL);
    SRT_ASSERT(unit->m_iFlag == CUnit::FREE);
    unit->m_iFlag = CUnit::GOOD;
}
257 
CSndUList(sync::CTimer * pTimer)258 srt::CSndUList::CSndUList(sync::CTimer* pTimer)
259     : m_pHeap(NULL)
260     , m_iArrayLength(512)
261     , m_iLastEntry(-1)
262     , m_ListLock()
263     , m_pTimer(pTimer)
264 {
265     setupCond(m_ListCond, "CSndUListCond");
266     m_pHeap = new CSNode*[m_iArrayLength];
267 }
268 
// Releases the condition variable and the heap array. The nodes referenced
// from the heap are not deleted here.
srt::CSndUList::~CSndUList()
{
    releaseCond(m_ListCond);
    delete[] m_pHeap;
}
274 
update(const CUDT * u,EReschedule reschedule,sync::steady_clock::time_point ts)275 void srt::CSndUList::update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts)
276 {
277     ScopedLock listguard(m_ListLock);
278 
279     CSNode* n = u->m_pSNode;
280 
281     if (n->m_iHeapLoc >= 0)
282     {
283         if (reschedule == DONT_RESCHEDULE)
284             return;
285 
286         if (n->m_tsTimeStamp <= ts)
287             return;
288 
289         if (n->m_iHeapLoc == 0)
290         {
291             n->m_tsTimeStamp = ts;
292             m_pTimer->interrupt();
293             return;
294         }
295 
296         remove_(u);
297         insert_norealloc_(ts, u);
298         return;
299     }
300 
301     insert_(ts, u);
302 }
303 
pop()304 srt::CUDT* srt::CSndUList::pop()
305 {
306     ScopedLock listguard(m_ListLock);
307 
308     if (-1 == m_iLastEntry)
309         return NULL;
310 
311     // no pop until the next scheduled time
312     if (m_pHeap[0]->m_tsTimeStamp > steady_clock::now())
313         return NULL;
314 
315     CUDT* u = m_pHeap[0]->m_pUDT;
316     remove_(u);
317     return u;
318 }
319 
// Removes socket `u` from the list while holding m_ListLock.
// The actual work is done by remove_().
void srt::CSndUList::remove(const CUDT* u)
{
    ScopedLock listguard(m_ListLock);
    remove_(u);
}
325 
getNextProcTime()326 steady_clock::time_point srt::CSndUList::getNextProcTime()
327 {
328     ScopedLock listguard(m_ListLock);
329 
330     if (-1 == m_iLastEntry)
331         return steady_clock::time_point();
332 
333     return m_pHeap[0]->m_tsTimeStamp;
334 }
335 
waitNonEmpty() const336 void srt::CSndUList::waitNonEmpty() const
337 {
338     UniqueLock listguard(m_ListLock);
339     if (m_iLastEntry >= 0)
340         return;
341 
342     m_ListCond.wait(listguard);
343 }
344 
// Wakes up every thread waiting on the list condition (see waitNonEmpty()),
// regardless of whether the list is empty. The lock is held while notifying.
void srt::CSndUList::signalInterrupt() const
{
    ScopedLock listguard(m_ListLock);
    m_ListCond.notify_all();
}
350 
realloc_()351 void srt::CSndUList::realloc_()
352 {
353     CSNode** temp = NULL;
354 
355     try
356     {
357         temp = new CSNode*[2 * m_iArrayLength];
358     }
359     catch (...)
360     {
361         throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
362     }
363 
364     memcpy((temp), m_pHeap, sizeof(CSNode*) * m_iArrayLength);
365     m_iArrayLength *= 2;
366     delete[] m_pHeap;
367     m_pHeap = temp;
368 }
369 
insert_(const steady_clock::time_point & ts,const CUDT * u)370 void srt::CSndUList::insert_(const steady_clock::time_point& ts, const CUDT* u)
371 {
372     // increase the heap array size if necessary
373     if (m_iLastEntry == m_iArrayLength - 1)
374         realloc_();
375 
376     insert_norealloc_(ts, u);
377 }
378 
insert_norealloc_(const steady_clock::time_point & ts,const CUDT * u)379 void srt::CSndUList::insert_norealloc_(const steady_clock::time_point& ts, const CUDT* u)
380 {
381     CSNode* n = u->m_pSNode;
382 
383     // do not insert repeated node
384     if (n->m_iHeapLoc >= 0)
385         return;
386 
387     SRT_ASSERT(m_iLastEntry < m_iArrayLength);
388 
389     m_iLastEntry++;
390     m_pHeap[m_iLastEntry] = n;
391     n->m_tsTimeStamp      = ts;
392 
393     int q = m_iLastEntry;
394     int p = q;
395     while (p != 0)
396     {
397         p = (q - 1) >> 1;
398         if (m_pHeap[p]->m_tsTimeStamp <= m_pHeap[q]->m_tsTimeStamp)
399             break;
400 
401         swap(m_pHeap[p], m_pHeap[q]);
402         m_pHeap[q]->m_iHeapLoc = q;
403         q                      = p;
404     }
405 
406     n->m_iHeapLoc = q;
407 
408     // an earlier event has been inserted, wake up sending worker
409     if (n->m_iHeapLoc == 0)
410         m_pTimer->interrupt();
411 
412     // first entry, activate the sending queue
413     if (0 == m_iLastEntry)
414     {
415         // m_ListLock is assumed to be locked.
416         m_ListCond.notify_all();
417     }
418 }
419 
remove_(const CUDT * u)420 void srt::CSndUList::remove_(const CUDT* u)
421 {
422     CSNode* n = u->m_pSNode;
423 
424     if (n->m_iHeapLoc >= 0)
425     {
426         // remove the node from heap
427         m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
428         m_iLastEntry--;
429         m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc.load();
430 
431         int q = n->m_iHeapLoc;
432         int p = q * 2 + 1;
433         while (p <= m_iLastEntry)
434         {
435             if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_tsTimeStamp > m_pHeap[p + 1]->m_tsTimeStamp))
436                 p++;
437 
438             if (m_pHeap[q]->m_tsTimeStamp > m_pHeap[p]->m_tsTimeStamp)
439             {
440                 swap(m_pHeap[p], m_pHeap[q]);
441                 m_pHeap[p]->m_iHeapLoc = p;
442                 m_pHeap[q]->m_iHeapLoc = q;
443 
444                 q = p;
445                 p = q * 2 + 1;
446             }
447             else
448                 break;
449         }
450 
451         n->m_iHeapLoc = -1;
452     }
453 
454     // the only event has been deleted, wake up immediately
455     if (0 == m_iLastEntry)
456         m_pTimer->interrupt();
457 }
458 
459 //
// Constructs an idle sending queue: no send list, channel or timer is
// attached and no worker thread is started until init() is called.
srt::CSndQueue::CSndQueue()
    : m_pSndUList(NULL)
    , m_pChannel(NULL)
    , m_pTimer(NULL)
    , m_bClosing(false)
{
}
467 
~CSndQueue()468 srt::CSndQueue::~CSndQueue()
469 {
470     m_bClosing = true;
471 
472     if (m_pTimer != NULL)
473     {
474         m_pTimer->interrupt();
475     }
476 
477     // Unblock CSndQueue worker thread if it is waiting.
478     m_pSndUList->signalInterrupt();
479 
480     if (m_WorkerThread.joinable())
481     {
482         HLOGC(rslog.Debug, log << "SndQueue: EXIT");
483         m_WorkerThread.join();
484     }
485 
486     delete m_pSndUList;
487 }
488 
ioctlQuery(int type) const489 int srt::CSndQueue::ioctlQuery(int type) const
490 {
491     return m_pChannel->ioctlQuery(type);
492 }
sockoptQuery(int level,int type) const493 int srt::CSndQueue::sockoptQuery(int level, int type) const
494 {
495     return m_pChannel->sockoptQuery(level, type);
496 }
497 
// Logging builds only: counter incremented by CSndQueue::init() and appended
// to the worker thread name ("SRT:SndQ:w<N>").
#if ENABLE_LOGGING
int srt::CSndQueue::m_counter = 0;
#endif
501 
init(CChannel * c,CTimer * t)502 void srt::CSndQueue::init(CChannel* c, CTimer* t)
503 {
504     m_pChannel  = c;
505     m_pTimer    = t;
506     m_pSndUList = new CSndUList(t);
507 
508 #if ENABLE_LOGGING
509     ++m_counter;
510     const std::string thrname = "SRT:SndQ:w" + Sprint(m_counter);
511     const char*       thname  = thrname.c_str();
512 #else
513     const char* thname = "SRT:SndQ";
514 #endif
515     if (!StartThread(m_WorkerThread, CSndQueue::worker, this, thname))
516         throw CUDTException(MJ_SYSTEMRES, MN_THREAD);
517 }
518 
getIpTTL() const519 int srt::CSndQueue::getIpTTL() const
520 {
521     return m_pChannel ? m_pChannel->getIpTTL() : -1;
522 }
523 
getIpToS() const524 int srt::CSndQueue::getIpToS() const
525 {
526     return m_pChannel ? m_pChannel->getIpToS() : -1;
527 }
528 
#ifdef SRT_ENABLE_BINDTODEVICE
// Forwards to the channel's getBind(dst, len) and returns its result;
// returns false when no channel is attached.
bool srt::CSndQueue::getBind(char* dst, size_t len) const
{
    if (!m_pChannel)
        return false;

    return m_pChannel->getBind(dst, len);
}
#endif
535 
worker(void * param)536 void* srt::CSndQueue::worker(void* param)
537 {
538     CSndQueue* self = (CSndQueue*)param;
539 
540 #if ENABLE_LOGGING
541     THREAD_STATE_INIT(("SRT:SndQ:w" + Sprint(m_counter)).c_str());
542 #else
543     THREAD_STATE_INIT("SRT:SndQ:worker");
544 #endif
545 
546 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
547     CTimer::rdtsc(self->m_ullDbgTime);
548     self->m_ullDbgPeriod = uint64_t(5000000) * CTimer::getCPUFrequency();
549     self->m_ullDbgTime += self->m_ullDbgPeriod;
550 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
551 
552     while (!self->m_bClosing)
553     {
554         const steady_clock::time_point next_time = self->m_pSndUList->getNextProcTime();
555 
556 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
557         self->m_WorkerStats.lIteration++;
558 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
559 
560         if (is_zero(next_time))
561         {
562 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
563             self->m_WorkerStats.lNotReadyTs++;
564 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
565 
566             // wait here if there is no sockets with data to be sent
567             THREAD_PAUSED();
568             if (!self->m_bClosing)
569             {
570                 self->m_pSndUList->waitNonEmpty();
571 
572 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
573                 self->m_WorkerStats.lCondWait++;
574 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
575             }
576             THREAD_RESUMED();
577 
578             continue;
579         }
580 
581         // wait until next processing time of the first socket on the list
582         const steady_clock::time_point currtime = steady_clock::now();
583 
584 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
585         if (self->m_ullDbgTime <= currtime)
586         {
587             fprintf(stdout,
588                     "SndQueue %lu slt:%lu nrp:%lu snt:%lu nrt:%lu ctw:%lu\n",
589                     self->m_WorkerStats.lIteration,
590                     self->m_WorkerStats.lSleepTo,
591                     self->m_WorkerStats.lNotReadyPop,
592                     self->m_WorkerStats.lSendTo,
593                     self->m_WorkerStats.lNotReadyTs,
594                     self->m_WorkerStats.lCondWait);
595             memset(&self->m_WorkerStats, 0, sizeof(self->m_WorkerStats));
596             self->m_ullDbgTime = currtime + self->m_ullDbgPeriod;
597         }
598 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
599 
600         THREAD_PAUSED();
601         if (currtime < next_time)
602         {
603             self->m_pTimer->sleep_until(next_time);
604 
605 #if defined(HAI_DEBUG_SNDQ_HIGHRATE)
606             self->m_WorkerStats.lSleepTo++;
607 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
608         }
609         THREAD_RESUMED();
610 
611         // Get a socket with a send request if any.
612         CUDT* u = self->m_pSndUList->pop();
613         if (u == NULL)
614         {
615 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
616             self->m_WorkerStats.lNotReadyPop++;
617 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
618             continue;
619         }
620 
621 #define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " "
622         HLOGC(qslog.Debug,
623             log << "CSndQueue: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening)
624                 << UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth)
625                 << UST(Opened));
626 #undef UST
627 
628         if (!u->m_bConnected || u->m_bBroken)
629         {
630 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
631             self->m_WorkerStats.lNotReadyPop++;
632 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
633             continue;
634         }
635 
636         // pack a packet from the socket
637         CPacket pkt;
638         const std::pair<int, steady_clock::time_point> res_time = u->packData((pkt));
639 
640         // Check if payload size is invalid.
641         if (res_time.first <= 0)
642         {
643 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
644             self->m_WorkerStats.lNotReadyPop++;
645 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
646             continue;
647         }
648 
649         const sockaddr_any addr = u->m_PeerAddr;
650         const steady_clock::time_point next_send_time = res_time.second;
651         if (!is_zero(next_send_time))
652             self->m_pSndUList->update(u, CSndUList::DO_RESCHEDULE, next_send_time);
653 
654         HLOGC(qslog.Debug, log << self->CONID() << "chn:SENDING: " << pkt.Info());
655         self->m_pChannel->sendto(addr, pkt);
656 
657 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
658         self->m_WorkerStats.lSendTo++;
659 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
660     }
661 
662     THREAD_EXIT();
663     return NULL;
664 }
665 
sendto(const sockaddr_any & w_addr,CPacket & w_packet)666 int srt::CSndQueue::sendto(const sockaddr_any& w_addr, CPacket& w_packet)
667 {
668     // send out the packet immediately (high priority), this is a control packet
669     m_pChannel->sendto(w_addr, w_packet);
670     return (int)w_packet.getLength();
671 }
672 
673 //
// Constructs an empty receive list (no first and no last node).
srt::CRcvUList::CRcvUList()
    : m_pUList(NULL)
    , m_pLast(NULL)
{
}
679 
// Nothing to release: the destructor does not delete the linked nodes.
srt::CRcvUList::~CRcvUList() {}
681 
insert(const CUDT * u)682 void srt::CRcvUList::insert(const CUDT* u)
683 {
684     CRNode* n        = u->m_pRNode;
685     n->m_tsTimeStamp = steady_clock::now();
686 
687     if (NULL == m_pUList)
688     {
689         // empty list, insert as the single node
690         n->m_pPrev = n->m_pNext = NULL;
691         m_pLast = m_pUList = n;
692 
693         return;
694     }
695 
696     // always insert at the end for RcvUList
697     n->m_pPrev       = m_pLast;
698     n->m_pNext       = NULL;
699     m_pLast->m_pNext = n;
700     m_pLast          = n;
701 }
702 
remove(const CUDT * u)703 void srt::CRcvUList::remove(const CUDT* u)
704 {
705     CRNode* n = u->m_pRNode;
706 
707     if (!n->m_bOnList)
708         return;
709 
710     if (NULL == n->m_pPrev)
711     {
712         // n is the first node
713         m_pUList = n->m_pNext;
714         if (NULL == m_pUList)
715             m_pLast = NULL;
716         else
717             m_pUList->m_pPrev = NULL;
718     }
719     else
720     {
721         n->m_pPrev->m_pNext = n->m_pNext;
722         if (NULL == n->m_pNext)
723         {
724             // n is the last node
725             m_pLast = n->m_pPrev;
726         }
727         else
728             n->m_pNext->m_pPrev = n->m_pPrev;
729     }
730 
731     n->m_pNext = n->m_pPrev = NULL;
732 }
733 
update(const CUDT * u)734 void srt::CRcvUList::update(const CUDT* u)
735 {
736     CRNode* n = u->m_pRNode;
737 
738     if (!n->m_bOnList)
739         return;
740 
741     n->m_tsTimeStamp = steady_clock::now();
742 
743     // if n is the last node, do not need to change
744     if (NULL == n->m_pNext)
745         return;
746 
747     if (NULL == n->m_pPrev)
748     {
749         m_pUList          = n->m_pNext;
750         m_pUList->m_pPrev = NULL;
751     }
752     else
753     {
754         n->m_pPrev->m_pNext = n->m_pNext;
755         n->m_pNext->m_pPrev = n->m_pPrev;
756     }
757 
758     n->m_pPrev       = m_pLast;
759     n->m_pNext       = NULL;
760     m_pLast->m_pNext = n;
761     m_pLast          = n;
762 }
763 
764 //
// Constructs an empty hash table without bucket storage; init() allocates it.
srt::CHash::CHash()
    : m_pBucket(NULL)
    , m_iHashSize(0)
{
}
770 
~CHash()771 srt::CHash::~CHash()
772 {
773     for (int i = 0; i < m_iHashSize; ++i)
774     {
775         CBucket* b = m_pBucket[i];
776         while (NULL != b)
777         {
778             CBucket* n = b->m_pNext;
779             delete b;
780             b = n;
781         }
782     }
783 
784     delete[] m_pBucket;
785 }
786 
init(int size)787 void srt::CHash::init(int size)
788 {
789     m_pBucket = new CBucket*[size];
790 
791     for (int i = 0; i < size; ++i)
792         m_pBucket[i] = NULL;
793 
794     m_iHashSize = size;
795 }
796 
lookup(int32_t id)797 srt::CUDT* srt::CHash::lookup(int32_t id)
798 {
799     // simple hash function (% hash table size); suitable for socket descriptors
800     CBucket* b = m_pBucket[id % m_iHashSize];
801 
802     while (NULL != b)
803     {
804         if (id == b->m_iID)
805             return b->m_pUDT;
806         b = b->m_pNext;
807     }
808 
809     return NULL;
810 }
811 
insert(int32_t id,CUDT * u)812 void srt::CHash::insert(int32_t id, CUDT* u)
813 {
814     CBucket* b = m_pBucket[id % m_iHashSize];
815 
816     CBucket* n = new CBucket;
817     n->m_iID   = id;
818     n->m_pUDT  = u;
819     n->m_pNext = b;
820 
821     m_pBucket[id % m_iHashSize] = n;
822 }
823 
remove(int32_t id)824 void srt::CHash::remove(int32_t id)
825 {
826     CBucket* b = m_pBucket[id % m_iHashSize];
827     CBucket* p = NULL;
828 
829     while (NULL != b)
830     {
831         if (id == b->m_iID)
832         {
833             if (NULL == p)
834                 m_pBucket[id % m_iHashSize] = b->m_pNext;
835             else
836                 p->m_pNext = b->m_pNext;
837 
838             delete b;
839 
840             return;
841         }
842 
843         p = b;
844         b = b->m_pNext;
845     }
846 }
847 
848 //
// Constructs an empty rendezvous queue (empty connector list).
srt::CRendezvousQueue::CRendezvousQueue()
    : m_lRendezvousID()
    , m_RIDListLock()
{
}
854 
// Drops all connector entries. The list is cleared without taking
// m_RIDListLock, and the CUDT objects the entries point to are not deleted.
srt::CRendezvousQueue::~CRendezvousQueue()
{
    m_lRendezvousID.clear();
}
859 
insert(const SRTSOCKET & id,CUDT * u,const sockaddr_any & addr,const steady_clock::time_point & ttl)860 void srt::CRendezvousQueue::insert(const SRTSOCKET&                id,
861                               CUDT*                           u,
862                               const sockaddr_any&             addr,
863                               const steady_clock::time_point& ttl)
864 {
865     ScopedLock vg(m_RIDListLock);
866 
867     CRL r;
868     r.m_iID      = id;
869     r.m_pUDT     = u;
870     r.m_PeerAddr = addr;
871     r.m_tsTTL    = ttl;
872 
873     m_lRendezvousID.push_back(r);
874     HLOGC(cnlog.Debug,
875           log << "RID: adding socket @" << id << " for address: " << addr.str() << " expires: " << FormatTime(ttl)
876               << " (total connectors: " << m_lRendezvousID.size() << ")");
877 }
878 
remove(const SRTSOCKET & id)879 void srt::CRendezvousQueue::remove(const SRTSOCKET& id)
880 {
881     ScopedLock lkv(m_RIDListLock);
882 
883     for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
884     {
885         if (i->m_iID == id)
886         {
887             m_lRendezvousID.erase(i);
888             break;
889         }
890     }
891 }
892 
retrieve(const sockaddr_any & addr,SRTSOCKET & w_id) const893 srt::CUDT* srt::CRendezvousQueue::retrieve(const sockaddr_any& addr, SRTSOCKET& w_id) const
894 {
895     ScopedLock vg(m_RIDListLock);
896 
897     // TODO: optimize search
898     for (list<CRL>::const_iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
899     {
900         if (i->m_PeerAddr == addr && ((w_id == 0) || (w_id == i->m_iID)))
901         {
902             HLOGC(cnlog.Debug,
903                   log << "RID: found id @" << i->m_iID << " while looking for "
904                       << (w_id ? "THIS ID FROM " : "A NEW CONNECTION FROM ") << i->m_PeerAddr.str());
905             w_id = i->m_iID;
906             return i->m_pUDT;
907         }
908     }
909 
910 #if ENABLE_HEAVY_LOGGING
911     std::ostringstream spec;
912     if (w_id == 0)
913         spec << "A NEW CONNECTION REQUEST";
914     else
915         spec << " AGENT @" << w_id;
916     HLOGC(cnlog.Debug,
917           log << "RID: NO CONNECTOR FOR ADR:" << addr.str() << " while looking for " << spec.str() << " ("
918               << m_lRendezvousID.size() << " connectors total)");
919 #endif
920 
921     return NULL;
922 }
923 
// Drives pending connectors forward and cleans up the failed ones.
//
// rst  - read status of the call (see the RST_OK / RST_AGAIN note in the loop below)
// cst  - connection status passed on to the connector the packet is addressed to
// unit - unit holding the received packet, or NULL when there is none
//
// Steps:
//   1. qualifyToHandle() fills `toRemove` and `toProcess`; if it returns
//      false there is nothing to do.
//   2. Every socket in `toProcess` gets processAsyncConnectRequest(); sockets
//      other than the packet's destination are called with RST_AGAIN /
//      CONN_AGAIN. On failure the socket is added to `toRemove` with
//      SRT_ECONNREJ and a UMSG_SHUTDOWN is sent.
//   3. Every socket in `toRemove` is marked as no longer connecting, removed
//      from this queue, reported to epoll (IN | OUT | ERR) and has its broken
//      connection dependencies completed.
//   4. Any entry still in the list whose ID appears in `toRemove` gets its
//      TTL reset so that it counts as expired.
void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst, CUnit* unit)
{
    vector<LinkStatusInfo> toRemove, toProcess;

    const CPacket* pkt = unit ? &unit->m_Packet : NULL;

    // Need a stub value for a case when there's no unit provided ("storage depleted" case).
    // It should be normally NOT IN USE because in case of "storage depleted", rst != RST_OK.
    const SRTSOCKET dest_id = pkt ? pkt->m_iID : 0;

    // If no socket were qualified for further handling, finish here.
    // Otherwise toRemove and toProcess contain items to handle.
    if (!qualifyToHandle(rst, cst, dest_id, (toRemove), (toProcess)))
        return;

    HLOGC(cnlog.Debug,
          log << "updateConnStatus: collected " << toProcess.size() << " for processing, " << toRemove.size()
              << " to close");

    // Repeat (resend) connection request.
    for (vector<LinkStatusInfo>::iterator i = toProcess.begin(); i != toProcess.end(); ++i)
    {
        // IMPORTANT INFORMATION concerning changes towards UDT legacy.
        // In the UDT code there was no attempt to interpret any incoming data.
        // All data from the incoming packet were considered to be already deployed into
        // m_ConnRes field, and m_ConnReq field was considered at this time accordingly updated.
        // Therefore this procedure did only one thing: craft a new handshake packet and send it.
        // In SRT this may also interpret extra data (extensions in case when Agent is Responder)
        // and the `pktIn` packet may sometimes contain no data. Therefore the passed `rst`
        // must be checked to distinguish the call by periodic update (RST_AGAIN) from a call
        // due to have received the packet (RST_OK).
        //
        // In the below call, only the underlying `processRendezvous` function will be attempting
        // to interpret these data (for caller-listener this was already done by `processConnectRequest`
        // before calling this function), and it checks for the data presence.

        EReadStatus    read_st = rst;
        EConnectStatus conn_st = cst;

        // The packet (if any) is addressed to dest_id only; every other
        // connector is handled as a plain periodic update.
        if (i->id != dest_id)
        {
            read_st = RST_AGAIN;
            conn_st = CONN_AGAIN;
        }

        HLOGC(cnlog.Debug,
              log << "updateConnStatus: processing async conn for @" << i->id << " FROM " << i->peeraddr.str());

        if (!i->u->processAsyncConnectRequest(read_st, conn_st, pkt, i->peeraddr))
        {
            // cst == CONN_REJECT can only be result of worker_ProcessAddressedPacket and
            // its already set in this case.
            LinkStatusInfo fi = *i;
            fi.errorcode      = SRT_ECONNREJ;
            toRemove.push_back(fi);
            i->u->sendCtrl(UMSG_SHUTDOWN);
        }
    }

    // NOTE: it is "believed" here that all CUDT objects will not be
    // deleted in the meantime. This is based on a statement that at worst
    // they have been "just" declared failed and it will pass at least 1s until
    // they are moved to ClosedSockets and it is believed that this function will
    // not be held on mutexes that long.

    for (vector<LinkStatusInfo>::iterator i = toRemove.begin(); i != toRemove.end(); ++i)
    {
        HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id);
        //
        // Setting m_bConnecting to false, and need to remove the socket from the rendezvous queue
        // because the next CUDT::close will not remove it from the queue when m_bConnecting = false,
        // and may crash on next pass.
        //
        // TODO: maybe lock i->u->m_ConnectionLock?
        i->u->m_bConnecting = false;
        remove(i->u->m_SocketID);

        // DO NOT close the socket here because in this case it might be
        // unable to get status from at the right moment. Also only member
        // sockets should be taken care of internally - single sockets should
        // be normally closed by the application, after it is done with them.

        // app can call any UDT API to learn the connection_broken error
        CUDT::s_UDTUnited.m_EPoll.update_events(
            i->u->m_SocketID, i->u->m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);

        i->u->completeBrokenConnectionDependencies(i->errorcode);
    }

    {
        // Now, additionally for every failed link reset the TTL so that
        // they are set expired right now.
        // NOTE(review): each socket in toRemove was already passed to remove()
        // in the loop above, so a match here presumably requires the entry's
        // ID to differ from u->m_SocketID or to be listed more than once - confirm.
        ScopedLock vg(m_RIDListLock);
        for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
        {
            if (find_if(toRemove.begin(), toRemove.end(), LinkStatusInfo::HasID(i->m_iID)) != toRemove.end())
            {
                LOGC(cnlog.Error,
                     log << "updateConnStatus: processAsyncConnectRequest FAILED on @" << i->m_iID
                         << ". Setting TTL as EXPIRED.");
                i->m_tsTTL =
                    steady_clock::time_point(); // Make it expire right now, will be picked up at the next iteration
            }
        }
    }
}
1030 
// Walks the rendezvous list under m_RIDListLock and sorts its entries into two
// output collections, so that the caller can act on them after the lock is released:
// - toRemove:  entries whose TTL has been reached (including a TTL that was reset to
//              the zero time point); they are erased from m_lRendezvousID right here
//              and the reject reason of the socket is filled in if it was still unknown.
// - toProcess: non-expired entries of sockets with bSynRecving == false for which
//              the connection request should be repeated now.
//
// @param rst        status of the last read; RST_AGAIN means no packet was received.
// @param cst        connection status; used for logging only.
// @param iDstSockID destination socket ID of the received packet.
// @param toRemove   [out] receives the expired entries.
// @param toProcess  [out] receives the entries qualified for processing.
// @return false if the rendezvous list is empty; otherwise true if at least one
//         of the two collections is non-empty on exit.
bool srt::CRendezvousQueue::qualifyToHandle(EReadStatus    rst,
                                       EConnectStatus cst      SRT_ATR_UNUSED,
                                       int                     iDstSockID,
                                       vector<LinkStatusInfo>& toRemove,
                                       vector<LinkStatusInfo>& toProcess)
{
    ScopedLock vg(m_RIDListLock);

    if (m_lRendezvousID.empty())
        return false; // nothing to process.

    HLOGC(cnlog.Debug,
          log << "updateConnStatus: updating after getting pkt with DST socket ID @" << iDstSockID
              << " status: " << ConnectStatusStr(cst));

    for (list<CRL>::iterator i = m_lRendezvousID.begin(), i_next = i; i != m_lRendezvousID.end(); i = i_next)
    {
        // Safe iterator to the next element. If the current element is erased, the iterator is updated again.
        ++i_next;

        // The current time is sampled separately for every entry.
        const steady_clock::time_point tsNow = steady_clock::now();

        if (tsNow >= i->m_tsTTL)
        {
            HLOGC(cnlog.Debug,
                  log << "RID: socket @" << i->m_iID
                      << " removed - EXPIRED ("
                      // The "enforced on FAILURE" is below when processAsyncConnectRequest failed.
                      << (is_zero(i->m_tsTTL) ? "enforced on FAILURE" : "passed TTL") << "). WILL REMOVE from queue.");

            // Set appropriate error information, but do not update yet.
            // Exit the lock first. Collect objects to update them later.
            int ccerror = SRT_ECONNREJ;
            // A reject reason that has already been set is left untouched.
            if (i->m_pUDT->m_RejectReason == SRT_REJ_UNKNOWN)
            {
                if (!is_zero(i->m_tsTTL))
                {
                    // Timer expired, set TIMEOUT forcefully
                    i->m_pUDT->m_RejectReason = SRT_REJ_TIMEOUT;
                    ccerror                   = SRT_ENOSERVER;
                }
                else
                {
                    // In case of unknown reason, rejection should at least
                    // suggest error on the peer
                    i->m_pUDT->m_RejectReason = SRT_REJ_PEER;
                }
            }

            // The call to completeBrokenConnectionDependencies() cannot happen here
            // under the lock of m_RIDListLock as it risks a deadlock.
            // Collect in 'toRemove' to update later.
            LinkStatusInfo fi = {i->m_pUDT, i->m_iID, ccerror, i->m_PeerAddr, -1};
            toRemove.push_back(fi);

            // i_next was preincremented, but this is guaranteed to point to
            // the element next to erased one.
            i_next = m_lRendezvousID.erase(i);
            continue;
        }
        else
        {
            HLOGC(cnlog.Debug,
                  log << "RID: socket @" << i->m_iID << " still active (remaining " << std::fixed
                      << (count_microseconds(i->m_tsTTL - tsNow) / 1000000.0) << "s of TTL)...");
        }

        const steady_clock::time_point tsLastReq = i->m_pUDT->m_tsLastReqTime;
        const steady_clock::time_point tsRepeat =
            tsLastReq + milliseconds_from(250); // Repeat connection request (send HS).

        // A connection request is repeated every 250 ms if there was no response from the peer:
        // - RST_AGAIN means no packet was received over UDP.
        // - a packet was received, but not for THIS socket.
        // In both cases the entry is skipped until the 250 ms interval has passed.
        if ((rst == RST_AGAIN || i->m_iID != iDstSockID) && tsNow <= tsRepeat)
        {
            HLOGC(cnlog.Debug,
                  log << "RID:@" << i->m_iID << std::fixed << count_microseconds(tsNow - tsLastReq) / 1000.0
                      << " ms passed since last connection request.");

            continue;
        }

        HLOGC(cnlog.Debug,
              log << "RID:@" << i->m_iID << " cst=" << ConnectStatusStr(cst) << " -- repeating connection request.");

        // This queue is used only in case of Async mode (rendezvous or caller-listener).
        // Synchronous connection requests are handled in startConnect() completely.
        if (!i->m_pUDT->m_config.bSynRecving)
        {
            // Collect them so that they can be updated out of m_RIDListLock.
            LinkStatusInfo fi = {i->m_pUDT, i->m_iID, SRT_SUCCESS, i->m_PeerAddr, -1};
            toProcess.push_back(fi);
        }
        else
        {
            HLOGC(cnlog.Debug, log << "RID: socket @" << i->m_iID << " is SYNCHRONOUS, NOT UPDATING");
        }
    }

    // Note: this reflects the final state of both collections.
    return !toRemove.empty() || !toProcess.empty();
}
1133 
1134 //
// Creates an idle receiver queue: every pointer member starts as NULL and the
// closing flag is false. Only the m_BufferCond condition is set up here; the
// helper objects are allocated and the worker thread is started in init().
srt::CRcvQueue::CRcvQueue()
    : m_WorkerThread()
    , m_UnitQueue()
    , m_pRcvUList(NULL)
    , m_pHash(NULL)
    , m_pChannel(NULL)
    , m_pTimer(NULL)
    , m_szPayloadSize()
    , m_bClosing(false)
    , m_LSLock()
    , m_pListener(NULL)
    , m_pRendezvousQueue(NULL)
    , m_vNewEntry()
    , m_IDLock()
    , m_mBuffer()
    , m_BufferCond()
{
    // Released again by releaseCond() in the destructor.
    setupCond(m_BufferCond, "QueueBuffer");
}
1154 
~CRcvQueue()1155 srt::CRcvQueue::~CRcvQueue()
1156 {
1157     m_bClosing = true;
1158 
1159     if (m_WorkerThread.joinable())
1160     {
1161         HLOGC(rslog.Debug, log << "RcvQueue: EXIT");
1162         m_WorkerThread.join();
1163     }
1164     releaseCond(m_BufferCond);
1165 
1166     delete m_pRcvUList;
1167     delete m_pHash;
1168     delete m_pRendezvousQueue;
1169 
1170     // remove all queued messages
1171     for (map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++i)
1172     {
1173         while (!i->second.empty())
1174         {
1175             CPacket* pkt = i->second.front();
1176             delete[] pkt->m_pcData;
1177             delete pkt;
1178             i->second.pop();
1179         }
1180     }
1181 }
1182 
#if ENABLE_LOGGING
// Incremented by every init() call; its value is appended to the
// name of the receiver worker thread.
int srt::CRcvQueue::m_counter = 0;
#endif
1186 
init(int qsize,size_t payload,int version,int hsize,CChannel * cc,CTimer * t)1187 void srt::CRcvQueue::init(int qsize, size_t payload, int version, int hsize, CChannel* cc, CTimer* t)
1188 {
1189     m_szPayloadSize = payload;
1190 
1191     m_UnitQueue.init(qsize, (int)payload, version);
1192 
1193     m_pHash = new CHash;
1194     m_pHash->init(hsize);
1195 
1196     m_pChannel = cc;
1197     m_pTimer   = t;
1198 
1199     m_pRcvUList        = new CRcvUList;
1200     m_pRendezvousQueue = new CRendezvousQueue;
1201 
1202 #if ENABLE_LOGGING
1203     ++m_counter;
1204     const std::string thrname = "SRT:RcvQ:w" + Sprint(m_counter);
1205 #else
1206     const std::string thrname = "SRT:RcvQ:w";
1207 #endif
1208 
1209     if (!StartThread(m_WorkerThread, CRcvQueue::worker, this, thrname.c_str()))
1210     {
1211         throw CUDTException(MJ_SYSTEMRES, MN_THREAD);
1212     }
1213 }
1214 
// Body of the receiver worker thread.
//
// @param param pointer to the CRcvQueue owning this thread (init() passes 'this').
// @return always NULL.
//
// Until m_bClosing is set, every pass of the loop:
// 1. reads one packet through worker_RetrieveUnit();
// 2. on RST_OK dispatches it by its destination ID (0: connection request,
//    positive: addressed packet, negative: ignored);
// 3. calls checkTimers() on the sockets at the head of m_pRcvUList whose timestamp is
//    older than COMM_SYN_INTERVAL_US, and drops the ones that are no longer connected;
// 4. lets the rendezvous queue update the connection status of its sockets.
// Steps 3 and 4 are skipped for a pass when the packet had a negative ID or
// was not dispatched (CONN_AGAIN). RST_ERROR terminates the loop.
void* srt::CRcvQueue::worker(void* param)
{
    CRcvQueue*   self = (CRcvQueue*)param;
    sockaddr_any sa(self->m_UnitQueue.getIPversion());
    int32_t      id = 0;

#if ENABLE_LOGGING
    THREAD_STATE_INIT(("SRT:RcvQ:w" + Sprint(m_counter)).c_str());
#else
    THREAD_STATE_INIT("SRT:RcvQ:worker");
#endif

    CUnit*         unit = 0;
    // Kept across iterations: a pass without a received packet reuses the last status.
    EConnectStatus cst  = CONN_AGAIN;
    while (!self->m_bClosing)
    {
        bool        have_received = false;
        EReadStatus rst           = self->worker_RetrieveUnit((id), (unit), (sa));
        if (rst == RST_OK)
        {
            if (id < 0)
            {
                // User error on peer. May log something, but generally can only ignore it.
                // XXX Think maybe about sending some "connection rejection response".
                HLOGC(qrlog.Debug,
                      log << self->CONID() << "RECEIVED negative socket id '" << id
                          << "', rejecting (POSSIBLE ATTACK)");
                continue;
            }

            // NOTE: cst state is being changed here.
            // This state should be maintained through any next failed calls to worker_RetrieveUnit.
            // Any error switches this to rejection, just for a case.

            // Note to rendezvous connection. This can accept:
            // - ID == 0 - take the first waiting rendezvous socket
            // - ID > 0  - find the rendezvous socket that has this ID.
            if (id == 0)
            {
                // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
                cst = self->worker_ProcessConnectionRequest(unit, sa);
            }
            else
            {
                // Otherwise ID is expected to be associated with:
                // - an enqueued rendezvous socket
                // - a socket connected to a peer
                cst = self->worker_ProcessAddressedPacket(id, unit, sa);
                // CAN RETURN CONN_REJECT, but m_RejectReason is already set
            }
            HLOGC(qrlog.Debug, log << self->CONID() << "worker: result for the unit: " << ConnectStatusStr(cst));
            if (cst == CONN_AGAIN)
            {
                HLOGC(qrlog.Debug, log << self->CONID() << "worker: packet not dispatched, continuing reading.");
                continue;
            }
            have_received = true;
        }
        else if (rst == RST_ERROR)
        {
            // According to the description by CChannel::recvfrom, this can be either of:
            // - IPE: all errors except EBADF
            // - socket was closed in the meantime by another thread: EBADF
            // If EBADF, then it's expected that the "closing" state is also set.
            // Check that just to report possible errors, but interrupt the loop anyway.
            if (self->m_bClosing)
            {
                HLOGC(qrlog.Debug,
                      log << self->CONID() << "CChannel reported error, but Queue is closing - INTERRUPTING worker.");
            }
            else
            {
                LOGC(qrlog.Fatal,
                     log << self->CONID()
                         << "CChannel reported ERROR DURING TRANSMISSION - IPE. INTERRUPTING worker anyway.");
            }
            cst = CONN_REJECT;
            break;
        }
        // OTHERWISE: this is an "AGAIN" situation. No data was read, but the process should continue.

        // take care of the timing event for all UDT sockets
        const steady_clock::time_point curtime_minus_syn =
            steady_clock::now() - microseconds_from(CUDT::COMM_SYN_INTERVAL_US);

        // Always inspect the head of the list: both update() and remove() below
        // change the list, so the head is fetched again after every step.
        CRNode* ul = self->m_pRcvUList->m_pUList;
        while ((NULL != ul) && (ul->m_tsTimeStamp < curtime_minus_syn))
        {
            CUDT* u = ul->m_pUDT;

            if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
            {
                u->checkTimers();
                self->m_pRcvUList->update(u);
            }
            else
            {
                HLOGC(qrlog.Debug,
                      log << CUDTUnited::CONID(u->m_SocketID) << " SOCKET broken, REMOVING FROM RCV QUEUE/MAP.");
                // the socket must be removed from Hash table first, then RcvUList
                self->m_pHash->remove(u->m_SocketID);
                self->m_pRcvUList->remove(u);
                u->m_pRNode->m_bOnList = false;
            }

            ul = self->m_pRcvUList->m_pUList;
        }

        if (have_received)
        {
            // have_received is set only after an RST_OK read, so 'unit' refers
            // to the unit that was just filled in.
            HLOGC(qrlog.Debug,
                  log << "worker: RECEIVED PACKET --> updateConnStatus. cst=" << ConnectStatusStr(cst) << " id=" << id
                      << " pkt-payload-size=" << unit->m_Packet.getLength());
        }

        // Check connection requests status for all sockets in the RendezvousQueue.
        // Pass the connection status from the last call of:
        // worker_ProcessAddressedPacket --->
        // worker_TryAsyncRend_OrStore --->
        // CUDT::processAsyncConnectResponse --->
        // CUDT::processConnectResponse
        self->m_pRendezvousQueue->updateConnStatus(rst, cst, unit);

        // XXX updateConnStatus may have removed the connector from the list,
        // however there's still m_mBuffer in CRcvQueue for that socket to care about.
    }

    HLOGC(qrlog.Debug, log << "worker: EXIT");

    THREAD_EXIT();
    return NULL;
}
1347 
worker_RetrieveUnit(int32_t & w_id,CUnit * & w_unit,sockaddr_any & w_addr)1348 EReadStatus srt::CRcvQueue::worker_RetrieveUnit(int32_t& w_id, CUnit*& w_unit, sockaddr_any& w_addr)
1349 {
1350 #if !USE_BUSY_WAITING
1351     // This might be not really necessary, and probably
1352     // not good for extensive bidirectional communication.
1353     m_pTimer->tick();
1354 #endif
1355 
1356     // check waiting list, if new socket, insert it to the list
1357     while (ifNewEntry())
1358     {
1359         CUDT* ne = getNewEntry();
1360         if (ne)
1361         {
1362             HLOGC(qrlog.Debug,
1363                   log << CUDTUnited::CONID(ne->m_SocketID)
1364                       << " SOCKET pending for connection - ADDING TO RCV QUEUE/MAP");
1365             m_pRcvUList->insert(ne);
1366             m_pHash->insert(ne->m_SocketID, ne);
1367         }
1368     }
1369     // find next available slot for incoming packet
1370     w_unit = m_UnitQueue.getNextAvailUnit();
1371     if (!w_unit)
1372     {
1373         // no space, skip this packet
1374         CPacket temp;
1375         temp.m_pcData = new char[m_szPayloadSize];
1376         temp.setLength(m_szPayloadSize);
1377         THREAD_PAUSED();
1378         EReadStatus rst = m_pChannel->recvfrom((w_addr), (temp));
1379         THREAD_RESUMED();
1380         // Note: this will print nothing about the packet details unless heavy logging is on.
1381         LOGC(qrlog.Error, log << CONID() << "LOCAL STORAGE DEPLETED. Dropping 1 packet: " << temp.Info());
1382         delete[] temp.m_pcData;
1383 
1384         // Be transparent for RST_ERROR, but ignore the correct
1385         // data read and fake that the packet was dropped.
1386         return rst == RST_ERROR ? RST_ERROR : RST_AGAIN;
1387     }
1388 
1389     w_unit->m_Packet.setLength(m_szPayloadSize);
1390 
1391     // reading next incoming packet, recvfrom returns -1 is nothing has been received
1392     THREAD_PAUSED();
1393     EReadStatus rst = m_pChannel->recvfrom((w_addr), (w_unit->m_Packet));
1394     THREAD_RESUMED();
1395 
1396     if (rst == RST_OK)
1397     {
1398         w_id = w_unit->m_Packet.m_iID;
1399         HLOGC(qrlog.Debug,
1400               log << "INCOMING PACKET: FROM=" << w_addr.str() << " BOUND=" << m_pChannel->bindAddressAny().str() << " "
1401                   << w_unit->m_Packet.Info());
1402     }
1403     return rst;
1404 }
1405 
worker_ProcessConnectionRequest(CUnit * unit,const sockaddr_any & addr)1406 EConnectStatus srt::CRcvQueue::worker_ProcessConnectionRequest(CUnit* unit, const sockaddr_any& addr)
1407 {
1408     HLOGC(cnlog.Debug,
1409           log << "Got sockID=0 from " << addr.str() << " - trying to resolve it as a connection request...");
1410     // Introduced protection because it may potentially happen
1411     // that another thread could have closed the socket at
1412     // the same time and inject a bug between checking the
1413     // pointer for NULL and using it.
1414     int  listener_ret  = SRT_REJ_UNKNOWN;
1415     bool have_listener = false;
1416     {
1417         ScopedLock cg(m_LSLock);
1418         if (m_pListener)
1419         {
1420             LOGC(cnlog.Note, log << "PASSING request from: " << addr.str() << " to agent:" << m_pListener->socketID());
1421             listener_ret = m_pListener->processConnectRequest(addr, unit->m_Packet);
1422 
1423             // This function does return a code, but it's hard to say as to whether
1424             // anything can be done about it. In case when it's stated possible, the
1425             // listener will try to send some rejection response to the caller, but
1426             // that's already done inside this function. So it's only used for
1427             // displaying the error in logs.
1428 
1429             have_listener = true;
1430         }
1431     }
1432 
1433     // NOTE: Rendezvous sockets do bind(), but not listen(). It means that the socket is
1434     // ready to accept connection requests, but they are not being redirected to the listener
1435     // socket, as this is not a listener socket at all. This goes then HERE.
1436 
1437     if (have_listener) // That is, the above block with m_pListener->processConnectRequest was executed
1438     {
1439         LOGC(cnlog.Note,
1440              log << CONID() << "Listener managed the connection request from: " << addr.str()
1441                  << " result:" << RequestTypeStr(UDTRequestType(listener_ret)));
1442         return listener_ret == SRT_REJ_UNKNOWN ? CONN_CONTINUE : CONN_REJECT;
1443     }
1444 
1445     // If there's no listener waiting for the packet, just store it into the queue.
1446     return worker_TryAsyncRend_OrStore(0, unit, addr); // 0 id because the packet came in with that very ID.
1447 }
1448 
worker_ProcessAddressedPacket(int32_t id,CUnit * unit,const sockaddr_any & addr)1449 EConnectStatus srt::CRcvQueue::worker_ProcessAddressedPacket(int32_t id, CUnit* unit, const sockaddr_any& addr)
1450 {
1451     CUDT* u = m_pHash->lookup(id);
1452     if (!u)
1453     {
1454         // Pass this to either async rendezvous connection,
1455         // or store the packet in the queue.
1456         HLOGC(cnlog.Debug, log << "worker_ProcessAddressedPacket: resending to QUEUED socket @" << id);
1457         return worker_TryAsyncRend_OrStore(id, unit, addr);
1458     }
1459 
1460     // Found associated CUDT - process this as control or data packet
1461     // addressed to an associated socket.
1462     if (addr != u->m_PeerAddr)
1463     {
1464         HLOGC(cnlog.Debug,
1465               log << CONID() << "Packet for SID=" << id << " asoc with " << u->m_PeerAddr.str() << " received from "
1466                   << addr.str() << " (CONSIDERED ATTACK ATTEMPT)");
1467         // This came not from the address that is the peer associated
1468         // with the socket. Ignore it.
1469         return CONN_AGAIN;
1470     }
1471 
1472     if (!u->m_bConnected || u->m_bBroken || u->m_bClosing)
1473     {
1474         u->m_RejectReason = SRT_REJ_CLOSE;
1475         // The socket is currently in the process of being disconnected
1476         // or destroyed. Ignore.
1477         // XXX send UMSG_SHUTDOWN in this case?
1478         // XXX May it require mutex protection?
1479         return CONN_REJECT;
1480     }
1481 
1482     if (unit->m_Packet.isControl())
1483         u->processCtrl(unit->m_Packet);
1484     else
1485         u->processData(unit);
1486 
1487     u->checkTimers();
1488     m_pRcvUList->update(u);
1489 
1490     return CONN_RUNNING;
1491 }
1492 
// This function responds to the fact that a packet has come
// for a socket that does not expect to receive a normal connection
// request. This can be then:
// - a normal packet of whatever kind, just to be processed by the message loop
// - a rendezvous connection
// This function then tries to manage the packet as a rendezvous connection
// request in ASYNC mode; when this is not applicable, it stores the packet
// in the "receiving queue" so that it will be picked up in the "main" thread.
//
// @param id   destination socket ID of the packet (0 for a connection request).
// @param unit unit holding the received packet.
// @param addr source address of the packet.
// @return - CONN_AGAIN when no socket in the rendezvous queue matches (packet ignored);
//         - CONN_CONTINUE when the packet was stored for a socket with bSynRecving set;
//         - otherwise (bSynRecving not set) the status of the asynchronous connection
//           processing, as adjusted below.
EConnectStatus srt::CRcvQueue::worker_TryAsyncRend_OrStore(int32_t id, CUnit* unit, const sockaddr_any& addr)
{
    // This 'retrieve' requires that 'id' be either one of those
    // stored in the rendezvous queue (see CRcvQueue::registerConnector)
    // or simply 0, but then at least the address must match one of these.
    // If the id was 0, it will be set to the actual socket ID of the returned CUDT.
    CUDT* u = m_pRendezvousQueue->retrieve(addr, (id));
    if (!u)
    {
        // this socket is then completely unknown to the system.
        // Note that this situation may also happen at a very unfortunate
        // coincidence that the socket is already bound, but the registerConnector()
        // has not yet started. In case of rendezvous this may mean that the other
        // side just started sending its handshake packets, the local side has already
        // run the CRcvQueue::worker thread, and this worker thread is trying to dispatch
        // the handshake packet too early, before the dispatcher has a chance to see
        // this socket registered in the RendezvousQueue, which causes the packet unable
        // to be dispatched. Therefore simply treat every "out of band" packet (with socket
        // not belonging to the connection and not registered as rendezvous) as "possible
        // attack" and ignore it. This also should better protect the rendezvous socket
        // against a rogue connector.
        if (id == 0)
        {
            HLOGC(cnlog.Debug,
                  log << CONID() << "AsyncOrRND: no sockets expect connection from " << addr.str()
                      << " - POSSIBLE ATTACK, ignore packet");
        }
        else
        {
            HLOGC(cnlog.Debug,
                  log << CONID() << "AsyncOrRND: no sockets expect socket " << id << " from " << addr.str()
                      << " - POSSIBLE ATTACK, ignore packet");
        }
        return CONN_AGAIN; // This means that the packet should be ignored.
    }

    // asynchronous connect: call connect here
    // otherwise wait for the UDT socket to retrieve this packet
    if (!u->m_config.bSynRecving)
    {
        HLOGC(cnlog.Debug, log << "AsyncOrRND: packet RESOLVED TO @" << id << " -- continuing as ASYNC CONNECT");
        // This is practically same as processConnectResponse, just this applies
        // appropriate mutex lock - which can't be done here because it's intentionally private.
        // OTOH it can't be applied to processConnectResponse because the synchronous
        // call to this method applies the lock by itself, and same-thread-double-locking is nonportable (crashable).
        EConnectStatus cst = u->processAsyncConnectResponse(unit->m_Packet);

        if (cst == CONN_CONFUSED)
        {
            LOGC(cnlog.Warn, log << "AsyncOrRND: PACKET NOT HANDSHAKE - re-requesting handshake from peer");
            // Keep a copy of the packet in the buffer, then ask the peer for the handshake again.
            storePkt(id, unit->m_Packet.clone());
            if (!u->processAsyncConnectRequest(RST_AGAIN, CONN_CONTINUE, &unit->m_Packet, u->m_PeerAddr))
            {
                // Reuse previous behavior to reject a packet
                cst = CONN_REJECT;
            }
            else
            {
                cst = CONN_CONTINUE;
            }
        }

        // It might be that this is a data packet, which has turned the connection
        // into "connected" state, removed the connector (so since now every next packet
        // will land directly in the queue), but this data packet shall still be delivered.
        if (cst == CONN_ACCEPT && !unit->m_Packet.isControl())
        {
            // The process as called through processAsyncConnectResponse() should have put the
            // socket into the pending queue for pending connection (don't ask me, this is so).
            // This pending queue is being purged every time in the beginning of this loop, so
            // currently the socket is in the pending queue, but not yet in the connection queue.
            // It will be done at the next iteration of the reading loop, but it will be too late,
            // we have a pending data packet now and we must either dispatch it to an already connected
            // socket or disregard it, and rather prefer the former. So do this transformation now
            // that we KNOW (by the cst == CONN_ACCEPT result) that the socket should be inserted
            // into the pending anteroom.

            CUDT* ne = getNewEntry(); // This function actually removes the entry and returns it.
            // This **should** now always return a non-null value, but check it first
            // because if this accidentally isn't true, the call to worker_ProcessAddressedPacket will
            // result in redirecting it to here and so on until the call stack overflow. In case of
            // this "accident" simply disregard the packet from any further processing, it will be later
            // loss-recovered.
            // XXX (Probably the old contents of UDT's CRcvQueue::worker should be shaped a little bit
            // differently throughout the functions).
            if (ne)
            {
                HLOGC(cnlog.Debug,
                      log << CUDTUnited::CONID(ne->m_SocketID)
                          << " SOCKET pending for connection - ADDING TO RCV QUEUE/MAP");
                m_pRcvUList->insert(ne);
                m_pHash->insert(ne->m_SocketID, ne);

                // The current situation is that this has passed processAsyncConnectResponse, but actually
                // this packet *SHOULD HAVE BEEN* handled by worker_ProcessAddressedPacket, however the
                // connection state wasn't completed at the moment when dispatching this packet. This has
                // been now completed inside the call to processAsyncConnectResponse, but this is still a
                // data packet that should have expected the connection to be already established. Therefore
                // redirect it once again into worker_ProcessAddressedPacket here.

                HLOGC(cnlog.Debug,
                      log << "AsyncOrRND: packet SWITCHED TO CONNECTED with ID=" << id
                          << " -- passing to worker_ProcessAddressedPacket");

                // Theoretically we should check if m_pHash->lookup(ne->m_SocketID) returns 'ne', but this
                // has been just added to m_pHash, so the check would be extremely paranoid here.
                cst = worker_ProcessAddressedPacket(id, unit, addr);
                if (cst == CONN_REJECT)
                    return cst;
                return CONN_ACCEPT; // this function usually will return CONN_CONTINUE, which doesn't represent current
                                    // situation.
            }
            else
            {
                // The status stays CONN_ACCEPT; only this data packet is dropped.
                LOGC(cnlog.Error,
                     log << "IPE: AsyncOrRND: packet SWITCHED TO CONNECTED, but ID=" << id
                         << " is still not present in the socket ID dispatch hash - DISREGARDING");
            }
        }
        return cst;
    }
    HLOGC(cnlog.Debug,
          log << "AsyncOrRND: packet RESOLVED TO ID=" << id << " -- continuing through CENTRAL PACKET QUEUE");
    // This is where also the packets for rendezvous connection will be landing,
    // in case of a synchronous connection.
    storePkt(id, unit->m_Packet.clone());

    return CONN_CONTINUE;
}
1630 
stopWorker()1631 void srt::CRcvQueue::stopWorker()
1632 {
1633     // We use the decent way, so we say to the thread "please exit".
1634     m_bClosing = true;
1635 
1636     // Sanity check of the function's affinity.
1637     if (srt::sync::this_thread::get_id() == m_WorkerThread.get_id())
1638     {
1639         LOGC(rslog.Error, log << "IPE: RcvQ:WORKER TRIES TO CLOSE ITSELF!");
1640         return; // do nothing else, this would cause a hangup or crash.
1641     }
1642 
1643     HLOGC(rslog.Debug, log << "RcvQueue: EXIT (forced)");
1644     // And we trust the thread that it does.
1645     m_WorkerThread.join();
1646 }
1647 
recvfrom(int32_t id,CPacket & w_packet)1648 int srt::CRcvQueue::recvfrom(int32_t id, CPacket& w_packet)
1649 {
1650     UniqueLock bufferlock(m_BufferLock);
1651     CSync      buffercond(m_BufferCond, bufferlock);
1652 
1653     map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
1654 
1655     if (i == m_mBuffer.end())
1656     {
1657         THREAD_PAUSED();
1658         buffercond.wait_for(seconds_from(1));
1659         THREAD_RESUMED();
1660 
1661         i = m_mBuffer.find(id);
1662         if (i == m_mBuffer.end())
1663         {
1664             w_packet.setLength(-1);
1665             return -1;
1666         }
1667     }
1668 
1669     // retrieve the earliest packet
1670     CPacket* newpkt = i->second.front();
1671 
1672     if (w_packet.getLength() < newpkt->getLength())
1673     {
1674         w_packet.setLength(-1);
1675         return -1;
1676     }
1677 
1678     // copy packet content
1679     // XXX Check if this wouldn't be better done by providing
1680     // copy constructor for DynamicStruct.
1681     // XXX Another thing: this looks wasteful. This expects an already
1682     // allocated memory on the packet, this thing gets the packet,
1683     // copies it into the passed packet and then the source packet
1684     // gets deleted. Why not simply return the originally stored packet,
1685     // without copying, allocation and deallocation?
1686     memcpy((w_packet.m_nHeader), newpkt->m_nHeader, CPacket::HDR_SIZE);
1687     memcpy((w_packet.m_pcData), newpkt->m_pcData, newpkt->getLength());
1688     w_packet.setLength(newpkt->getLength());
1689 
1690     delete[] newpkt->m_pcData;
1691     delete newpkt;
1692 
1693     // remove this message from queue,
1694     // if no more messages left for this socket, release its data structure
1695     i->second.pop();
1696     if (i->second.empty())
1697         m_mBuffer.erase(i);
1698 
1699     return (int)w_packet.getLength();
1700 }
1701 
setListener(CUDT * u)1702 int srt::CRcvQueue::setListener(CUDT* u)
1703 {
1704     ScopedLock lslock(m_LSLock);
1705 
1706     if (NULL != m_pListener)
1707         return -1;
1708 
1709     m_pListener = u;
1710     return 0;
1711 }
1712 
removeListener(const CUDT * u)1713 void srt::CRcvQueue::removeListener(const CUDT* u)
1714 {
1715     ScopedLock lslock(m_LSLock);
1716 
1717     if (u == m_pListener)
1718         m_pListener = NULL;
1719 }
1720 
registerConnector(const SRTSOCKET & id,CUDT * u,const sockaddr_any & addr,const steady_clock::time_point & ttl)1721 void srt::CRcvQueue::registerConnector(const SRTSOCKET&                id,
1722                                   CUDT*                           u,
1723                                   const sockaddr_any&             addr,
1724                                   const steady_clock::time_point& ttl)
1725 {
1726     HLOGC(cnlog.Debug,
1727           log << "registerConnector: adding @" << id << " addr=" << addr.str() << " TTL=" << FormatTime(ttl));
1728     m_pRendezvousQueue->insert(id, u, addr, ttl);
1729 }
1730 
removeConnector(const SRTSOCKET & id)1731 void srt::CRcvQueue::removeConnector(const SRTSOCKET& id)
1732 {
1733     HLOGC(cnlog.Debug, log << "removeConnector: removing @" << id);
1734     m_pRendezvousQueue->remove(id);
1735 
1736     ScopedLock bufferlock(m_BufferLock);
1737 
1738     map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
1739     if (i != m_mBuffer.end())
1740     {
1741         HLOGC(cnlog.Debug,
1742               log << "removeConnector: ... and its packet queue with " << i->second.size() << " packets collected");
1743         while (!i->second.empty())
1744         {
1745             delete[] i->second.front()->m_pcData;
1746             delete i->second.front();
1747             i->second.pop();
1748         }
1749         m_mBuffer.erase(i);
1750     }
1751 }
1752 
setNewEntry(CUDT * u)1753 void srt::CRcvQueue::setNewEntry(CUDT* u)
1754 {
1755     HLOGC(cnlog.Debug, log << CUDTUnited::CONID(u->m_SocketID) << "setting socket PENDING FOR CONNECTION");
1756     ScopedLock listguard(m_IDLock);
1757     m_vNewEntry.push_back(u);
1758 }
1759 
ifNewEntry()1760 bool srt::CRcvQueue::ifNewEntry()
1761 {
1762     return !(m_vNewEntry.empty());
1763 }
1764 
getNewEntry()1765 srt::CUDT* srt::CRcvQueue::getNewEntry()
1766 {
1767     ScopedLock listguard(m_IDLock);
1768 
1769     if (m_vNewEntry.empty())
1770         return NULL;
1771 
1772     CUDT* u = (CUDT*)*(m_vNewEntry.begin());
1773     m_vNewEntry.erase(m_vNewEntry.begin());
1774 
1775     return u;
1776 }
1777 
storePkt(int32_t id,CPacket * pkt)1778 void srt::CRcvQueue::storePkt(int32_t id, CPacket* pkt)
1779 {
1780     UniqueLock bufferlock(m_BufferLock);
1781     CSync      passcond(m_BufferCond, bufferlock);
1782 
1783     map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
1784 
1785     if (i == m_mBuffer.end())
1786     {
1787         m_mBuffer[id].push(pkt);
1788         passcond.signal_locked(bufferlock);
1789     }
1790     else
1791     {
1792         // avoid storing too many packets, in case of malfunction or attack
1793         if (i->second.size() > 16)
1794             return;
1795 
1796         i->second.push(pkt);
1797     }
1798 }
1799 
destroy()1800 void srt::CMultiplexer::destroy()
1801 {
1802     // Reverse order of the assigned
1803     delete m_pRcvQueue;
1804     delete m_pSndQueue;
1805     delete m_pTimer;
1806 
1807     if (m_pChannel)
1808     {
1809         m_pChannel->close();
1810         delete m_pChannel;
1811     }
1812 }
1813