1 /*
2 * SRT - Secure, Reliable, Transport
3 * Copyright (c) 2018 Haivision Systems Inc.
4 *
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 *
9 */
10
11 /*****************************************************************************
12 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
13 All rights reserved.
14
15 Redistribution and use in source and binary forms, with or without
16 modification, are permitted provided that the following conditions are
17 met:
18
19 * Redistributions of source code must retain the above
20 copyright notice, this list of conditions and the
21 following disclaimer.
22
23 * Redistributions in binary form must reproduce the
24 above copyright notice, this list of conditions
25 and the following disclaimer in the documentation
26 and/or other materials provided with the distribution.
27
28 * Neither the name of the University of Illinois
29 nor the names of its contributors may be used to
30 endorse or promote products derived from this
31 software without specific prior written permission.
32
33 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
34 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
35 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
36 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
37 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
38 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
39 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
40 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
41 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
42 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
43 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
44 *****************************************************************************/
45
46 /*****************************************************************************
47 written by
48 Yunhong Gu, last updated 05/05/2011
49 modified by
50 Haivision Systems Inc.
51 *****************************************************************************/
52
53 #include "platform_sys.h"
54
55 #include <cstring>
56
57 #include "common.h"
58 #include "api.h"
59 #include "netinet_any.h"
60 #include "threadname.h"
61 #include "logging.h"
62 #include "queue.h"
63
64 using namespace std;
65 using namespace srt::sync;
66 using namespace srt_logging;
67
CUnitQueue()68 srt::CUnitQueue::CUnitQueue()
69 : m_pQEntry(NULL)
70 , m_pCurrQueue(NULL)
71 , m_pLastQueue(NULL)
72 , m_iSize(0)
73 , m_iCount(0)
74 , m_iMSS()
75 , m_iIPversion()
76 {
77 }
78
~CUnitQueue()79 srt::CUnitQueue::~CUnitQueue()
80 {
81 CQEntry* p = m_pQEntry;
82
83 while (p != NULL)
84 {
85 delete[] p->m_pUnit;
86 delete[] p->m_pBuffer;
87
88 CQEntry* q = p;
89 if (p == m_pLastQueue)
90 p = NULL;
91 else
92 p = p->m_pNext;
93 delete q;
94 }
95 }
96
init(int size,int mss,int version)97 int srt::CUnitQueue::init(int size, int mss, int version)
98 {
99 CQEntry* tempq = NULL;
100 CUnit* tempu = NULL;
101 char* tempb = NULL;
102
103 try
104 {
105 tempq = new CQEntry;
106 tempu = new CUnit[size];
107 tempb = new char[size * mss];
108 }
109 catch (...)
110 {
111 delete tempq;
112 delete[] tempu;
113 delete[] tempb;
114
115 return -1;
116 }
117
118 for (int i = 0; i < size; ++i)
119 {
120 tempu[i].m_iFlag = CUnit::FREE;
121 tempu[i].m_Packet.m_pcData = tempb + i * mss;
122 }
123 tempq->m_pUnit = tempu;
124 tempq->m_pBuffer = tempb;
125 tempq->m_iSize = size;
126
127 m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq;
128 m_pQEntry->m_pNext = m_pQEntry;
129
130 m_pAvailUnit = m_pCurrQueue->m_pUnit;
131
132 m_iSize = size;
133 m_iMSS = mss;
134 m_iIPversion = version;
135
136 return 0;
137 }
138
139 // XXX Lots of common code with CUnitQueue:init.
140 // Consider merging.
increase()141 int srt::CUnitQueue::increase()
142 {
143 // adjust/correct m_iCount
144 int real_count = 0;
145 CQEntry* p = m_pQEntry;
146 while (p != NULL)
147 {
148 CUnit* u = p->m_pUnit;
149 for (CUnit* end = u + p->m_iSize; u != end; ++u)
150 if (u->m_iFlag != CUnit::FREE)
151 ++real_count;
152
153 if (p == m_pLastQueue)
154 p = NULL;
155 else
156 p = p->m_pNext;
157 }
158 m_iCount = real_count;
159 if (double(m_iCount) / m_iSize < 0.9)
160 return -1;
161
162 CQEntry* tempq = NULL;
163 CUnit* tempu = NULL;
164 char* tempb = NULL;
165
166 // all queues have the same size
167 const int size = m_pQEntry->m_iSize;
168
169 try
170 {
171 tempq = new CQEntry;
172 tempu = new CUnit[size];
173 tempb = new char[size * m_iMSS];
174 }
175 catch (...)
176 {
177 delete tempq;
178 delete[] tempu;
179 delete[] tempb;
180
181 LOGC(rslog.Error,
182 log << "CUnitQueue:increase: failed to allocate " << size << " new units."
183 << " Current size=" << m_iSize);
184 return -1;
185 }
186
187 for (int i = 0; i < size; ++i)
188 {
189 tempu[i].m_iFlag = CUnit::FREE;
190 tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
191 }
192 tempq->m_pUnit = tempu;
193 tempq->m_pBuffer = tempb;
194 tempq->m_iSize = size;
195
196 m_pLastQueue->m_pNext = tempq;
197 m_pLastQueue = tempq;
198 m_pLastQueue->m_pNext = m_pQEntry;
199
200 m_iSize += size;
201
202 return 0;
203 }
204
shrink()205 int srt::CUnitQueue::shrink()
206 {
207 // currently queue cannot be shrunk.
208 return -1;
209 }
210
getNextAvailUnit()211 srt::CUnit* srt::CUnitQueue::getNextAvailUnit()
212 {
213 if (m_iCount * 10 > m_iSize * 9)
214 increase();
215
216 if (m_iCount >= m_iSize)
217 return NULL;
218
219 int units_checked = 0;
220 do
221 {
222 const CUnit* end = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize;
223 for (; m_pAvailUnit != end; ++m_pAvailUnit, ++units_checked)
224 {
225 if (m_pAvailUnit->m_iFlag == CUnit::FREE)
226 {
227 return m_pAvailUnit;
228 }
229 }
230
231 m_pCurrQueue = m_pCurrQueue->m_pNext;
232 m_pAvailUnit = m_pCurrQueue->m_pUnit;
233 } while (units_checked < m_iSize);
234
235 increase();
236
237 return NULL;
238 }
239
makeUnitFree(CUnit * unit)240 void srt::CUnitQueue::makeUnitFree(CUnit* unit)
241 {
242 SRT_ASSERT(unit != NULL);
243 SRT_ASSERT(unit->m_iFlag != CUnit::FREE);
244 unit->m_iFlag = CUnit::FREE;
245
246 --m_iCount;
247 }
248
makeUnitGood(CUnit * unit)249 void srt::CUnitQueue::makeUnitGood(CUnit* unit)
250 {
251 ++m_iCount;
252
253 SRT_ASSERT(unit != NULL);
254 SRT_ASSERT(unit->m_iFlag == CUnit::FREE);
255 unit->m_iFlag = CUnit::GOOD;
256 }
257
CSndUList(sync::CTimer * pTimer)258 srt::CSndUList::CSndUList(sync::CTimer* pTimer)
259 : m_pHeap(NULL)
260 , m_iArrayLength(512)
261 , m_iLastEntry(-1)
262 , m_ListLock()
263 , m_pTimer(pTimer)
264 {
265 setupCond(m_ListCond, "CSndUListCond");
266 m_pHeap = new CSNode*[m_iArrayLength];
267 }
268
~CSndUList()269 srt::CSndUList::~CSndUList()
270 {
271 releaseCond(m_ListCond);
272 delete[] m_pHeap;
273 }
274
update(const CUDT * u,EReschedule reschedule,sync::steady_clock::time_point ts)275 void srt::CSndUList::update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts)
276 {
277 ScopedLock listguard(m_ListLock);
278
279 CSNode* n = u->m_pSNode;
280
281 if (n->m_iHeapLoc >= 0)
282 {
283 if (reschedule == DONT_RESCHEDULE)
284 return;
285
286 if (n->m_tsTimeStamp <= ts)
287 return;
288
289 if (n->m_iHeapLoc == 0)
290 {
291 n->m_tsTimeStamp = ts;
292 m_pTimer->interrupt();
293 return;
294 }
295
296 remove_(u);
297 insert_norealloc_(ts, u);
298 return;
299 }
300
301 insert_(ts, u);
302 }
303
pop()304 srt::CUDT* srt::CSndUList::pop()
305 {
306 ScopedLock listguard(m_ListLock);
307
308 if (-1 == m_iLastEntry)
309 return NULL;
310
311 // no pop until the next scheduled time
312 if (m_pHeap[0]->m_tsTimeStamp > steady_clock::now())
313 return NULL;
314
315 CUDT* u = m_pHeap[0]->m_pUDT;
316 remove_(u);
317 return u;
318 }
319
remove(const CUDT * u)320 void srt::CSndUList::remove(const CUDT* u)
321 {
322 ScopedLock listguard(m_ListLock);
323 remove_(u);
324 }
325
getNextProcTime()326 steady_clock::time_point srt::CSndUList::getNextProcTime()
327 {
328 ScopedLock listguard(m_ListLock);
329
330 if (-1 == m_iLastEntry)
331 return steady_clock::time_point();
332
333 return m_pHeap[0]->m_tsTimeStamp;
334 }
335
waitNonEmpty() const336 void srt::CSndUList::waitNonEmpty() const
337 {
338 UniqueLock listguard(m_ListLock);
339 if (m_iLastEntry >= 0)
340 return;
341
342 m_ListCond.wait(listguard);
343 }
344
signalInterrupt() const345 void srt::CSndUList::signalInterrupt() const
346 {
347 ScopedLock listguard(m_ListLock);
348 m_ListCond.notify_all();
349 }
350
realloc_()351 void srt::CSndUList::realloc_()
352 {
353 CSNode** temp = NULL;
354
355 try
356 {
357 temp = new CSNode*[2 * m_iArrayLength];
358 }
359 catch (...)
360 {
361 throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
362 }
363
364 memcpy((temp), m_pHeap, sizeof(CSNode*) * m_iArrayLength);
365 m_iArrayLength *= 2;
366 delete[] m_pHeap;
367 m_pHeap = temp;
368 }
369
insert_(const steady_clock::time_point & ts,const CUDT * u)370 void srt::CSndUList::insert_(const steady_clock::time_point& ts, const CUDT* u)
371 {
372 // increase the heap array size if necessary
373 if (m_iLastEntry == m_iArrayLength - 1)
374 realloc_();
375
376 insert_norealloc_(ts, u);
377 }
378
insert_norealloc_(const steady_clock::time_point & ts,const CUDT * u)379 void srt::CSndUList::insert_norealloc_(const steady_clock::time_point& ts, const CUDT* u)
380 {
381 CSNode* n = u->m_pSNode;
382
383 // do not insert repeated node
384 if (n->m_iHeapLoc >= 0)
385 return;
386
387 SRT_ASSERT(m_iLastEntry < m_iArrayLength);
388
389 m_iLastEntry++;
390 m_pHeap[m_iLastEntry] = n;
391 n->m_tsTimeStamp = ts;
392
393 int q = m_iLastEntry;
394 int p = q;
395 while (p != 0)
396 {
397 p = (q - 1) >> 1;
398 if (m_pHeap[p]->m_tsTimeStamp <= m_pHeap[q]->m_tsTimeStamp)
399 break;
400
401 swap(m_pHeap[p], m_pHeap[q]);
402 m_pHeap[q]->m_iHeapLoc = q;
403 q = p;
404 }
405
406 n->m_iHeapLoc = q;
407
408 // an earlier event has been inserted, wake up sending worker
409 if (n->m_iHeapLoc == 0)
410 m_pTimer->interrupt();
411
412 // first entry, activate the sending queue
413 if (0 == m_iLastEntry)
414 {
415 // m_ListLock is assumed to be locked.
416 m_ListCond.notify_all();
417 }
418 }
419
remove_(const CUDT * u)420 void srt::CSndUList::remove_(const CUDT* u)
421 {
422 CSNode* n = u->m_pSNode;
423
424 if (n->m_iHeapLoc >= 0)
425 {
426 // remove the node from heap
427 m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
428 m_iLastEntry--;
429 m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc.load();
430
431 int q = n->m_iHeapLoc;
432 int p = q * 2 + 1;
433 while (p <= m_iLastEntry)
434 {
435 if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_tsTimeStamp > m_pHeap[p + 1]->m_tsTimeStamp))
436 p++;
437
438 if (m_pHeap[q]->m_tsTimeStamp > m_pHeap[p]->m_tsTimeStamp)
439 {
440 swap(m_pHeap[p], m_pHeap[q]);
441 m_pHeap[p]->m_iHeapLoc = p;
442 m_pHeap[q]->m_iHeapLoc = q;
443
444 q = p;
445 p = q * 2 + 1;
446 }
447 else
448 break;
449 }
450
451 n->m_iHeapLoc = -1;
452 }
453
454 // the only event has been deleted, wake up immediately
455 if (0 == m_iLastEntry)
456 m_pTimer->interrupt();
457 }
458
459 //
CSndQueue()460 srt::CSndQueue::CSndQueue()
461 : m_pSndUList(NULL)
462 , m_pChannel(NULL)
463 , m_pTimer(NULL)
464 , m_bClosing(false)
465 {
466 }
467
~CSndQueue()468 srt::CSndQueue::~CSndQueue()
469 {
470 m_bClosing = true;
471
472 if (m_pTimer != NULL)
473 {
474 m_pTimer->interrupt();
475 }
476
477 // Unblock CSndQueue worker thread if it is waiting.
478 m_pSndUList->signalInterrupt();
479
480 if (m_WorkerThread.joinable())
481 {
482 HLOGC(rslog.Debug, log << "SndQueue: EXIT");
483 m_WorkerThread.join();
484 }
485
486 delete m_pSndUList;
487 }
488
ioctlQuery(int type) const489 int srt::CSndQueue::ioctlQuery(int type) const
490 {
491 return m_pChannel->ioctlQuery(type);
492 }
sockoptQuery(int level,int type) const493 int srt::CSndQueue::sockoptQuery(int level, int type) const
494 {
495 return m_pChannel->sockoptQuery(level, type);
496 }
497
#if ENABLE_LOGGING
// Class-wide counter, available only in builds with logging enabled.
// It is incremented by every CSndQueue::init() call and appended to the
// "SRT:SndQ:w" worker thread name.
int srt::CSndQueue::m_counter = 0;
#endif
501
init(CChannel * c,CTimer * t)502 void srt::CSndQueue::init(CChannel* c, CTimer* t)
503 {
504 m_pChannel = c;
505 m_pTimer = t;
506 m_pSndUList = new CSndUList(t);
507
508 #if ENABLE_LOGGING
509 ++m_counter;
510 const std::string thrname = "SRT:SndQ:w" + Sprint(m_counter);
511 const char* thname = thrname.c_str();
512 #else
513 const char* thname = "SRT:SndQ";
514 #endif
515 if (!StartThread(m_WorkerThread, CSndQueue::worker, this, thname))
516 throw CUDTException(MJ_SYSTEMRES, MN_THREAD);
517 }
518
getIpTTL() const519 int srt::CSndQueue::getIpTTL() const
520 {
521 return m_pChannel ? m_pChannel->getIpTTL() : -1;
522 }
523
getIpToS() const524 int srt::CSndQueue::getIpToS() const
525 {
526 return m_pChannel ? m_pChannel->getIpToS() : -1;
527 }
528
#ifdef SRT_ENABLE_BINDTODEVICE
// Forwards to the channel's getBind(dst, len) and returns its result,
// or false when no channel is attached.
bool srt::CSndQueue::getBind(char* dst, size_t len) const
{
    if (!m_pChannel)
        return false;

    return m_pChannel->getBind(dst, len);
}
#endif
535
worker(void * param)536 void* srt::CSndQueue::worker(void* param)
537 {
538 CSndQueue* self = (CSndQueue*)param;
539
540 #if ENABLE_LOGGING
541 THREAD_STATE_INIT(("SRT:SndQ:w" + Sprint(m_counter)).c_str());
542 #else
543 THREAD_STATE_INIT("SRT:SndQ:worker");
544 #endif
545
546 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
547 CTimer::rdtsc(self->m_ullDbgTime);
548 self->m_ullDbgPeriod = uint64_t(5000000) * CTimer::getCPUFrequency();
549 self->m_ullDbgTime += self->m_ullDbgPeriod;
550 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
551
552 while (!self->m_bClosing)
553 {
554 const steady_clock::time_point next_time = self->m_pSndUList->getNextProcTime();
555
556 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
557 self->m_WorkerStats.lIteration++;
558 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
559
560 if (is_zero(next_time))
561 {
562 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
563 self->m_WorkerStats.lNotReadyTs++;
564 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
565
566 // wait here if there is no sockets with data to be sent
567 THREAD_PAUSED();
568 if (!self->m_bClosing)
569 {
570 self->m_pSndUList->waitNonEmpty();
571
572 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
573 self->m_WorkerStats.lCondWait++;
574 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
575 }
576 THREAD_RESUMED();
577
578 continue;
579 }
580
581 // wait until next processing time of the first socket on the list
582 const steady_clock::time_point currtime = steady_clock::now();
583
584 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
585 if (self->m_ullDbgTime <= currtime)
586 {
587 fprintf(stdout,
588 "SndQueue %lu slt:%lu nrp:%lu snt:%lu nrt:%lu ctw:%lu\n",
589 self->m_WorkerStats.lIteration,
590 self->m_WorkerStats.lSleepTo,
591 self->m_WorkerStats.lNotReadyPop,
592 self->m_WorkerStats.lSendTo,
593 self->m_WorkerStats.lNotReadyTs,
594 self->m_WorkerStats.lCondWait);
595 memset(&self->m_WorkerStats, 0, sizeof(self->m_WorkerStats));
596 self->m_ullDbgTime = currtime + self->m_ullDbgPeriod;
597 }
598 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
599
600 THREAD_PAUSED();
601 if (currtime < next_time)
602 {
603 self->m_pTimer->sleep_until(next_time);
604
605 #if defined(HAI_DEBUG_SNDQ_HIGHRATE)
606 self->m_WorkerStats.lSleepTo++;
607 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
608 }
609 THREAD_RESUMED();
610
611 // Get a socket with a send request if any.
612 CUDT* u = self->m_pSndUList->pop();
613 if (u == NULL)
614 {
615 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
616 self->m_WorkerStats.lNotReadyPop++;
617 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
618 continue;
619 }
620
621 #define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " "
622 HLOGC(qslog.Debug,
623 log << "CSndQueue: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening)
624 << UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth)
625 << UST(Opened));
626 #undef UST
627
628 if (!u->m_bConnected || u->m_bBroken)
629 {
630 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
631 self->m_WorkerStats.lNotReadyPop++;
632 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
633 continue;
634 }
635
636 // pack a packet from the socket
637 CPacket pkt;
638 const std::pair<int, steady_clock::time_point> res_time = u->packData((pkt));
639
640 // Check if payload size is invalid.
641 if (res_time.first <= 0)
642 {
643 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
644 self->m_WorkerStats.lNotReadyPop++;
645 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
646 continue;
647 }
648
649 const sockaddr_any addr = u->m_PeerAddr;
650 const steady_clock::time_point next_send_time = res_time.second;
651 if (!is_zero(next_send_time))
652 self->m_pSndUList->update(u, CSndUList::DO_RESCHEDULE, next_send_time);
653
654 HLOGC(qslog.Debug, log << self->CONID() << "chn:SENDING: " << pkt.Info());
655 self->m_pChannel->sendto(addr, pkt);
656
657 #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
658 self->m_WorkerStats.lSendTo++;
659 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
660 }
661
662 THREAD_EXIT();
663 return NULL;
664 }
665
sendto(const sockaddr_any & w_addr,CPacket & w_packet)666 int srt::CSndQueue::sendto(const sockaddr_any& w_addr, CPacket& w_packet)
667 {
668 // send out the packet immediately (high priority), this is a control packet
669 m_pChannel->sendto(w_addr, w_packet);
670 return (int)w_packet.getLength();
671 }
672
673 //
CRcvUList()674 srt::CRcvUList::CRcvUList()
675 : m_pUList(NULL)
676 , m_pLast(NULL)
677 {
678 }
679
~CRcvUList()680 srt::CRcvUList::~CRcvUList() {}
681
insert(const CUDT * u)682 void srt::CRcvUList::insert(const CUDT* u)
683 {
684 CRNode* n = u->m_pRNode;
685 n->m_tsTimeStamp = steady_clock::now();
686
687 if (NULL == m_pUList)
688 {
689 // empty list, insert as the single node
690 n->m_pPrev = n->m_pNext = NULL;
691 m_pLast = m_pUList = n;
692
693 return;
694 }
695
696 // always insert at the end for RcvUList
697 n->m_pPrev = m_pLast;
698 n->m_pNext = NULL;
699 m_pLast->m_pNext = n;
700 m_pLast = n;
701 }
702
remove(const CUDT * u)703 void srt::CRcvUList::remove(const CUDT* u)
704 {
705 CRNode* n = u->m_pRNode;
706
707 if (!n->m_bOnList)
708 return;
709
710 if (NULL == n->m_pPrev)
711 {
712 // n is the first node
713 m_pUList = n->m_pNext;
714 if (NULL == m_pUList)
715 m_pLast = NULL;
716 else
717 m_pUList->m_pPrev = NULL;
718 }
719 else
720 {
721 n->m_pPrev->m_pNext = n->m_pNext;
722 if (NULL == n->m_pNext)
723 {
724 // n is the last node
725 m_pLast = n->m_pPrev;
726 }
727 else
728 n->m_pNext->m_pPrev = n->m_pPrev;
729 }
730
731 n->m_pNext = n->m_pPrev = NULL;
732 }
733
update(const CUDT * u)734 void srt::CRcvUList::update(const CUDT* u)
735 {
736 CRNode* n = u->m_pRNode;
737
738 if (!n->m_bOnList)
739 return;
740
741 n->m_tsTimeStamp = steady_clock::now();
742
743 // if n is the last node, do not need to change
744 if (NULL == n->m_pNext)
745 return;
746
747 if (NULL == n->m_pPrev)
748 {
749 m_pUList = n->m_pNext;
750 m_pUList->m_pPrev = NULL;
751 }
752 else
753 {
754 n->m_pPrev->m_pNext = n->m_pNext;
755 n->m_pNext->m_pPrev = n->m_pPrev;
756 }
757
758 n->m_pPrev = m_pLast;
759 n->m_pNext = NULL;
760 m_pLast->m_pNext = n;
761 m_pLast = n;
762 }
763
764 //
CHash()765 srt::CHash::CHash()
766 : m_pBucket(NULL)
767 , m_iHashSize(0)
768 {
769 }
770
~CHash()771 srt::CHash::~CHash()
772 {
773 for (int i = 0; i < m_iHashSize; ++i)
774 {
775 CBucket* b = m_pBucket[i];
776 while (NULL != b)
777 {
778 CBucket* n = b->m_pNext;
779 delete b;
780 b = n;
781 }
782 }
783
784 delete[] m_pBucket;
785 }
786
init(int size)787 void srt::CHash::init(int size)
788 {
789 m_pBucket = new CBucket*[size];
790
791 for (int i = 0; i < size; ++i)
792 m_pBucket[i] = NULL;
793
794 m_iHashSize = size;
795 }
796
lookup(int32_t id)797 srt::CUDT* srt::CHash::lookup(int32_t id)
798 {
799 // simple hash function (% hash table size); suitable for socket descriptors
800 CBucket* b = m_pBucket[id % m_iHashSize];
801
802 while (NULL != b)
803 {
804 if (id == b->m_iID)
805 return b->m_pUDT;
806 b = b->m_pNext;
807 }
808
809 return NULL;
810 }
811
insert(int32_t id,CUDT * u)812 void srt::CHash::insert(int32_t id, CUDT* u)
813 {
814 CBucket* b = m_pBucket[id % m_iHashSize];
815
816 CBucket* n = new CBucket;
817 n->m_iID = id;
818 n->m_pUDT = u;
819 n->m_pNext = b;
820
821 m_pBucket[id % m_iHashSize] = n;
822 }
823
remove(int32_t id)824 void srt::CHash::remove(int32_t id)
825 {
826 CBucket* b = m_pBucket[id % m_iHashSize];
827 CBucket* p = NULL;
828
829 while (NULL != b)
830 {
831 if (id == b->m_iID)
832 {
833 if (NULL == p)
834 m_pBucket[id % m_iHashSize] = b->m_pNext;
835 else
836 p->m_pNext = b->m_pNext;
837
838 delete b;
839
840 return;
841 }
842
843 p = b;
844 b = b->m_pNext;
845 }
846 }
847
848 //
CRendezvousQueue()849 srt::CRendezvousQueue::CRendezvousQueue()
850 : m_lRendezvousID()
851 , m_RIDListLock()
852 {
853 }
854
~CRendezvousQueue()855 srt::CRendezvousQueue::~CRendezvousQueue()
856 {
857 m_lRendezvousID.clear();
858 }
859
insert(const SRTSOCKET & id,CUDT * u,const sockaddr_any & addr,const steady_clock::time_point & ttl)860 void srt::CRendezvousQueue::insert(const SRTSOCKET& id,
861 CUDT* u,
862 const sockaddr_any& addr,
863 const steady_clock::time_point& ttl)
864 {
865 ScopedLock vg(m_RIDListLock);
866
867 CRL r;
868 r.m_iID = id;
869 r.m_pUDT = u;
870 r.m_PeerAddr = addr;
871 r.m_tsTTL = ttl;
872
873 m_lRendezvousID.push_back(r);
874 HLOGC(cnlog.Debug,
875 log << "RID: adding socket @" << id << " for address: " << addr.str() << " expires: " << FormatTime(ttl)
876 << " (total connectors: " << m_lRendezvousID.size() << ")");
877 }
878
remove(const SRTSOCKET & id)879 void srt::CRendezvousQueue::remove(const SRTSOCKET& id)
880 {
881 ScopedLock lkv(m_RIDListLock);
882
883 for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
884 {
885 if (i->m_iID == id)
886 {
887 m_lRendezvousID.erase(i);
888 break;
889 }
890 }
891 }
892
retrieve(const sockaddr_any & addr,SRTSOCKET & w_id) const893 srt::CUDT* srt::CRendezvousQueue::retrieve(const sockaddr_any& addr, SRTSOCKET& w_id) const
894 {
895 ScopedLock vg(m_RIDListLock);
896
897 // TODO: optimize search
898 for (list<CRL>::const_iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
899 {
900 if (i->m_PeerAddr == addr && ((w_id == 0) || (w_id == i->m_iID)))
901 {
902 HLOGC(cnlog.Debug,
903 log << "RID: found id @" << i->m_iID << " while looking for "
904 << (w_id ? "THIS ID FROM " : "A NEW CONNECTION FROM ") << i->m_PeerAddr.str());
905 w_id = i->m_iID;
906 return i->m_pUDT;
907 }
908 }
909
910 #if ENABLE_HEAVY_LOGGING
911 std::ostringstream spec;
912 if (w_id == 0)
913 spec << "A NEW CONNECTION REQUEST";
914 else
915 spec << " AGENT @" << w_id;
916 HLOGC(cnlog.Debug,
917 log << "RID: NO CONNECTOR FOR ADR:" << addr.str() << " while looking for " << spec.str() << " ("
918 << m_lRendezvousID.size() << " connectors total)");
919 #endif
920
921 return NULL;
922 }
923
// Processes the pending connectors after a read attempt on the channel.
//
// rst  - result of the read attempt.
// cst  - connection status determined for the received packet.
// unit - unit holding the received packet, or NULL when there is none.
//
// The work is done in phases so that m_RIDListLock is never held while
// calling into the sockets:
//  1. qualifyToHandle() collects (under the lock) the entries to close
//     ('toRemove') and the entries to process ('toProcess');
//  2. every entry of 'toProcess' gets processAsyncConnectRequest() called;
//     an entry for which it fails is added to 'toRemove' and a shutdown
//     message is sent;
//  3. every entry of 'toRemove' is taken out of the list, reported through
//     epoll and its dependent objects are updated;
//  4. under the lock again, the TTL of entries still present in the list
//     with an ID found in 'toRemove' is reset, so that they count as expired.
void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst, CUnit* unit)
{
    vector<LinkStatusInfo> toRemove, toProcess;

    const CPacket* pkt = unit ? &unit->m_Packet : NULL;

    // Need a stub value for a case when there's no unit provided ("storage depleted" case).
    // It should be normally NOT IN USE because in case of "storage depleted", rst != RST_OK.
    const SRTSOCKET dest_id = pkt ? pkt->m_iID : 0;

    // If no socket were qualified for further handling, finish here.
    // Otherwise toRemove and toProcess contain items to handle.
    if (!qualifyToHandle(rst, cst, dest_id, (toRemove), (toProcess)))
        return;

    HLOGC(cnlog.Debug,
          log << "updateConnStatus: collected " << toProcess.size() << " for processing, " << toRemove.size()
              << " to close");

    // Repeat (resend) connection request.
    for (vector<LinkStatusInfo>::iterator i = toProcess.begin(); i != toProcess.end(); ++i)
    {
        // IMPORTANT INFORMATION concerning changes towards UDT legacy.
        // In the UDT code there was no attempt to interpret any incoming data.
        // All data from the incoming packet were considered to be already deployed into
        // m_ConnRes field, and m_ConnReq field was considered at this time accordingly updated.
        // Therefore this procedure did only one thing: craft a new handshake packet and send it.
        // In SRT this may also interpret extra data (extensions in case when Agent is Responder)
        // and the `pktIn` packet may sometimes contain no data. Therefore the passed `rst`
        // must be checked to distinguish the call by periodic update (RST_AGAIN) from a call
        // due to have received the packet (RST_OK).
        //
        // In the below call, only the underlying `processRendezvous` function will be attempting
        // to interpret these data (for caller-listener this was already done by `processConnectRequest`
        // before calling this function), and it checks for the data presence.

        EReadStatus    read_st = rst;
        EConnectStatus conn_st = cst;

        // The packet (if any) is not addressed to this socket:
        // handle the socket as if nothing was received for it.
        if (i->id != dest_id)
        {
            read_st = RST_AGAIN;
            conn_st = CONN_AGAIN;
        }

        HLOGC(cnlog.Debug,
              log << "updateConnStatus: processing async conn for @" << i->id << " FROM " << i->peeraddr.str());

        if (!i->u->processAsyncConnectRequest(read_st, conn_st, pkt, i->peeraddr))
        {
            // cst == CONN_REJECT can only be result of worker_ProcessAddressedPacket and
            // its already set in this case.
            LinkStatusInfo fi = *i;
            fi.errorcode      = SRT_ECONNREJ;
            toRemove.push_back(fi);
            i->u->sendCtrl(UMSG_SHUTDOWN);
        }
    }

    // NOTE: it is "believed" here that all CUDT objects will not be
    // deleted in the meantime. This is based on a statement that at worst
    // they have been "just" declared failed and it will pass at least 1s until
    // they are moved to ClosedSockets and it is believed that this function will
    // not be held on mutexes that long.

    for (vector<LinkStatusInfo>::iterator i = toRemove.begin(); i != toRemove.end(); ++i)
    {
        HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id);
        //
        // Setting m_bConnecting to false, and need to remove the socket from the rendezvous queue
        // because the next CUDT::close will not remove it from the queue when m_bConnecting = false,
        // and may crash on next pass.
        //
        // TODO: maybe lock i->u->m_ConnectionLock?
        i->u->m_bConnecting = false;
        remove(i->u->m_SocketID);

        // DO NOT close the socket here because in this case it might be
        // unable to get status from at the right moment. Also only member
        // sockets should be taken care of internally - single sockets should
        // be normally closed by the application, after it is done with them.

        // app can call any UDT API to learn the connection_broken error
        CUDT::s_UDTUnited.m_EPoll.update_events(
            i->u->m_SocketID, i->u->m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);

        i->u->completeBrokenConnectionDependencies(i->errorcode);
    }

    {
        // Now, additionally for every failed link reset the TTL so that
        // they are set expired right now.
        ScopedLock vg(m_RIDListLock);
        for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
        {
            if (find_if(toRemove.begin(), toRemove.end(), LinkStatusInfo::HasID(i->m_iID)) != toRemove.end())
            {
                LOGC(cnlog.Error,
                     log << "updateConnStatus: processAsyncConnectRequest FAILED on @" << i->m_iID
                         << ". Setting TTL as EXPIRED.");
                i->m_tsTTL =
                    steady_clock::time_point(); // Make it expire right now, will be picked up at the next iteration
            }
        }
    }
}
1030
// Walks the list of pending connectors under m_RIDListLock and sorts them
// into the two output vectors; nothing is called on the sockets here.
//
// rst        - result of the read attempt on the channel.
// cst        - connection status; used for logging only.
// iDstSockID - destination socket ID of the received packet.
// toRemove   - receives the expired entries; these are erased from the list.
// toProcess  - receives the entries whose connection request is to be
//              repeated or that the received packet is addressed to.
//
// Returns true when at least one of the output vectors is not empty.
bool srt::CRendezvousQueue::qualifyToHandle(EReadStatus             rst,
                                            EConnectStatus cst      SRT_ATR_UNUSED,
                                            int                     iDstSockID,
                                            vector<LinkStatusInfo>& toRemove,
                                            vector<LinkStatusInfo>& toProcess)
{
    ScopedLock vg(m_RIDListLock);

    if (m_lRendezvousID.empty())
        return false; // nothing to process.

    HLOGC(cnlog.Debug,
          log << "updateConnStatus: updating after getting pkt with DST socket ID @" << iDstSockID
              << " status: " << ConnectStatusStr(cst));

    for (list<CRL>::iterator i = m_lRendezvousID.begin(), i_next = i; i != m_lRendezvousID.end(); i = i_next)
    {
        // Safe iterator to the next element. If the current element is erased, the iterator is updated again.
        ++i_next;

        const steady_clock::time_point tsNow = steady_clock::now();

        // A zero (default-constructed) TTL is never later than 'now', so entries
        // whose TTL was reset on failure are caught here as well.
        if (tsNow >= i->m_tsTTL)
        {
            HLOGC(cnlog.Debug,
                  log << "RID: socket @" << i->m_iID
                      << " removed - EXPIRED ("
                      // The "enforced on FAILURE" is below when processAsyncConnectRequest failed.
                      << (is_zero(i->m_tsTTL) ? "enforced on FAILURE" : "passed TTL") << "). WILL REMOVE from queue.");

            // Set appropriate error information, but do not update yet.
            // Exit the lock first. Collect objects to update them later.
            int ccerror = SRT_ECONNREJ;
            if (i->m_pUDT->m_RejectReason == SRT_REJ_UNKNOWN)
            {
                if (!is_zero(i->m_tsTTL))
                {
                    // Timer expired, set TIMEOUT forcefully
                    i->m_pUDT->m_RejectReason = SRT_REJ_TIMEOUT;
                    ccerror                   = SRT_ENOSERVER;
                }
                else
                {
                    // In case of unknown reason, rejection should at least
                    // suggest error on the peer
                    i->m_pUDT->m_RejectReason = SRT_REJ_PEER;
                }
            }

            // The call to completeBrokenConnectionDependencies() cannot happen here
            // under the lock of m_RIDListLock as it risks a deadlock.
            // Collect in 'toRemove' to update later.
            LinkStatusInfo fi = {i->m_pUDT, i->m_iID, ccerror, i->m_PeerAddr, -1};
            toRemove.push_back(fi);

            // i_next was preincremented, but this is guaranteed to point to
            // the element next to erased one.
            i_next = m_lRendezvousID.erase(i);
            continue;
        }
        else
        {
            HLOGC(cnlog.Debug,
                  log << "RID: socket @" << i->m_iID << " still active (remaining " << std::fixed
                      << (count_microseconds(i->m_tsTTL - tsNow) / 1000000.0) << "s of TTL)...");
        }

        const steady_clock::time_point tsLastReq = i->m_pUDT->m_tsLastReqTime;
        const steady_clock::time_point tsRepeat =
            tsLastReq + milliseconds_from(250); // Repeat connection request (send HS).

        // A connection request is repeated every 250 ms if there was no response from the peer:
        // - RST_AGAIN means no packet was received over UDP.
        // - a packet was received, but not for THIS socket.
        if ((rst == RST_AGAIN || i->m_iID != iDstSockID) && tsNow <= tsRepeat)
        {
            HLOGC(cnlog.Debug,
                  log << "RID:@" << i->m_iID << std::fixed << count_microseconds(tsNow - tsLastReq) / 1000.0
                      << " ms passed since last connection request.");

            continue;
        }

        HLOGC(cnlog.Debug,
              log << "RID:@" << i->m_iID << " cst=" << ConnectStatusStr(cst) << " -- repeating connection request.");

        // This queue is used only in case of Async mode (rendezvous or caller-listener).
        // Synchronous connection requests are handled in startConnect() completely.
        if (!i->m_pUDT->m_config.bSynRecving)
        {
            // Collect them so that they can be updated out of m_RIDListLock.
            LinkStatusInfo fi = {i->m_pUDT, i->m_iID, SRT_SUCCESS, i->m_PeerAddr, -1};
            toProcess.push_back(fi);
        }
        else
        {
            HLOGC(cnlog.Debug, log << "RID: socket @" << i->m_iID << " is SYNCHRONOUS, NOT UPDATING");
        }
    }

    return !toRemove.empty() || !toProcess.empty();
}
1133
1134 //
CRcvQueue()1135 srt::CRcvQueue::CRcvQueue()
1136 : m_WorkerThread()
1137 , m_UnitQueue()
1138 , m_pRcvUList(NULL)
1139 , m_pHash(NULL)
1140 , m_pChannel(NULL)
1141 , m_pTimer(NULL)
1142 , m_szPayloadSize()
1143 , m_bClosing(false)
1144 , m_LSLock()
1145 , m_pListener(NULL)
1146 , m_pRendezvousQueue(NULL)
1147 , m_vNewEntry()
1148 , m_IDLock()
1149 , m_mBuffer()
1150 , m_BufferCond()
1151 {
1152 setupCond(m_BufferCond, "QueueBuffer");
1153 }
1154
~CRcvQueue()1155 srt::CRcvQueue::~CRcvQueue()
1156 {
1157 m_bClosing = true;
1158
1159 if (m_WorkerThread.joinable())
1160 {
1161 HLOGC(rslog.Debug, log << "RcvQueue: EXIT");
1162 m_WorkerThread.join();
1163 }
1164 releaseCond(m_BufferCond);
1165
1166 delete m_pRcvUList;
1167 delete m_pHash;
1168 delete m_pRendezvousQueue;
1169
1170 // remove all queued messages
1171 for (map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++i)
1172 {
1173 while (!i->second.empty())
1174 {
1175 CPacket* pkt = i->second.front();
1176 delete[] pkt->m_pcData;
1177 delete pkt;
1178 i->second.pop();
1179 }
1180 }
1181 }
1182
1183 #if ENABLE_LOGGING
1184 int srt::CRcvQueue::m_counter = 0;
1185 #endif
1186
init(int qsize,size_t payload,int version,int hsize,CChannel * cc,CTimer * t)1187 void srt::CRcvQueue::init(int qsize, size_t payload, int version, int hsize, CChannel* cc, CTimer* t)
1188 {
1189 m_szPayloadSize = payload;
1190
1191 m_UnitQueue.init(qsize, (int)payload, version);
1192
1193 m_pHash = new CHash;
1194 m_pHash->init(hsize);
1195
1196 m_pChannel = cc;
1197 m_pTimer = t;
1198
1199 m_pRcvUList = new CRcvUList;
1200 m_pRendezvousQueue = new CRendezvousQueue;
1201
1202 #if ENABLE_LOGGING
1203 ++m_counter;
1204 const std::string thrname = "SRT:RcvQ:w" + Sprint(m_counter);
1205 #else
1206 const std::string thrname = "SRT:RcvQ:w";
1207 #endif
1208
1209 if (!StartThread(m_WorkerThread, CRcvQueue::worker, this, thrname.c_str()))
1210 {
1211 throw CUDTException(MJ_SYSTEMRES, MN_THREAD);
1212 }
1213 }
1214
worker(void * param)1215 void* srt::CRcvQueue::worker(void* param)
1216 {
1217 CRcvQueue* self = (CRcvQueue*)param;
1218 sockaddr_any sa(self->m_UnitQueue.getIPversion());
1219 int32_t id = 0;
1220
1221 #if ENABLE_LOGGING
1222 THREAD_STATE_INIT(("SRT:RcvQ:w" + Sprint(m_counter)).c_str());
1223 #else
1224 THREAD_STATE_INIT("SRT:RcvQ:worker");
1225 #endif
1226
1227 CUnit* unit = 0;
1228 EConnectStatus cst = CONN_AGAIN;
1229 while (!self->m_bClosing)
1230 {
1231 bool have_received = false;
1232 EReadStatus rst = self->worker_RetrieveUnit((id), (unit), (sa));
1233 if (rst == RST_OK)
1234 {
1235 if (id < 0)
1236 {
1237 // User error on peer. May log something, but generally can only ignore it.
1238 // XXX Think maybe about sending some "connection rejection response".
1239 HLOGC(qrlog.Debug,
1240 log << self->CONID() << "RECEIVED negative socket id '" << id
1241 << "', rejecting (POSSIBLE ATTACK)");
1242 continue;
1243 }
1244
1245 // NOTE: cst state is being changed here.
1246 // This state should be maintained through any next failed calls to worker_RetrieveUnit.
1247 // Any error switches this to rejection, just for a case.
1248
1249 // Note to rendezvous connection. This can accept:
1250 // - ID == 0 - take the first waiting rendezvous socket
1251 // - ID > 0 - find the rendezvous socket that has this ID.
1252 if (id == 0)
1253 {
1254 // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
1255 cst = self->worker_ProcessConnectionRequest(unit, sa);
1256 }
1257 else
1258 {
1259 // Otherwise ID is expected to be associated with:
1260 // - an enqueued rendezvous socket
1261 // - a socket connected to a peer
1262 cst = self->worker_ProcessAddressedPacket(id, unit, sa);
1263 // CAN RETURN CONN_REJECT, but m_RejectReason is already set
1264 }
1265 HLOGC(qrlog.Debug, log << self->CONID() << "worker: result for the unit: " << ConnectStatusStr(cst));
1266 if (cst == CONN_AGAIN)
1267 {
1268 HLOGC(qrlog.Debug, log << self->CONID() << "worker: packet not dispatched, continuing reading.");
1269 continue;
1270 }
1271 have_received = true;
1272 }
1273 else if (rst == RST_ERROR)
1274 {
1275 // According to the description by CChannel::recvfrom, this can be either of:
1276 // - IPE: all errors except EBADF
1277 // - socket was closed in the meantime by another thread: EBADF
1278 // If EBADF, then it's expected that the "closing" state is also set.
1279 // Check that just to report possible errors, but interrupt the loop anyway.
1280 if (self->m_bClosing)
1281 {
1282 HLOGC(qrlog.Debug,
1283 log << self->CONID() << "CChannel reported error, but Queue is closing - INTERRUPTING worker.");
1284 }
1285 else
1286 {
1287 LOGC(qrlog.Fatal,
1288 log << self->CONID()
1289 << "CChannel reported ERROR DURING TRANSMISSION - IPE. INTERRUPTING worker anyway.");
1290 }
1291 cst = CONN_REJECT;
1292 break;
1293 }
1294 // OTHERWISE: this is an "AGAIN" situation. No data was read, but the process should continue.
1295
1296 // take care of the timing event for all UDT sockets
1297 const steady_clock::time_point curtime_minus_syn =
1298 steady_clock::now() - microseconds_from(CUDT::COMM_SYN_INTERVAL_US);
1299
1300 CRNode* ul = self->m_pRcvUList->m_pUList;
1301 while ((NULL != ul) && (ul->m_tsTimeStamp < curtime_minus_syn))
1302 {
1303 CUDT* u = ul->m_pUDT;
1304
1305 if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
1306 {
1307 u->checkTimers();
1308 self->m_pRcvUList->update(u);
1309 }
1310 else
1311 {
1312 HLOGC(qrlog.Debug,
1313 log << CUDTUnited::CONID(u->m_SocketID) << " SOCKET broken, REMOVING FROM RCV QUEUE/MAP.");
1314 // the socket must be removed from Hash table first, then RcvUList
1315 self->m_pHash->remove(u->m_SocketID);
1316 self->m_pRcvUList->remove(u);
1317 u->m_pRNode->m_bOnList = false;
1318 }
1319
1320 ul = self->m_pRcvUList->m_pUList;
1321 }
1322
1323 if (have_received)
1324 {
1325 HLOGC(qrlog.Debug,
1326 log << "worker: RECEIVED PACKET --> updateConnStatus. cst=" << ConnectStatusStr(cst) << " id=" << id
1327 << " pkt-payload-size=" << unit->m_Packet.getLength());
1328 }
1329
1330 // Check connection requests status for all sockets in the RendezvousQueue.
1331 // Pass the connection status from the last call of:
1332 // worker_ProcessAddressedPacket --->
1333 // worker_TryAsyncRend_OrStore --->
1334 // CUDT::processAsyncConnectResponse --->
1335 // CUDT::processConnectResponse
1336 self->m_pRendezvousQueue->updateConnStatus(rst, cst, unit);
1337
1338 // XXX updateConnStatus may have removed the connector from the list,
1339 // however there's still m_mBuffer in CRcvQueue for that socket to care about.
1340 }
1341
1342 HLOGC(qrlog.Debug, log << "worker: EXIT");
1343
1344 THREAD_EXIT();
1345 return NULL;
1346 }
1347
worker_RetrieveUnit(int32_t & w_id,CUnit * & w_unit,sockaddr_any & w_addr)1348 EReadStatus srt::CRcvQueue::worker_RetrieveUnit(int32_t& w_id, CUnit*& w_unit, sockaddr_any& w_addr)
1349 {
1350 #if !USE_BUSY_WAITING
1351 // This might be not really necessary, and probably
1352 // not good for extensive bidirectional communication.
1353 m_pTimer->tick();
1354 #endif
1355
1356 // check waiting list, if new socket, insert it to the list
1357 while (ifNewEntry())
1358 {
1359 CUDT* ne = getNewEntry();
1360 if (ne)
1361 {
1362 HLOGC(qrlog.Debug,
1363 log << CUDTUnited::CONID(ne->m_SocketID)
1364 << " SOCKET pending for connection - ADDING TO RCV QUEUE/MAP");
1365 m_pRcvUList->insert(ne);
1366 m_pHash->insert(ne->m_SocketID, ne);
1367 }
1368 }
1369 // find next available slot for incoming packet
1370 w_unit = m_UnitQueue.getNextAvailUnit();
1371 if (!w_unit)
1372 {
1373 // no space, skip this packet
1374 CPacket temp;
1375 temp.m_pcData = new char[m_szPayloadSize];
1376 temp.setLength(m_szPayloadSize);
1377 THREAD_PAUSED();
1378 EReadStatus rst = m_pChannel->recvfrom((w_addr), (temp));
1379 THREAD_RESUMED();
1380 // Note: this will print nothing about the packet details unless heavy logging is on.
1381 LOGC(qrlog.Error, log << CONID() << "LOCAL STORAGE DEPLETED. Dropping 1 packet: " << temp.Info());
1382 delete[] temp.m_pcData;
1383
1384 // Be transparent for RST_ERROR, but ignore the correct
1385 // data read and fake that the packet was dropped.
1386 return rst == RST_ERROR ? RST_ERROR : RST_AGAIN;
1387 }
1388
1389 w_unit->m_Packet.setLength(m_szPayloadSize);
1390
1391 // reading next incoming packet, recvfrom returns -1 is nothing has been received
1392 THREAD_PAUSED();
1393 EReadStatus rst = m_pChannel->recvfrom((w_addr), (w_unit->m_Packet));
1394 THREAD_RESUMED();
1395
1396 if (rst == RST_OK)
1397 {
1398 w_id = w_unit->m_Packet.m_iID;
1399 HLOGC(qrlog.Debug,
1400 log << "INCOMING PACKET: FROM=" << w_addr.str() << " BOUND=" << m_pChannel->bindAddressAny().str() << " "
1401 << w_unit->m_Packet.Info());
1402 }
1403 return rst;
1404 }
1405
worker_ProcessConnectionRequest(CUnit * unit,const sockaddr_any & addr)1406 EConnectStatus srt::CRcvQueue::worker_ProcessConnectionRequest(CUnit* unit, const sockaddr_any& addr)
1407 {
1408 HLOGC(cnlog.Debug,
1409 log << "Got sockID=0 from " << addr.str() << " - trying to resolve it as a connection request...");
1410 // Introduced protection because it may potentially happen
1411 // that another thread could have closed the socket at
1412 // the same time and inject a bug between checking the
1413 // pointer for NULL and using it.
1414 int listener_ret = SRT_REJ_UNKNOWN;
1415 bool have_listener = false;
1416 {
1417 ScopedLock cg(m_LSLock);
1418 if (m_pListener)
1419 {
1420 LOGC(cnlog.Note, log << "PASSING request from: " << addr.str() << " to agent:" << m_pListener->socketID());
1421 listener_ret = m_pListener->processConnectRequest(addr, unit->m_Packet);
1422
1423 // This function does return a code, but it's hard to say as to whether
1424 // anything can be done about it. In case when it's stated possible, the
1425 // listener will try to send some rejection response to the caller, but
1426 // that's already done inside this function. So it's only used for
1427 // displaying the error in logs.
1428
1429 have_listener = true;
1430 }
1431 }
1432
1433 // NOTE: Rendezvous sockets do bind(), but not listen(). It means that the socket is
1434 // ready to accept connection requests, but they are not being redirected to the listener
1435 // socket, as this is not a listener socket at all. This goes then HERE.
1436
1437 if (have_listener) // That is, the above block with m_pListener->processConnectRequest was executed
1438 {
1439 LOGC(cnlog.Note,
1440 log << CONID() << "Listener managed the connection request from: " << addr.str()
1441 << " result:" << RequestTypeStr(UDTRequestType(listener_ret)));
1442 return listener_ret == SRT_REJ_UNKNOWN ? CONN_CONTINUE : CONN_REJECT;
1443 }
1444
1445 // If there's no listener waiting for the packet, just store it into the queue.
1446 return worker_TryAsyncRend_OrStore(0, unit, addr); // 0 id because the packet came in with that very ID.
1447 }
1448
worker_ProcessAddressedPacket(int32_t id,CUnit * unit,const sockaddr_any & addr)1449 EConnectStatus srt::CRcvQueue::worker_ProcessAddressedPacket(int32_t id, CUnit* unit, const sockaddr_any& addr)
1450 {
1451 CUDT* u = m_pHash->lookup(id);
1452 if (!u)
1453 {
1454 // Pass this to either async rendezvous connection,
1455 // or store the packet in the queue.
1456 HLOGC(cnlog.Debug, log << "worker_ProcessAddressedPacket: resending to QUEUED socket @" << id);
1457 return worker_TryAsyncRend_OrStore(id, unit, addr);
1458 }
1459
1460 // Found associated CUDT - process this as control or data packet
1461 // addressed to an associated socket.
1462 if (addr != u->m_PeerAddr)
1463 {
1464 HLOGC(cnlog.Debug,
1465 log << CONID() << "Packet for SID=" << id << " asoc with " << u->m_PeerAddr.str() << " received from "
1466 << addr.str() << " (CONSIDERED ATTACK ATTEMPT)");
1467 // This came not from the address that is the peer associated
1468 // with the socket. Ignore it.
1469 return CONN_AGAIN;
1470 }
1471
1472 if (!u->m_bConnected || u->m_bBroken || u->m_bClosing)
1473 {
1474 u->m_RejectReason = SRT_REJ_CLOSE;
1475 // The socket is currently in the process of being disconnected
1476 // or destroyed. Ignore.
1477 // XXX send UMSG_SHUTDOWN in this case?
1478 // XXX May it require mutex protection?
1479 return CONN_REJECT;
1480 }
1481
1482 if (unit->m_Packet.isControl())
1483 u->processCtrl(unit->m_Packet);
1484 else
1485 u->processData(unit);
1486
1487 u->checkTimers();
1488 m_pRcvUList->update(u);
1489
1490 return CONN_RUNNING;
1491 }
1492
1493 // This function responds to the fact that a packet has come
1494 // for a socket that does not expect to receive a normal connection
1495 // request. This can be then:
1496 // - a normal packet of whatever kind, just to be processed by the message loop
1497 // - a rendezvous connection
1498 // This function then tries to manage the packet as a rendezvous connection
1499 // request in ASYNC mode; when this is not applicable, it stores the packet
1500 // in the "receiving queue" so that it will be picked up in the "main" thread.
worker_TryAsyncRend_OrStore(int32_t id,CUnit * unit,const sockaddr_any & addr)1501 EConnectStatus srt::CRcvQueue::worker_TryAsyncRend_OrStore(int32_t id, CUnit* unit, const sockaddr_any& addr)
1502 {
1503 // This 'retrieve' requires that 'id' be either one of those
1504 // stored in the rendezvous queue (see CRcvQueue::registerConnector)
1505 // or simply 0, but then at least the address must match one of these.
1506 // If the id was 0, it will be set to the actual socket ID of the returned CUDT.
1507 CUDT* u = m_pRendezvousQueue->retrieve(addr, (id));
1508 if (!u)
1509 {
1510 // this socket is then completely unknown to the system.
1511 // Note that this situation may also happen at a very unfortunate
1512 // coincidence that the socket is already bound, but the registerConnector()
1513 // has not yet started. In case of rendezvous this may mean that the other
1514 // side just started sending its handshake packets, the local side has already
1515 // run the CRcvQueue::worker thread, and this worker thread is trying to dispatch
1516 // the handshake packet too early, before the dispatcher has a chance to see
1517 // this socket registerred in the RendezvousQueue, which causes the packet unable
1518 // to be dispatched. Therefore simply treat every "out of band" packet (with socket
1519 // not belonging to the connection and not registered as rendezvous) as "possible
1520 // attack" and ignore it. This also should better protect the rendezvous socket
1521 // against a rogue connector.
1522 if (id == 0)
1523 {
1524 HLOGC(cnlog.Debug,
1525 log << CONID() << "AsyncOrRND: no sockets expect connection from " << addr.str()
1526 << " - POSSIBLE ATTACK, ignore packet");
1527 }
1528 else
1529 {
1530 HLOGC(cnlog.Debug,
1531 log << CONID() << "AsyncOrRND: no sockets expect socket " << id << " from " << addr.str()
1532 << " - POSSIBLE ATTACK, ignore packet");
1533 }
1534 return CONN_AGAIN; // This means that the packet should be ignored.
1535 }
1536
1537 // asynchronous connect: call connect here
1538 // otherwise wait for the UDT socket to retrieve this packet
1539 if (!u->m_config.bSynRecving)
1540 {
1541 HLOGC(cnlog.Debug, log << "AsyncOrRND: packet RESOLVED TO @" << id << " -- continuing as ASYNC CONNECT");
1542 // This is practically same as processConnectResponse, just this applies
1543 // appropriate mutex lock - which can't be done here because it's intentionally private.
1544 // OTOH it can't be applied to processConnectResponse because the synchronous
1545 // call to this method applies the lock by itself, and same-thread-double-locking is nonportable (crashable).
1546 EConnectStatus cst = u->processAsyncConnectResponse(unit->m_Packet);
1547
1548 if (cst == CONN_CONFUSED)
1549 {
1550 LOGC(cnlog.Warn, log << "AsyncOrRND: PACKET NOT HANDSHAKE - re-requesting handshake from peer");
1551 storePkt(id, unit->m_Packet.clone());
1552 if (!u->processAsyncConnectRequest(RST_AGAIN, CONN_CONTINUE, &unit->m_Packet, u->m_PeerAddr))
1553 {
1554 // Reuse previous behavior to reject a packet
1555 cst = CONN_REJECT;
1556 }
1557 else
1558 {
1559 cst = CONN_CONTINUE;
1560 }
1561 }
1562
1563 // It might be that this is a data packet, which has turned the connection
1564 // into "connected" state, removed the connector (so since now every next packet
1565 // will land directly in the queue), but this data packet shall still be delivered.
1566 if (cst == CONN_ACCEPT && !unit->m_Packet.isControl())
1567 {
1568 // The process as called through processAsyncConnectResponse() should have put the
1569 // socket into the pending queue for pending connection (don't ask me, this is so).
1570 // This pending queue is being purged every time in the beginning of this loop, so
1571 // currently the socket is in the pending queue, but not yet in the connection queue.
1572 // It will be done at the next iteration of the reading loop, but it will be too late,
1573 // we have a pending data packet now and we must either dispatch it to an already connected
1574 // socket or disregard it, and rather prefer the former. So do this transformation now
1575 // that we KNOW (by the cst == CONN_ACCEPT result) that the socket should be inserted
1576 // into the pending anteroom.
1577
1578 CUDT* ne = getNewEntry(); // This function actuall removes the entry and returns it.
1579 // This **should** now always return a non-null value, but check it first
1580 // because if this accidentally isn't true, the call to worker_ProcessAddressedPacket will
1581 // result in redirecting it to here and so on until the call stack overflow. In case of
1582 // this "accident" simply disregard the packet from any further processing, it will be later
1583 // loss-recovered.
1584 // XXX (Probably the old contents of UDT's CRcvQueue::worker should be shaped a little bit
1585 // differently throughout the functions).
1586 if (ne)
1587 {
1588 HLOGC(cnlog.Debug,
1589 log << CUDTUnited::CONID(ne->m_SocketID)
1590 << " SOCKET pending for connection - ADDING TO RCV QUEUE/MAP");
1591 m_pRcvUList->insert(ne);
1592 m_pHash->insert(ne->m_SocketID, ne);
1593
1594 // The current situation is that this has passed processAsyncConnectResponse, but actually
1595 // this packet *SHOULD HAVE BEEN* handled by worker_ProcessAddressedPacket, however the
1596 // connection state wasn't completed at the moment when dispatching this packet. This has
1597 // been now completed inside the call to processAsyncConnectResponse, but this is still a
1598 // data packet that should have expected the connection to be already established. Therefore
1599 // redirect it once again into worker_ProcessAddressedPacket here.
1600
1601 HLOGC(cnlog.Debug,
1602 log << "AsyncOrRND: packet SWITCHED TO CONNECTED with ID=" << id
1603 << " -- passing to worker_ProcessAddressedPacket");
1604
1605 // Theoretically we should check if m_pHash->lookup(ne->m_SocketID) returns 'ne', but this
1606 // has been just added to m_pHash, so the check would be extremely paranoid here.
1607 cst = worker_ProcessAddressedPacket(id, unit, addr);
1608 if (cst == CONN_REJECT)
1609 return cst;
1610 return CONN_ACCEPT; // this function usually will return CONN_CONTINUE, which doesn't represent current
1611 // situation.
1612 }
1613 else
1614 {
1615 LOGC(cnlog.Error,
1616 log << "IPE: AsyncOrRND: packet SWITCHED TO CONNECTED, but ID=" << id
1617 << " is still not present in the socket ID dispatch hash - DISREGARDING");
1618 }
1619 }
1620 return cst;
1621 }
1622 HLOGC(cnlog.Debug,
1623 log << "AsyncOrRND: packet RESOLVED TO ID=" << id << " -- continuing through CENTRAL PACKET QUEUE");
1624 // This is where also the packets for rendezvous connection will be landing,
1625 // in case of a synchronous connection.
1626 storePkt(id, unit->m_Packet.clone());
1627
1628 return CONN_CONTINUE;
1629 }
1630
stopWorker()1631 void srt::CRcvQueue::stopWorker()
1632 {
1633 // We use the decent way, so we say to the thread "please exit".
1634 m_bClosing = true;
1635
1636 // Sanity check of the function's affinity.
1637 if (srt::sync::this_thread::get_id() == m_WorkerThread.get_id())
1638 {
1639 LOGC(rslog.Error, log << "IPE: RcvQ:WORKER TRIES TO CLOSE ITSELF!");
1640 return; // do nothing else, this would cause a hangup or crash.
1641 }
1642
1643 HLOGC(rslog.Debug, log << "RcvQueue: EXIT (forced)");
1644 // And we trust the thread that it does.
1645 m_WorkerThread.join();
1646 }
1647
recvfrom(int32_t id,CPacket & w_packet)1648 int srt::CRcvQueue::recvfrom(int32_t id, CPacket& w_packet)
1649 {
1650 UniqueLock bufferlock(m_BufferLock);
1651 CSync buffercond(m_BufferCond, bufferlock);
1652
1653 map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
1654
1655 if (i == m_mBuffer.end())
1656 {
1657 THREAD_PAUSED();
1658 buffercond.wait_for(seconds_from(1));
1659 THREAD_RESUMED();
1660
1661 i = m_mBuffer.find(id);
1662 if (i == m_mBuffer.end())
1663 {
1664 w_packet.setLength(-1);
1665 return -1;
1666 }
1667 }
1668
1669 // retrieve the earliest packet
1670 CPacket* newpkt = i->second.front();
1671
1672 if (w_packet.getLength() < newpkt->getLength())
1673 {
1674 w_packet.setLength(-1);
1675 return -1;
1676 }
1677
1678 // copy packet content
1679 // XXX Check if this wouldn't be better done by providing
1680 // copy constructor for DynamicStruct.
1681 // XXX Another thing: this looks wasteful. This expects an already
1682 // allocated memory on the packet, this thing gets the packet,
1683 // copies it into the passed packet and then the source packet
1684 // gets deleted. Why not simply return the originally stored packet,
1685 // without copying, allocation and deallocation?
1686 memcpy((w_packet.m_nHeader), newpkt->m_nHeader, CPacket::HDR_SIZE);
1687 memcpy((w_packet.m_pcData), newpkt->m_pcData, newpkt->getLength());
1688 w_packet.setLength(newpkt->getLength());
1689
1690 delete[] newpkt->m_pcData;
1691 delete newpkt;
1692
1693 // remove this message from queue,
1694 // if no more messages left for this socket, release its data structure
1695 i->second.pop();
1696 if (i->second.empty())
1697 m_mBuffer.erase(i);
1698
1699 return (int)w_packet.getLength();
1700 }
1701
setListener(CUDT * u)1702 int srt::CRcvQueue::setListener(CUDT* u)
1703 {
1704 ScopedLock lslock(m_LSLock);
1705
1706 if (NULL != m_pListener)
1707 return -1;
1708
1709 m_pListener = u;
1710 return 0;
1711 }
1712
removeListener(const CUDT * u)1713 void srt::CRcvQueue::removeListener(const CUDT* u)
1714 {
1715 ScopedLock lslock(m_LSLock);
1716
1717 if (u == m_pListener)
1718 m_pListener = NULL;
1719 }
1720
registerConnector(const SRTSOCKET & id,CUDT * u,const sockaddr_any & addr,const steady_clock::time_point & ttl)1721 void srt::CRcvQueue::registerConnector(const SRTSOCKET& id,
1722 CUDT* u,
1723 const sockaddr_any& addr,
1724 const steady_clock::time_point& ttl)
1725 {
1726 HLOGC(cnlog.Debug,
1727 log << "registerConnector: adding @" << id << " addr=" << addr.str() << " TTL=" << FormatTime(ttl));
1728 m_pRendezvousQueue->insert(id, u, addr, ttl);
1729 }
1730
removeConnector(const SRTSOCKET & id)1731 void srt::CRcvQueue::removeConnector(const SRTSOCKET& id)
1732 {
1733 HLOGC(cnlog.Debug, log << "removeConnector: removing @" << id);
1734 m_pRendezvousQueue->remove(id);
1735
1736 ScopedLock bufferlock(m_BufferLock);
1737
1738 map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
1739 if (i != m_mBuffer.end())
1740 {
1741 HLOGC(cnlog.Debug,
1742 log << "removeConnector: ... and its packet queue with " << i->second.size() << " packets collected");
1743 while (!i->second.empty())
1744 {
1745 delete[] i->second.front()->m_pcData;
1746 delete i->second.front();
1747 i->second.pop();
1748 }
1749 m_mBuffer.erase(i);
1750 }
1751 }
1752
setNewEntry(CUDT * u)1753 void srt::CRcvQueue::setNewEntry(CUDT* u)
1754 {
1755 HLOGC(cnlog.Debug, log << CUDTUnited::CONID(u->m_SocketID) << "setting socket PENDING FOR CONNECTION");
1756 ScopedLock listguard(m_IDLock);
1757 m_vNewEntry.push_back(u);
1758 }
1759
ifNewEntry()1760 bool srt::CRcvQueue::ifNewEntry()
1761 {
1762 return !(m_vNewEntry.empty());
1763 }
1764
getNewEntry()1765 srt::CUDT* srt::CRcvQueue::getNewEntry()
1766 {
1767 ScopedLock listguard(m_IDLock);
1768
1769 if (m_vNewEntry.empty())
1770 return NULL;
1771
1772 CUDT* u = (CUDT*)*(m_vNewEntry.begin());
1773 m_vNewEntry.erase(m_vNewEntry.begin());
1774
1775 return u;
1776 }
1777
storePkt(int32_t id,CPacket * pkt)1778 void srt::CRcvQueue::storePkt(int32_t id, CPacket* pkt)
1779 {
1780 UniqueLock bufferlock(m_BufferLock);
1781 CSync passcond(m_BufferCond, bufferlock);
1782
1783 map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
1784
1785 if (i == m_mBuffer.end())
1786 {
1787 m_mBuffer[id].push(pkt);
1788 passcond.signal_locked(bufferlock);
1789 }
1790 else
1791 {
1792 // avoid storing too many packets, in case of malfunction or attack
1793 if (i->second.size() > 16)
1794 return;
1795
1796 i->second.push(pkt);
1797 }
1798 }
1799
destroy()1800 void srt::CMultiplexer::destroy()
1801 {
1802 // Reverse order of the assigned
1803 delete m_pRcvQueue;
1804 delete m_pSndQueue;
1805 delete m_pTimer;
1806
1807 if (m_pChannel)
1808 {
1809 m_pChannel->close();
1810 delete m_pChannel;
1811 }
1812 }
1813