1 /*
2 * SRT - Secure, Reliable, Transport
3 * Copyright (c) 2018 Haivision Systems Inc.
4 *
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 *
9 */
10
11 /*****************************************************************************
12 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
13 All rights reserved.
14
15 Redistribution and use in source and binary forms, with or without
16 modification, are permitted provided that the following conditions are
17 met:
18
19 * Redistributions of source code must retain the above
20 copyright notice, this list of conditions and the
21 following disclaimer.
22
23 * Redistributions in binary form must reproduce the
24 above copyright notice, this list of conditions
25 and the following disclaimer in the documentation
26 and/or other materials provided with the distribution.
27
28 * Neither the name of the University of Illinois
29 nor the names of its contributors may be used to
30 endorse or promote products derived from this
31 software without specific prior written permission.
32
33 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
34 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
35 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
36 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
37 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
38 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
39 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
40 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
41 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
42 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
43 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
44 *****************************************************************************/
45
46 /*****************************************************************************
47 written by
48 Yunhong Gu, last updated 07/09/2011
49 modified by
50 Haivision Systems Inc.
51 *****************************************************************************/
52
53 #include "platform_sys.h"
54
55 #include <exception>
56 #include <stdexcept>
57 #include <typeinfo>
58 #include <iterator>
59 #include <vector>
60
61 #include <cstring>
62 #include "utilities.h"
63 #include "netinet_any.h"
64 #include "api.h"
65 #include "core.h"
66 #include "epoll.h"
67 #include "logging.h"
68 #include "threadname.h"
69 #include "srt.h"
70 #include "udt.h"
71
72 #ifdef _WIN32
73 #include <win/wintime.h>
74 #endif
75
76 #ifdef _MSC_VER
77 #pragma warning(error: 4530)
78 #endif
79
80 using namespace std;
81 using namespace srt_logging;
82 using namespace srt::sync;
83 extern LogConfig srt_logger_config;
84
85
construct()86 void srt::CUDTSocket::construct()
87 {
88 #if ENABLE_EXPERIMENTAL_BONDING
89 m_GroupOf = NULL;
90 m_GroupMemberData = NULL;
91 #endif
92 setupMutex(m_AcceptLock, "Accept");
93 setupCond(m_AcceptCond, "Accept");
94 setupMutex(m_ControlLock, "Control");
95 }
96
~CUDTSocket()97 srt::CUDTSocket::~CUDTSocket()
98 {
99 releaseMutex(m_AcceptLock);
100 releaseCond(m_AcceptCond);
101 releaseMutex(m_ControlLock);
102 }
103
104
getStatus()105 SRT_SOCKSTATUS srt::CUDTSocket::getStatus()
106 {
107 // TTL in CRendezvousQueue::updateConnStatus() will set m_bConnecting to false.
108 // Although m_Status is still SRTS_CONNECTING, the connection is in fact to be closed due to TTL expiry.
109 // In this case m_bConnected is also false. Both checks are required to avoid hitting
110 // a regular state transition from CONNECTING to CONNECTED.
111
112 if (m_UDT.m_bBroken)
113 return SRTS_BROKEN;
114
115 // Connecting timed out
116 if ((m_Status == SRTS_CONNECTING) && !m_UDT.m_bConnecting && !m_UDT.m_bConnected)
117 return SRTS_BROKEN;
118
119 return m_Status;
120 }
121
122 // [[using locked(m_GlobControlLock)]]
breakSocket_LOCKED()123 void srt::CUDTSocket::breakSocket_LOCKED()
124 {
125 // This function is intended to be called from GC,
126 // under a lock of m_GlobControlLock.
127 m_UDT.m_bBroken = true;
128 m_UDT.m_iBrokenCounter = 0;
129 HLOGC(smlog.Debug, log << "@" << m_SocketID << " CLOSING AS SOCKET");
130 m_UDT.closeInternal();
131 setClosed();
132 }
133
setClosed()134 void srt::CUDTSocket::setClosed()
135 {
136 m_Status = SRTS_CLOSED;
137
138 // a socket will not be immediately removed when it is closed
139 // in order to prevent other methods from accessing invalid address
140 // a timer is started and the socket will be removed after approximately
141 // 1 second
142 m_tsClosureTimeStamp = steady_clock::now();
143 }
144
setBrokenClosed()145 void srt::CUDTSocket::setBrokenClosed()
146 {
147 m_UDT.m_iBrokenCounter = 60;
148 m_UDT.m_bBroken = true;
149 setClosed();
150 }
151
readReady()152 bool srt::CUDTSocket::readReady()
153 {
154 if (m_UDT.m_bConnected && m_UDT.m_pRcvBuffer->isRcvDataReady())
155 return true;
156 if (m_UDT.m_bListening)
157 return !m_QueuedSockets.empty();
158
159 return broken();
160 }
161
writeReady() const162 bool srt::CUDTSocket::writeReady() const
163 {
164 return (m_UDT.m_bConnected
165 && (m_UDT.m_pSndBuffer->getCurrBufSize() < m_UDT.m_config.iSndBufSize))
166 || broken();
167 }
168
broken() const169 bool srt::CUDTSocket::broken() const
170 {
171 return m_UDT.m_bBroken || !m_UDT.m_bConnected;
172 }
173
174 ////////////////////////////////////////////////////////////////////////////////
175
CUDTUnited()176 srt::CUDTUnited::CUDTUnited():
177 m_Sockets(),
178 m_GlobControlLock(),
179 m_IDLock(),
180 m_mMultiplexer(),
181 m_MultiplexerLock(),
182 m_pCache(NULL),
183 m_bClosing(false),
184 m_GCStopCond(),
185 m_InitLock(),
186 m_iInstanceCount(0),
187 m_bGCStatus(false),
188 m_ClosedSockets()
189 {
190 // Socket ID MUST start from a random value
191 m_SocketIDGenerator = genRandomInt(1, MAX_SOCKET_VAL);
192 m_SocketIDGenerator_init = m_SocketIDGenerator;
193
194 // XXX An unlikely exception thrown from the below calls
195 // might destroy the application before `main`. This shouldn't
196 // be a problem in general.
197 setupMutex(m_GlobControlLock, "GlobControl");
198 setupMutex(m_IDLock, "ID");
199 setupMutex(m_InitLock, "Init");
200
201 m_pCache = new CCache<CInfoBlock>;
202 }
203
~CUDTUnited()204 srt::CUDTUnited::~CUDTUnited()
205 {
206 // Call it if it wasn't called already.
207 // This will happen at the end of main() of the application,
208 // when the user didn't call srt_cleanup().
209 if (m_bGCStatus)
210 {
211 cleanup();
212 }
213
214 releaseMutex(m_GlobControlLock);
215 releaseMutex(m_IDLock);
216 releaseMutex(m_InitLock);
217
218 delete m_pCache;
219 }
220
CONID(SRTSOCKET sock)221 string srt::CUDTUnited::CONID(SRTSOCKET sock)
222 {
223 if ( sock == 0 )
224 return "";
225
226 std::ostringstream os;
227 os << "@" << sock << ":";
228 return os.str();
229 }
230
startup()231 int srt::CUDTUnited::startup()
232 {
233 ScopedLock gcinit(m_InitLock);
234
235 if (m_iInstanceCount++ > 0)
236 return 1;
237
238 // Global initialization code
239 #ifdef _WIN32
240 WORD wVersionRequested;
241 WSADATA wsaData;
242 wVersionRequested = MAKEWORD(2, 2);
243
244 if (0 != WSAStartup(wVersionRequested, &wsaData))
245 throw CUDTException(MJ_SETUP, MN_NONE, WSAGetLastError());
246 #endif
247
248 PacketFilter::globalInit();
249
250 if (m_bGCStatus)
251 return 1;
252
253 m_bClosing = false;
254
255 try
256 {
257 setupMutex(m_GCStopLock, "GCStop");
258 setupCond(m_GCStopCond, "GCStop");
259 }
260 catch (...)
261 {
262 return -1;
263 }
264 if (!StartThread(m_GCThread, garbageCollect, this, "SRT:GC"))
265 return -1;
266
267 m_bGCStatus = true;
268
269 HLOGC(inlog.Debug, log << "SRT Clock Type: " << SRT_SYNC_CLOCK_STR);
270
271 return 0;
272 }
273
cleanup()274 int srt::CUDTUnited::cleanup()
275 {
276 // IMPORTANT!!!
277 // In this function there must be NO LOGGING AT ALL. This function may
278 // potentially be called from within the global program destructor, and
279 // therefore some of the facilities used by the logging system - including
280 // the default std::cerr object bound to it by default, but also a different
281 // stream that the user's app has bound to it, and which got destroyed
282 // together with already exited main() - may be already deleted when
283 // executing this procedure.
284 ScopedLock gcinit(m_InitLock);
285
286 if (--m_iInstanceCount > 0)
287 return 0;
288
289 if (!m_bGCStatus)
290 return 0;
291
292 m_bClosing = true;
293 // NOTE: we can do relaxed signaling here because
294 // waiting on m_GCStopCond has a 1-second timeout,
295 // after which the m_bClosing flag is cheched, which
296 // is set here above. Worst case secenario, this
297 // pthread_join() call will block for 1 second.
298 CSync::signal_relaxed(m_GCStopCond);
299 m_GCThread.join();
300
301 // XXX There's some weird bug here causing this
302 // to hangup on Windows. This might be either something
303 // bigger, or some problem in pthread-win32. As this is
304 // the application cleanup section, this can be temporarily
305 // tolerated with simply exit the application without cleanup,
306 // counting on that the system will take care of it anyway.
307 #ifndef _WIN32
308 releaseCond(m_GCStopCond);
309 #endif
310
311 m_bGCStatus = false;
312
313 // Global destruction code
314 #ifdef _WIN32
315 WSACleanup();
316 #endif
317
318 return 0;
319 }
320
generateSocketID(bool for_group)321 SRTSOCKET srt::CUDTUnited::generateSocketID(bool for_group)
322 {
323 ScopedLock guard(m_IDLock);
324
325 int sockval = m_SocketIDGenerator - 1;
326
327 // First problem: zero-value should be avoided by various reasons.
328
329 if (sockval <= 0)
330 {
331 // We have a rollover on the socket value, so
332 // definitely we haven't made the Columbus mistake yet.
333 m_SocketIDGenerator = MAX_SOCKET_VAL;
334 }
335
336 // Check all sockets if any of them has this value.
337 // Socket IDs are begin created this way:
338 //
339 // Initial random
340 // |
341 // |
342 // |
343 // |
344 // ...
345 // The only problem might be if the number rolls over
346 // and reaches the same value from the opposite side.
347 // This is still a valid socket value, but this time
348 // we have to check, which sockets have been used already.
349 if ( sockval == m_SocketIDGenerator_init )
350 {
351 // Mark that since this point on the checks for
352 // whether the socket ID is in use must be done.
353 m_SocketIDGenerator_init = 0;
354 }
355
356 // This is when all socket numbers have been already used once.
357 // This may happen after many years of running an application
358 // constantly when the connection breaks and gets restored often.
359 if ( m_SocketIDGenerator_init == 0 )
360 {
361 int startval = sockval;
362 for (;;) // Roll until an unused value is found
363 {
364 enterCS(m_GlobControlLock);
365 const bool exists =
366 #if ENABLE_EXPERIMENTAL_BONDING
367 for_group
368 ? m_Groups.count(sockval | SRTGROUP_MASK)
369 :
370 #endif
371 m_Sockets.count(sockval);
372 leaveCS(m_GlobControlLock);
373
374 if (exists)
375 {
376 // The socket value is in use.
377 --sockval;
378 if (sockval <= 0)
379 sockval = MAX_SOCKET_VAL;
380
381 // Before continuing, check if we haven't rolled back to start again
382 // This is virtually impossible, so just make an RTI error.
383 if (sockval == startval)
384 {
385 // Of course, we don't lack memory, but actually this is so impossible
386 // that a complete memory extinction is much more possible than this.
387 // So treat this rather as a formal fallback for something that "should
388 // never happen". This should make the socket creation functions, from
389 // socket_create and accept, return this error.
390
391 m_SocketIDGenerator = sockval+1; // so that any next call will cause the same error
392 throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
393 }
394
395 // try again, if this is a free socket
396 continue;
397 }
398
399 // No socket found, this ID is free to use
400 m_SocketIDGenerator = sockval;
401 break;
402 }
403 }
404 else
405 {
406 m_SocketIDGenerator = sockval;
407 }
408
409 // The socket value counter remains with the value rolled
410 // without the group bit set; only the returned value may have
411 // the group bit set.
412
413 if (for_group)
414 sockval = m_SocketIDGenerator | SRTGROUP_MASK;
415 else
416 sockval = m_SocketIDGenerator;
417
418 LOGC(smlog.Debug, log << "generateSocketID: " << (for_group ? "(group)" : "") << ": @" << sockval);
419
420 return sockval;
421 }
422
newSocket(CUDTSocket ** pps)423 SRTSOCKET srt::CUDTUnited::newSocket(CUDTSocket** pps)
424 {
425 // XXX consider using some replacement of std::unique_ptr
426 // so that exceptions will clean up the object without the
427 // need for a dedicated code.
428 CUDTSocket* ns = NULL;
429
430 try
431 {
432 ns = new CUDTSocket;
433 }
434 catch (...)
435 {
436 delete ns;
437 throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
438 }
439
440 try
441 {
442 ns->m_SocketID = generateSocketID();
443 }
444 catch (...)
445 {
446 delete ns;
447 throw;
448 }
449 ns->m_Status = SRTS_INIT;
450 ns->m_ListenSocket = 0;
451 ns->core().m_SocketID = ns->m_SocketID;
452 ns->core().m_pCache = m_pCache;
453
454 try
455 {
456 HLOGC(smlog.Debug, log << CONID(ns->m_SocketID)
457 << "newSocket: mapping socket "
458 << ns->m_SocketID);
459
460 // protect the m_Sockets structure.
461 ScopedLock cs(m_GlobControlLock);
462 m_Sockets[ns->m_SocketID] = ns;
463 }
464 catch (...)
465 {
466 //failure and rollback
467 delete ns;
468 ns = NULL;
469 }
470
471 if (!ns)
472 throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
473
474 if (pps)
475 *pps = ns;
476
477 return ns->m_SocketID;
478 }
479
newConnection(const SRTSOCKET listen,const sockaddr_any & peer,const CPacket & hspkt,CHandShake & w_hs,int & w_error,CUDT * & w_acpu)480 int srt::CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer, const CPacket& hspkt,
481 CHandShake& w_hs, int& w_error, CUDT*& w_acpu)
482 {
483 CUDTSocket* ns = NULL;
484 w_acpu = NULL;
485
486 w_error = SRT_REJ_IPE;
487
488 // Can't manage this error through an exception because this is
489 // running in the listener loop.
490 CUDTSocket* ls = locateSocket(listen);
491 if (!ls)
492 {
493 LOGC(cnlog.Error, log << "IPE: newConnection by listener socket id=" << listen << " which DOES NOT EXIST.");
494 return -1;
495 }
496
497 HLOGC(cnlog.Debug, log << "newConnection: creating new socket after listener @"
498 << listen << " contacted with backlog=" << ls->m_uiBackLog);
499
500 // if this connection has already been processed
501 if ((ns = locatePeer(peer, w_hs.m_iID, w_hs.m_iISN)) != NULL)
502 {
503 if (ns->core().m_bBroken)
504 {
505 // last connection from the "peer" address has been broken
506 ns->setClosed();
507
508 ScopedLock acceptcg(ls->m_AcceptLock);
509 ls->m_QueuedSockets.erase(ns->m_SocketID);
510 }
511 else
512 {
513 // connection already exist, this is a repeated connection request
514 // respond with existing HS information
515 HLOGC(cnlog.Debug, log
516 << "newConnection: located a WORKING peer @"
517 << w_hs.m_iID << " - ADAPTING.");
518
519 w_hs.m_iISN = ns->core().m_iISN;
520 w_hs.m_iMSS = ns->core().MSS();
521 w_hs.m_iFlightFlagSize = ns->core().m_config.iFlightFlagSize;
522 w_hs.m_iReqType = URQ_CONCLUSION;
523 w_hs.m_iID = ns->m_SocketID;
524
525 // Report the original UDT because it will be
526 // required to complete the HS data for conclusion response.
527 w_acpu = &ns->core();
528
529 return 0;
530
531 //except for this situation a new connection should be started
532 }
533 }
534 else
535 {
536 HLOGC(cnlog.Debug, log << "newConnection: NOT located any peer @"
537 << w_hs.m_iID << " - resuming with initial connection.");
538 }
539
540 // exceeding backlog, refuse the connection request
541 if (ls->m_QueuedSockets.size() >= ls->m_uiBackLog)
542 {
543 w_error = SRT_REJ_BACKLOG;
544 LOGC(cnlog.Note, log << "newConnection: listen backlog=" << ls->m_uiBackLog << " EXCEEDED");
545 return -1;
546 }
547
548 try
549 {
550 ns = new CUDTSocket(*ls);
551 // No need to check the peer, this is the address from which the request has come.
552 ns->m_PeerAddr = peer;
553 }
554 catch (...)
555 {
556 w_error = SRT_REJ_RESOURCE;
557 delete ns;
558 LOGC(cnlog.Error, log << "IPE: newConnection: unexpected exception (probably std::bad_alloc)");
559 return -1;
560 }
561
562 ns->core().m_RejectReason = SRT_REJ_UNKNOWN; // pre-set a universal value
563
564 try
565 {
566 ns->m_SocketID = generateSocketID();
567 }
568 catch (const CUDTException&)
569 {
570 LOGF(cnlog.Fatal, "newConnection: IPE: all sockets occupied? Last gen=%d", m_SocketIDGenerator);
571 // generateSocketID throws exception, which can be naturally handled
572 // when the call is derived from the API call, but here it's called
573 // internally in response to receiving a handshake. It must be handled
574 // here and turned into an erroneous return value.
575 delete ns;
576 return -1;
577 }
578
579 ns->m_ListenSocket = listen;
580 ns->core().m_SocketID = ns->m_SocketID;
581 ns->m_PeerID = w_hs.m_iID;
582 ns->m_iISN = w_hs.m_iISN;
583
584 HLOGC(cnlog.Debug, log << "newConnection: DATA: lsnid=" << listen
585 << " id=" << ns->core().m_SocketID
586 << " peerid=" << ns->core().m_PeerID
587 << " ISN=" << ns->m_iISN);
588
589 int error = 0;
590 bool should_submit_to_accept = true;
591
592 // Set the error code for all prospective problems below.
593 // It won't be interpreted when result was successful.
594 w_error = SRT_REJ_RESOURCE;
595
596 // These can throw exception only when the memory allocation failed.
597 // CUDT::connect() translates exception into CUDTException.
598 // CUDT::open() may only throw original std::bad_alloc from new.
599 // This is only to make the library extra safe (when your machine lacks
600 // memory, it will continue to work, but fail to accept connection).
601
602 try
603 {
604 // This assignment must happen b4 the call to CUDT::connect() because
605 // this call causes sending the SRT Handshake through this socket.
606 // Without this mapping the socket cannot be found and therefore
607 // the SRT Handshake message would fail.
608 HLOGF(cnlog.Debug,
609 "newConnection: incoming %s, mapping socket %d",
610 peer.str().c_str(), ns->m_SocketID);
611 {
612 ScopedLock cg(m_GlobControlLock);
613 m_Sockets[ns->m_SocketID] = ns;
614 }
615
616 if (ls->core().m_cbAcceptHook)
617 {
618 if (!ls->core().runAcceptHook(&ns->core(), peer.get(), w_hs, hspkt))
619 {
620 w_error = ns->core().m_RejectReason;
621
622 error = 1;
623 goto ERR_ROLLBACK;
624 }
625 }
626
627 // bind to the same addr of listening socket
628 ns->core().open();
629 updateListenerMux(ns, ls);
630
631 ns->core().acceptAndRespond(ls->m_SelfAddr, peer, hspkt, (w_hs));
632 }
633 catch (...)
634 {
635 // Extract the error that was set in this new failed entity.
636 w_error = ns->core().m_RejectReason;
637 error = 1;
638 goto ERR_ROLLBACK;
639 }
640
641 ns->m_Status = SRTS_CONNECTED;
642
643 // copy address information of local node
644 // Precisely, what happens here is:
645 // - Get the IP address and port from the system database
646 ns->core().m_pSndQueue->m_pChannel->getSockAddr((ns->m_SelfAddr));
647 // - OVERWRITE just the IP address itself by a value taken from piSelfIP
648 // (the family is used exactly as the one taken from what has been returned
649 // by getsockaddr)
650 CIPAddress::pton((ns->m_SelfAddr), ns->core().m_piSelfIP, peer);
651
652 {
653 // protect the m_PeerRec structure (and group existence)
654 ScopedLock glock (m_GlobControlLock);
655 try
656 {
657 HLOGF(cnlog.Debug,
658 "newConnection: mapping peer %d to that socket (%d)\n",
659 ns->m_PeerID, ns->m_SocketID);
660 m_PeerRec[ns->getPeerSpec()].insert(ns->m_SocketID);
661 }
662 catch (...)
663 {
664 LOGC(cnlog.Error, log << "newConnection: error when mapping peer!");
665 error = 2;
666 }
667
668 // The access to m_GroupOf should be also protected, as the group
669 // could be requested deletion in the meantime. This will hold any possible
670 // removal from group and resetting m_GroupOf field.
671
672 #if ENABLE_EXPERIMENTAL_BONDING
673 if (ns->m_GroupOf)
674 {
675 // XXX this might require another check of group type.
676 // For redundancy group, at least, update the status in the group
677 CUDTGroup* g = ns->m_GroupOf;
678 ScopedLock glock (g->m_GroupLock);
679 if (g->m_bClosing)
680 {
681 error = 1; // "INTERNAL REJECTION"
682 goto ERR_ROLLBACK;
683 }
684
685 // Check if this is the first socket in the group.
686 // If so, give it up to accept, otherwise just do nothing
687 // The client will be informed about the newly added connection at the
688 // first moment when attempting to get the group status.
689 for (CUDTGroup::gli_t gi = g->m_Group.begin(); gi != g->m_Group.end(); ++gi)
690 {
691 if (gi->laststatus == SRTS_CONNECTED)
692 {
693 HLOGC(cnlog.Debug, log << "Found another connected socket in the group: $"
694 << gi->id << " - socket will be NOT given up for accepting");
695 should_submit_to_accept = false;
696 break;
697 }
698 }
699
700 // Update the status in the group so that the next
701 // operation can include the socket in the group operation.
702 CUDTGroup::SocketData* gm = ns->m_GroupMemberData;
703
704 HLOGC(cnlog.Debug, log << "newConnection(GROUP): Socket @" << ns->m_SocketID << " BELONGS TO $" << g->id()
705 << " - will " << (should_submit_to_accept? "" : "NOT ") << "report in accept");
706 gm->sndstate = SRT_GST_IDLE;
707 gm->rcvstate = SRT_GST_IDLE;
708 gm->laststatus = SRTS_CONNECTED;
709
710 if (!g->m_bConnected)
711 {
712 HLOGC(cnlog.Debug, log << "newConnection(GROUP): First socket connected, SETTING GROUP CONNECTED");
713 g->m_bConnected = true;
714 }
715
716 // XXX PROLBEM!!! These events are subscribed here so that this is done once, lazily,
717 // but groupwise connections could be accepted from multiple listeners for the same group!
718 // m_listener MUST BE A CONTAINER, NOT POINTER!!!
719 // ALSO: Maybe checking "the same listener" is not necessary as subscruption may be done
720 // multiple times anyway?
721 if (!g->m_listener)
722 {
723 // Newly created group from the listener, which hasn't yet
724 // the listener set.
725 g->m_listener = ls;
726
727 // Listen on both first connected socket and continued sockets.
728 // This might help with jump-over situations, and in regular continued
729 // sockets the IN event won't be reported anyway.
730 int listener_modes = SRT_EPOLL_ACCEPT | SRT_EPOLL_UPDATE;
731 epoll_add_usock_INTERNAL(g->m_RcvEID, ls, &listener_modes);
732
733 // This listening should be done always when a first connected socket
734 // appears as accepted off the listener. This is for the sake of swait() calls
735 // inside the group receiving and sending functions so that they get
736 // interrupted when a new socket is connected.
737 }
738
739 // Add also per-direction subscription for the about-to-be-accepted socket.
740 // Both first accepted socket that makes the group-accept and every next
741 // socket that adds a new link.
742 int read_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
743 int write_modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
744 epoll_add_usock_INTERNAL(g->m_RcvEID, ns, &read_modes);
745 epoll_add_usock_INTERNAL(g->m_SndEID, ns, &write_modes);
746
747 // With app reader, do not set groupPacketArrival (block the
748 // provider array feature completely for now).
749
750
751 /* SETUP HERE IF NEEDED
752 ns->core().m_cbPacketArrival.set(ns->m_pUDT, &CUDT::groupPacketArrival);
753 */
754 }
755 else
756 {
757 HLOGC(cnlog.Debug, log << "newConnection: Socket @" << ns->m_SocketID << " is not in a group");
758 }
759 #endif
760 }
761
762 if (should_submit_to_accept)
763 {
764 enterCS(ls->m_AcceptLock);
765 try
766 {
767 ls->m_QueuedSockets.insert(ns->m_SocketID);
768 }
769 catch (...)
770 {
771 LOGC(cnlog.Error, log << "newConnection: error when queuing socket!");
772 error = 3;
773 }
774 leaveCS(ls->m_AcceptLock);
775
776 HLOGC(cnlog.Debug, log << "ACCEPT: new socket @" << ns->m_SocketID << " submitted for acceptance");
777 // acknowledge users waiting for new connections on the listening socket
778 m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_ACCEPT, true);
779
780 CGlobEvent::triggerEvent();
781
782 // XXX the exact value of 'error' is ignored
783 if (error > 0)
784 {
785 goto ERR_ROLLBACK;
786 }
787
788 // wake up a waiting accept() call
789 CSync::lock_signal(ls->m_AcceptCond, ls->m_AcceptLock);
790 }
791 else
792 {
793 HLOGC(cnlog.Debug, log << "ACCEPT: new socket @" << ns->m_SocketID
794 << " NOT submitted to acceptance, another socket in the group is already connected");
795
796 // acknowledge INTERNAL users waiting for new connections on the listening socket
797 // that are reported when a new socket is connected within an already connected group.
798 m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_UPDATE, true);
799 CGlobEvent::triggerEvent();
800 }
801
802 ERR_ROLLBACK:
803 // XXX the exact value of 'error' is ignored
804 if (error > 0)
805 {
806 #if ENABLE_LOGGING
807 static const char* why [] = {
808 "UNKNOWN ERROR",
809 "INTERNAL REJECTION",
810 "IPE when mapping a socket",
811 "IPE when inserting a socket"
812 };
813 LOGC(cnlog.Warn, log << CONID(ns->m_SocketID) << "newConnection: connection rejected due to: "
814 << why[error] << " - " << RequestTypeStr(URQFailure(w_error)));
815 #endif
816
817 SRTSOCKET id = ns->m_SocketID;
818 ns->core().closeInternal();
819 ns->setClosed();
820
821 // The mapped socket should be now unmapped to preserve the situation that
822 // was in the original UDT code.
823 // In SRT additionally the acceptAndRespond() function (it was called probably
824 // connect() in UDT code) may fail, in which case this socket should not be
825 // further processed and should be removed.
826 {
827 ScopedLock cg(m_GlobControlLock);
828
829 #if ENABLE_EXPERIMENTAL_BONDING
830 if (ns->m_GroupOf)
831 {
832 HLOGC(smlog.Debug, log << "@" << ns->m_SocketID << " IS MEMBER OF $" << ns->m_GroupOf->id() << " - REMOVING FROM GROUP");
833 ns->removeFromGroup(true);
834 }
835 #endif
836 m_Sockets.erase(id);
837 m_ClosedSockets[id] = ns;
838 }
839
840 return -1;
841 }
842
843 return 1;
844 }
845
846 // static forwarder
installAcceptHook(SRTSOCKET lsn,srt_listen_callback_fn * hook,void * opaq)847 int srt::CUDT::installAcceptHook(SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq)
848 {
849 return s_UDTUnited.installAcceptHook(lsn, hook, opaq);
850 }
851
installAcceptHook(const SRTSOCKET lsn,srt_listen_callback_fn * hook,void * opaq)852 int srt::CUDTUnited::installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq)
853 {
854 try
855 {
856 CUDTSocket* s = locateSocket(lsn, ERH_THROW);
857 s->core().installAcceptHook(hook, opaq);
858 }
859 catch (CUDTException& e)
860 {
861 SetThreadLocalError(e);
862 return SRT_ERROR;
863 }
864
865 return 0;
866 }
867
installConnectHook(SRTSOCKET lsn,srt_connect_callback_fn * hook,void * opaq)868 int srt::CUDT::installConnectHook(SRTSOCKET lsn, srt_connect_callback_fn* hook, void* opaq)
869 {
870 return s_UDTUnited.installConnectHook(lsn, hook, opaq);
871 }
872
installConnectHook(const SRTSOCKET u,srt_connect_callback_fn * hook,void * opaq)873 int srt::CUDTUnited::installConnectHook(const SRTSOCKET u, srt_connect_callback_fn* hook, void* opaq)
874 {
875 try
876 {
877 #if ENABLE_EXPERIMENTAL_BONDING
878 if (u & SRTGROUP_MASK)
879 {
880 GroupKeeper k (*this, u, ERH_THROW);
881 k.group->installConnectHook(hook, opaq);
882 return 0;
883 }
884 #endif
885 CUDTSocket* s = locateSocket(u, ERH_THROW);
886 s->core().installConnectHook(hook, opaq);
887 }
888 catch (CUDTException& e)
889 {
890 SetThreadLocalError(e);
891 return SRT_ERROR;
892 }
893
894 return 0;
895 }
896
getStatus(const SRTSOCKET u)897 SRT_SOCKSTATUS srt::CUDTUnited::getStatus(const SRTSOCKET u)
898 {
899 // protects the m_Sockets structure
900 ScopedLock cg(m_GlobControlLock);
901
902 sockets_t::const_iterator i = m_Sockets.find(u);
903
904 if (i == m_Sockets.end())
905 {
906 if (m_ClosedSockets.find(u) != m_ClosedSockets.end())
907 return SRTS_CLOSED;
908
909 return SRTS_NONEXIST;
910 }
911 return i->second->getStatus();
912 }
913
bind(CUDTSocket * s,const sockaddr_any & name)914 int srt::CUDTUnited::bind(CUDTSocket* s, const sockaddr_any& name)
915 {
916 ScopedLock cg(s->m_ControlLock);
917
918 // cannot bind a socket more than once
919 if (s->m_Status != SRTS_INIT)
920 throw CUDTException(MJ_NOTSUP, MN_NONE, 0);
921
922 s->core().open();
923 updateMux(s, name);
924 s->m_Status = SRTS_OPENED;
925
926 // copy address information of local node
927 s->core().m_pSndQueue->m_pChannel->getSockAddr((s->m_SelfAddr));
928
929 return 0;
930 }
931
bind(CUDTSocket * s,UDPSOCKET udpsock)932 int srt::CUDTUnited::bind(CUDTSocket* s, UDPSOCKET udpsock)
933 {
934 ScopedLock cg(s->m_ControlLock);
935
936 // cannot bind a socket more than once
937 if (s->m_Status != SRTS_INIT)
938 throw CUDTException(MJ_NOTSUP, MN_NONE, 0);
939
940 sockaddr_any name;
941 socklen_t namelen = sizeof name; // max of inet and inet6
942
943 // This will preset the sa_family as well; the namelen is given simply large
944 // enough for any family here.
945 if (::getsockname(udpsock, &name.sa, &namelen) == -1)
946 throw CUDTException(MJ_NOTSUP, MN_INVAL);
947
948 // Successfully extracted, so update the size
949 name.len = namelen;
950
951 s->core().open();
952 updateMux(s, name, &udpsock);
953 s->m_Status = SRTS_OPENED;
954
955 // copy address information of local node
956 s->core().m_pSndQueue->m_pChannel->getSockAddr(s->m_SelfAddr);
957
958 return 0;
959 }
960
listen(const SRTSOCKET u,int backlog)961 int srt::CUDTUnited::listen(const SRTSOCKET u, int backlog)
962 {
963 if (backlog <= 0)
964 throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
965
966 // Don't search for the socket if it's already -1;
967 // this never is a valid socket.
968 if (u == UDT::INVALID_SOCK)
969 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
970
971 CUDTSocket* s = locateSocket(u);
972 if (!s)
973 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
974
975 ScopedLock cg(s->m_ControlLock);
976
977 // NOTE: since now the socket is protected against simultaneous access.
978 // In the meantime the socket might have been closed, which means that
979 // it could have changed the state. It could be also set listen in another
980 // thread, so check it out.
981
982 // do nothing if the socket is already listening
983 if (s->m_Status == SRTS_LISTENING)
984 return 0;
985
986 // a socket can listen only if is in OPENED status
987 if (s->m_Status != SRTS_OPENED)
988 throw CUDTException(MJ_NOTSUP, MN_ISUNBOUND, 0);
989
990 // [[using assert(s->m_Status == OPENED)]];
991
992 // listen is not supported in rendezvous connection setup
993 if (s->core().m_config.bRendezvous)
994 throw CUDTException(MJ_NOTSUP, MN_ISRENDEZVOUS, 0);
995
996 s->m_uiBackLog = backlog;
997
998 // [[using assert(s->m_Status == OPENED)]]; // (still, unchanged)
999
1000 s->core().setListenState(); // propagates CUDTException,
1001 // if thrown, remains in OPENED state if so.
1002 s->m_Status = SRTS_LISTENING;
1003
1004 return 0;
1005 }
1006
accept_bond(const SRTSOCKET listeners[],int lsize,int64_t msTimeOut)1007 SRTSOCKET srt::CUDTUnited::accept_bond(const SRTSOCKET listeners [], int lsize, int64_t msTimeOut)
1008 {
1009 CEPollDesc* ed = 0;
1010 int eid = m_EPoll.create(&ed);
1011
1012 // Destroy it at return - this function can be interrupted
1013 // by an exception.
1014 struct AtReturn
1015 {
1016 int eid;
1017 CUDTUnited* that;
1018 AtReturn(CUDTUnited* t, int e): eid(e), that(t) {}
1019 ~AtReturn()
1020 {
1021 that->m_EPoll.release(eid);
1022 }
1023 } l_ar(this, eid);
1024
1025 // Subscribe all of listeners for accept
1026 int events = SRT_EPOLL_ACCEPT;
1027
1028 for (int i = 0; i < lsize; ++i)
1029 {
1030 srt_epoll_add_usock(eid, listeners[i], &events);
1031 }
1032
1033 CEPoll::fmap_t st;
1034 m_EPoll.swait(*ed, (st), msTimeOut, true);
1035
1036 if (st.empty())
1037 {
1038 // Sanity check
1039 throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
1040 }
1041
1042 // Theoretically we can have a situation that more than one
1043 // listener is ready for accept. In this case simply get
1044 // only the first found.
1045 int lsn = st.begin()->first;
1046 sockaddr_storage dummy;
1047 int outlen = sizeof dummy;
1048 return accept(lsn, ((sockaddr*)&dummy), (&outlen));
1049 }
1050
accept(const SRTSOCKET listen,sockaddr * pw_addr,int * pw_addrlen)1051 SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_addrlen)
1052 {
1053 if (pw_addr && !pw_addrlen)
1054 {
1055 LOGC(cnlog.Error, log << "srt_accept: provided address, but address length parameter is missing");
1056 throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
1057 }
1058
1059 CUDTSocket* ls = locateSocket(listen);
1060
1061 if (ls == NULL)
1062 {
1063 LOGC(cnlog.Error, log << "srt_accept: invalid listener socket ID value: " << listen);
1064 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
1065 }
1066
1067 // the "listen" socket must be in LISTENING status
1068 if (ls->m_Status != SRTS_LISTENING)
1069 {
1070 LOGC(cnlog.Error, log << "srt_accept: socket @" << listen << " is not in listening state (forgot srt_listen?)");
1071 throw CUDTException(MJ_NOTSUP, MN_NOLISTEN, 0);
1072 }
1073
1074 // no "accept" in rendezvous connection setup
1075 if (ls->core().m_config.bRendezvous)
1076 {
1077 LOGC(cnlog.Fatal, log << "CUDTUnited::accept: RENDEZVOUS flag passed through check in srt_listen when it set listen state");
1078 // This problem should never happen because `srt_listen` function should have
1079 // checked this situation before and not set listen state in result.
1080 // Inform the user about the invalid state in the universal way.
1081 throw CUDTException(MJ_NOTSUP, MN_NOLISTEN, 0);
1082 }
1083
1084 SRTSOCKET u = CUDT::INVALID_SOCK;
1085 bool accepted = false;
1086
1087 // !!only one conection can be set up each time!!
1088 while (!accepted)
1089 {
1090 UniqueLock accept_lock(ls->m_AcceptLock);
1091 CSync accept_sync(ls->m_AcceptCond, accept_lock);
1092
1093 if ((ls->m_Status != SRTS_LISTENING) || ls->core().m_bBroken)
1094 {
1095 // This socket has been closed.
1096 accepted = true;
1097 }
1098 else if (ls->m_QueuedSockets.size() > 0)
1099 {
1100 set<SRTSOCKET>::iterator b = ls->m_QueuedSockets.begin();
1101 u = *b;
1102 ls->m_QueuedSockets.erase(b);
1103 accepted = true;
1104 }
1105 else if (!ls->core().m_config.bSynRecving)
1106 {
1107 accepted = true;
1108 }
1109
1110 if (!accepted && (ls->m_Status == SRTS_LISTENING))
1111 accept_sync.wait();
1112
1113 if (ls->m_QueuedSockets.empty())
1114 m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_ACCEPT, false);
1115 }
1116
1117 if (u == CUDT::INVALID_SOCK)
1118 {
1119 // non-blocking receiving, no connection available
1120 if (!ls->core().m_config.bSynRecving)
1121 {
1122 LOGC(cnlog.Error, log << "srt_accept: no pending connection available at the moment");
1123 throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
1124 }
1125
1126 LOGC(cnlog.Error, log << "srt_accept: listener socket @" << listen << " is already closed");
1127 // listening socket is closed
1128 throw CUDTException(MJ_SETUP, MN_CLOSED, 0);
1129 }
1130
1131 CUDTSocket* s = locateSocket(u);
1132 if (s == NULL)
1133 {
1134 LOGC(cnlog.Error, log << "srt_accept: pending connection has unexpectedly closed");
1135 throw CUDTException(MJ_SETUP, MN_CLOSED, 0);
1136 }
1137
1138 // Set properly the SRTO_GROUPCONNECT flag
1139 s->core().m_config.iGroupConnect = 0;
1140
1141 // Check if LISTENER has the SRTO_GROUPCONNECT flag set,
1142 // and the already accepted socket has successfully joined
1143 // the mirror group. If so, RETURN THE GROUP ID, not the socket ID.
1144 #if ENABLE_EXPERIMENTAL_BONDING
1145 if (ls->core().m_config.iGroupConnect == 1 && s->m_GroupOf)
1146 {
1147 // Put a lock to protect the group against accidental deletion
1148 // in the meantime.
1149 ScopedLock glock (m_GlobControlLock);
1150 // Check again; it's unlikely to happen, but
1151 // it's a theoretically possible scenario
1152 if (s->m_GroupOf)
1153 {
1154 u = s->m_GroupOf->m_GroupID;
1155 s->core().m_config.iGroupConnect = 1; // should be derived from ls, but make sure
1156
1157 // Mark the beginning of the connection at the moment
1158 // when the group ID is returned to the app caller
1159 s->m_GroupOf->m_stats.tsLastSampleTime = steady_clock::now();
1160 }
1161 else
1162 {
1163 LOGC(smlog.Error, log << "accept: IPE: socket's group deleted in the meantime of accept process???");
1164 }
1165 }
1166 #endif
1167
1168 ScopedLock cg(s->m_ControlLock);
1169
1170 if (pw_addr != NULL && pw_addrlen != NULL)
1171 {
1172 // Check if the length of the buffer to fill the name in
1173 // was large enough.
1174 const int len = s->m_PeerAddr.size();
1175 if (*pw_addrlen < len)
1176 throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
1177
1178 memcpy((pw_addr), &s->m_PeerAddr, len);
1179 *pw_addrlen = len;
1180 }
1181
1182 return u;
1183 }
1184
connect(SRTSOCKET u,const sockaddr * srcname,const sockaddr * tarname,int namelen)1185 int srt::CUDTUnited::connect(SRTSOCKET u, const sockaddr* srcname, const sockaddr* tarname, int namelen)
1186 {
1187 // Here both srcname and tarname must be specified
1188 if (!srcname || !tarname || size_t(namelen) < sizeof (sockaddr_in))
1189 {
1190 LOGC(aclog.Error, log << "connect(with source): invalid call: srcname="
1191 << srcname << " tarname=" << tarname << " namelen=" << namelen);
1192 throw CUDTException(MJ_NOTSUP, MN_INVAL);
1193 }
1194
1195 sockaddr_any source_addr(srcname, namelen);
1196 if (source_addr.len == 0)
1197 throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
1198 sockaddr_any target_addr(tarname, namelen);
1199 if (target_addr.len == 0)
1200 throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
1201
1202 #if ENABLE_EXPERIMENTAL_BONDING
1203 // Check affiliation of the socket. It's now allowed for it to be
1204 // a group or socket. For a group, add automatically a socket to
1205 // the group.
1206 if (u & SRTGROUP_MASK)
1207 {
1208 GroupKeeper k (*this, u, ERH_THROW);
1209 // Note: forced_isn is ignored when connecting a group.
1210 // The group manages the ISN by itself ALWAYS, that is,
1211 // it's generated anew for the very first socket, and then
1212 // derived by all sockets in the group.
1213 SRT_SOCKGROUPCONFIG gd[1] = { srt_prepare_endpoint(srcname, tarname, namelen) };
1214
1215 // When connecting to exactly one target, only this very target
1216 // can be returned as a socket, so rewritten back array can be ignored.
1217 return singleMemberConnect(k.group, gd);
1218 }
1219 #endif
1220
1221 CUDTSocket* s = locateSocket(u);
1222 if (s == NULL)
1223 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
1224
1225 // For a single socket, just do bind, then connect
1226 bind(s, source_addr);
1227 return connectIn(s, target_addr, SRT_SEQNO_NONE);
1228 }
1229
connect(const SRTSOCKET u,const sockaddr * name,int namelen,int32_t forced_isn)1230 int srt::CUDTUnited::connect(const SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn)
1231 {
1232 sockaddr_any target_addr(name, namelen);
1233 if (target_addr.len == 0)
1234 throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
1235
1236 #if ENABLE_EXPERIMENTAL_BONDING
1237 // Check affiliation of the socket. It's now allowed for it to be
1238 // a group or socket. For a group, add automatically a socket to
1239 // the group.
1240 if (u & SRTGROUP_MASK)
1241 {
1242 GroupKeeper k (*this, u, ERH_THROW);
1243
1244 // Note: forced_isn is ignored when connecting a group.
1245 // The group manages the ISN by itself ALWAYS, that is,
1246 // it's generated anew for the very first socket, and then
1247 // derived by all sockets in the group.
1248 SRT_SOCKGROUPCONFIG gd[1] = { srt_prepare_endpoint(NULL, name, namelen) };
1249 return singleMemberConnect(k.group, gd);
1250 }
1251 #endif
1252
1253 CUDTSocket* s = locateSocket(u);
1254 if (!s)
1255 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
1256
1257 return connectIn(s, target_addr, forced_isn);
1258 }
1259
1260 #if ENABLE_EXPERIMENTAL_BONDING
singleMemberConnect(CUDTGroup * pg,SRT_SOCKGROUPCONFIG * gd)1261 int srt::CUDTUnited::singleMemberConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* gd)
1262 {
1263 int gstat = groupConnect(pg, gd, 1);
1264 if (gstat == -1)
1265 {
1266 // We have only one element here, so refer to it.
1267 // Sanity check
1268 if (gd->errorcode == SRT_SUCCESS)
1269 gd->errorcode = SRT_EINVPARAM;
1270
1271 CodeMajor mj = CodeMajor(gd->errorcode / 1000);
1272 CodeMinor mn = CodeMinor(gd->errorcode % 1000);
1273
1274 return CUDT::APIError(mj, mn);
1275 }
1276
1277 return gstat;
1278 }
1279
1280 // [[using assert(pg->m_iBusy > 0)]]
groupConnect(CUDTGroup * pg,SRT_SOCKGROUPCONFIG * targets,int arraysize)1281 int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int arraysize)
1282 {
1283 CUDTGroup& g = *pg;
1284 SRT_ASSERT(g.m_iBusy > 0);
1285
1286 // The group must be managed to use srt_connect on it,
1287 // as it must create particular socket automatically.
1288
1289 // Non-managed groups can't be "connected" - at best you can connect
1290 // every socket individually.
1291 if (!g.managed())
1292 throw CUDTException(MJ_NOTSUP, MN_INVAL);
1293
1294 // Check and report errors on data brought in by srt_prepare_endpoint,
1295 // as the latter function has no possibility to report errors.
1296 for (int tii = 0; tii < arraysize; ++tii)
1297 {
1298 if (targets[tii].srcaddr.ss_family != targets[tii].peeraddr.ss_family)
1299 {
1300 LOGC(aclog.Error, log << "srt_connect/group: family differs on source and target address");
1301 throw CUDTException(MJ_NOTSUP, MN_INVAL);
1302 }
1303
1304 if (targets[tii].weight > CUDT::MAX_WEIGHT)
1305 {
1306 LOGC(aclog.Error, log << "srt_connect/group: weight value must be between 0 and " << (+CUDT::MAX_WEIGHT));
1307 throw CUDTException(MJ_NOTSUP, MN_INVAL);
1308 }
1309 }
1310
1311 // If the open state switched to OPENED, the blocking mode
1312 // must make it wait for connecting it. Doing connect when the
1313 // group is already OPENED returns immediately, regardless if the
1314 // connection is going to later succeed or fail (this will be
1315 // known in the group state information).
1316 bool block_new_opened = !g.m_bOpened && g.m_bSynRecving;
1317 const bool was_empty = g.groupEmpty();
1318
1319 // In case the group was retried connection, clear first all epoll readiness.
1320 const int ncleared = m_EPoll.update_events(g.id(), g.m_sPollID, SRT_EPOLL_ERR, false);
1321 if (was_empty || ncleared)
1322 {
1323 HLOGC(aclog.Debug, log << "srt_connect/group: clearing IN/OUT because was_empty=" << was_empty << " || ncleared=" << ncleared);
1324 // IN/OUT only in case when the group is empty, otherwise it would
1325 // clear out correct readiness resulting from earlier calls.
1326 // This also should happen if ERR flag was set, as IN and OUT could be set, too.
1327 m_EPoll.update_events(g.id(), g.m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT, false);
1328 }
1329 SRTSOCKET retval = -1;
1330
1331 int eid = -1;
1332 int connect_modes = SRT_EPOLL_CONNECT | SRT_EPOLL_ERR;
1333 if (block_new_opened)
1334 {
1335 // Create this eid only to block-wait for the first
1336 // connection.
1337 eid = srt_epoll_create();
1338 }
1339
1340 // Use private map to avoid searching in the
1341 // overall map.
1342 map<SRTSOCKET, CUDTSocket*> spawned;
1343
1344 HLOGC(aclog.Debug, log << "groupConnect: will connect " << arraysize << " links and "
1345 << (block_new_opened ? "BLOCK until any is ready" : "leave the process in background"));
1346
1347 for (int tii = 0; tii < arraysize; ++tii)
1348 {
1349 sockaddr_any target_addr(targets[tii].peeraddr);
1350 sockaddr_any source_addr(targets[tii].srcaddr);
1351 SRTSOCKET& sid_rloc = targets[tii].id;
1352 int& erc_rloc = targets[tii].errorcode;
1353 erc_rloc = SRT_SUCCESS; // preinitialized
1354 HLOGC(aclog.Debug, log << "groupConnect: taking on " << sockaddr_any(targets[tii].peeraddr).str());
1355
1356 CUDTSocket* ns = 0;
1357
1358 // NOTE: After calling newSocket, the socket is mapped into m_Sockets.
1359 // It must be MANUALLY removed from this list in case we need it deleted.
1360 SRTSOCKET sid = newSocket(&ns);
1361
1362 if (pg->m_cbConnectHook)
1363 {
1364 // Derive the connect hook by the socket, if set on the group
1365 ns->core().m_cbConnectHook = pg->m_cbConnectHook;
1366 }
1367
1368 SRT_SocketOptionObject* config = targets[tii].config;
1369
1370 // XXX Support non-blocking mode:
1371 // If the group has nonblocking set for connect (SNDSYN),
1372 // then it must set so on the socket. Then, the connection
1373 // process is asynchronous. The socket appears first as
1374 // GST_PENDING state, and only after the socket becomes
1375 // connected does its status in the group turn into GST_IDLE.
1376
1377 // Set all options that were requested by the options set on a group
1378 // prior to connecting.
1379 string error_reason SRT_ATR_UNUSED;
1380 try
1381 {
1382 for (size_t i = 0; i < g.m_config.size(); ++i)
1383 {
1384 HLOGC(aclog.Debug, log << "groupConnect: OPTION @" << sid << " #" << g.m_config[i].so);
1385 error_reason = "setting group-derived option: #" + Sprint(g.m_config[i].so);
1386 ns->core().setOpt(g.m_config[i].so, &g.m_config[i].value[0], (int) g.m_config[i].value.size());
1387 }
1388
1389 // Do not try to set a user option if failed already.
1390 if (config)
1391 {
1392 error_reason = "user option";
1393 ns->core().applyMemberConfigObject(*config);
1394 }
1395
1396 error_reason = "bound address";
1397 // We got it. Bind the socket, if the source address was set
1398 if (!source_addr.empty())
1399 bind(ns, source_addr);
1400
1401 }
1402 catch (CUDTException& e)
1403 {
1404 // Just notify the problem, but the loop must continue.
1405 // Set the original error as reported.
1406 targets[tii].errorcode = e.getErrorCode();
1407 LOGC(aclog.Error, log << "srt_connect_group: failed to set " << error_reason);
1408 }
1409 catch (...)
1410 {
1411 // Set the general EINVPARAM - this error should never happen
1412 LOGC(aclog.Error, log << "IPE: CUDT::setOpt reported unknown exception");
1413 targets[tii].errorcode = SRT_EINVPARAM;
1414 }
1415
1416 // Add socket to the group.
1417 // Do it after setting all stored options, as some of them may
1418 // influence some group data.
1419
1420 srt::groups::SocketData data = srt::groups::prepareSocketData(ns);
1421 if (targets[tii].token != -1)
1422 {
1423 // Reuse the token, if specified by the caller
1424 data.token = targets[tii].token;
1425 }
1426 else
1427 {
1428 // Otherwise generate and write back the token
1429 data.token = CUDTGroup::genToken();
1430 targets[tii].token = data.token;
1431 }
1432
1433 {
1434 ScopedLock cs(m_GlobControlLock);
1435 if (m_Sockets.count(sid) == 0)
1436 {
1437 HLOGC(aclog.Debug, log << "srt_connect_group: socket @" << sid << " deleted in process");
1438 // Someone deleted the socket in the meantime?
1439 // Unlikely, but possible in theory.
1440 // Don't delete anyhting - it's alreay done.
1441 continue;
1442 }
1443
1444 // There's nothing wrong with preparing the data first
1445 // even if this happens for nothing. But now, under the lock
1446 // and after checking that the socket still exists, check now
1447 // if this succeeded, and then also if the group is still usable.
1448 // The group will surely exist because it's set busy, until the
1449 // end of this function. But it might be simultaneously requested closed.
1450 bool proceed = true;
1451
1452 if (targets[tii].errorcode != SRT_SUCCESS)
1453 {
1454 HLOGC(aclog.Debug, log << "srt_connect_group: not processing @" << sid << " due to error in setting options");
1455 proceed = false;
1456 }
1457
1458 if (g.m_bClosing)
1459 {
1460 HLOGC(aclog.Debug, log << "srt_connect_group: not processing @" << sid << " due to CLOSED GROUP $" << g.m_GroupID);
1461 proceed = false;
1462 }
1463
1464 if (proceed)
1465 {
1466 CUDTGroup::SocketData* f = g.add(data);
1467 ns->m_GroupMemberData = f;
1468 ns->m_GroupOf = &g;
1469 f->weight = targets[tii].weight;
1470 LOGC(aclog.Note, log << "srt_connect_group: socket @" << sid << " added to group $" << g.m_GroupID);
1471 }
1472 else
1473 {
1474 targets[tii].id = CUDT::INVALID_SOCK;
1475 delete ns;
1476 m_Sockets.erase(sid);
1477
1478 // If failed to set options, then do not continue
1479 // neither with binding, nor with connecting.
1480 continue;
1481 }
1482 }
1483
1484
1485 // XXX This should be reenabled later, this should
1486 // be probably still in use to exchange information about
1487 // packets assymetrically lost. But for no other purpose.
1488 /*
1489 ns->core().m_cbPacketArrival.set(ns->m_pUDT, &CUDT::groupPacketArrival);
1490 */
1491
1492 int isn = g.currentSchedSequence();
1493
1494 // Don't synchronize ISN in case of synch on msgno. Every link
1495 // may send their own payloads independently.
1496 if (g.synconmsgno())
1497 {
1498 HLOGC(aclog.Debug, log << "groupConnect: NOT synchronizing sequence numbers: will sync on msgno");
1499 isn = -1;
1500 }
1501
1502 // Set it the groupconnect option, as all in-group sockets should have.
1503 ns->core().m_config.iGroupConnect = 1;
1504
1505 // Every group member will have always nonblocking
1506 // (this implies also non-blocking connect/accept).
1507 // The group facility functions will block when necessary
1508 // using epoll_wait.
1509 ns->core().m_config.bSynRecving = false;
1510 ns->core().m_config.bSynSending = false;
1511
1512 HLOGC(aclog.Debug, log << "groupConnect: NOTIFIED AS PENDING @" << sid << " both read and write");
1513 // If this socket is not to block the current connect process,
1514 // it may still be needed for the further check if the redundant
1515 // connection succeeded or failed and whether the new socket is
1516 // ready to use or needs to be closed.
1517 epoll_add_usock_INTERNAL(g.m_SndEID, ns, &connect_modes);
1518 epoll_add_usock_INTERNAL(g.m_RcvEID, ns, &connect_modes);
1519
1520 // Adding a socket on which we need to block to BOTH these tracking EIDs
1521 // and the blocker EID. We'll simply remove from them later all sockets that
1522 // got connected state or were broken.
1523
1524 if (block_new_opened)
1525 {
1526 HLOGC(aclog.Debug, log << "groupConnect: WILL BLOCK on @" << sid << " until connected");
1527 epoll_add_usock_INTERNAL(eid, ns, &connect_modes);
1528 }
1529
1530 // And connect
1531 try
1532 {
1533 HLOGC(aclog.Debug, log << "groupConnect: connecting a new socket with ISN=" << isn);
1534 connectIn(ns, target_addr, isn);
1535 }
1536 catch (const CUDTException& e)
1537 {
1538 LOGC(aclog.Error, log << "groupConnect: socket @" << sid << " in group " << pg->id() << " failed to connect");
1539 // We know it does belong to a group.
1540 // Remove it first because this involves a mutex, and we want
1541 // to avoid locking more than one mutex at a time.
1542 erc_rloc = e.getErrorCode();
1543 targets[tii].errorcode = e.getErrorCode();
1544 targets[tii].id = CUDT::INVALID_SOCK;
1545
1546 ScopedLock cl (m_GlobControlLock);
1547 ns->removeFromGroup(false);
1548 m_Sockets.erase(ns->m_SocketID);
1549 // Intercept to delete the socket on failure.
1550 delete ns;
1551 continue;
1552 }
1553 catch (...)
1554 {
1555 LOGC(aclog.Fatal, log << "groupConnect: IPE: UNKNOWN EXCEPTION from connectIn");
1556 targets[tii].errorcode = SRT_ESYSOBJ;
1557 targets[tii].id = CUDT::INVALID_SOCK;
1558 ScopedLock cl (m_GlobControlLock);
1559 ns->removeFromGroup(false);
1560 m_Sockets.erase(ns->m_SocketID);
1561 // Intercept to delete the socket on failure.
1562 delete ns;
1563
1564 // Do not use original exception, it may crash off a C API.
1565 throw CUDTException(MJ_SYSTEMRES, MN_OBJECT);
1566 }
1567
1568 SRT_SOCKSTATUS st;
1569 {
1570 ScopedLock grd (ns->m_ControlLock);
1571 st = ns->getStatus();
1572 }
1573
1574 {
1575 // NOTE: Not applying m_GlobControlLock because the group is now
1576 // set busy, so it won't be deleted, even if it was requested to be closed.
1577 ScopedLock grd (g.m_GroupLock);
1578
1579 if (!ns->m_GroupOf)
1580 {
1581 // The situation could get changed between the unlock and lock of m_GroupLock.
1582 // This must be checked again.
1583 // If a socket has been removed from group, it means that some other thread is
1584 // currently trying to delete the socket. Therefore it doesn't have, and even shouldn't,
1585 // be deleted here. Just exit with error report.
1586 LOGC(aclog.Error, log << "groupConnect: self-created member socket deleted during process, SKIPPING.");
1587
1588 // Do not report the error from here, just ignore this socket.
1589 continue;
1590 }
1591
1592 // If m_GroupOf is not NULL, the m_IncludedIter is still valid.
1593 CUDTGroup::SocketData* f = ns->m_GroupMemberData;
1594
1595 // Now under a group lock, we need to make sure the group isn't being closed
1596 // in order not to add a socket to a dead group.
1597 if (g.m_bClosing)
1598 {
1599 LOGC(aclog.Error, log << "groupConnect: group deleted while connecting; breaking the process");
1600
1601 // Set the status as pending so that the socket is taken care of later.
1602 // Note that all earlier sockets that were processed in this loop were either
1603 // set BROKEN or PENDING.
1604 f->sndstate = SRT_GST_PENDING;
1605 f->rcvstate = SRT_GST_PENDING;
1606 retval = -1;
1607 break;
1608 }
1609
1610 HLOGC(aclog.Debug, log << "groupConnect: @" << sid << " connection successful, setting group OPEN (was "
1611 << (g.m_bOpened ? "ALREADY" : "NOT") << "), will " << (block_new_opened ? "" : "NOT ")
1612 << "block the connect call, status:" << SockStatusStr(st));
1613
1614 // XXX OPEN OR CONNECTED?
1615 // BLOCK IF NOT OPEN OR BLOCK IF NOT CONNECTED?
1616 //
1617 // What happens to blocking when there are 2 connections
1618 // pending, about to be broken, and srt_connect() is called again?
1619 // SHOULD BLOCK the latter, even though is OPEN.
1620 // Or, OPEN should be removed from here and srt_connect(_group)
1621 // should block always if the group doesn't have neither 1 conencted link
1622 g.m_bOpened = true;
1623
1624 g.m_stats.tsLastSampleTime = steady_clock::now();
1625
1626 f->laststatus = st;
1627 // Check the socket status and update it.
1628 // Turn the group state of the socket to IDLE only if
1629 // connection is established or in progress
1630 f->agent = source_addr;
1631 f->peer = target_addr;
1632
1633 if (st >= SRTS_BROKEN)
1634 {
1635 f->sndstate = SRT_GST_BROKEN;
1636 f->rcvstate = SRT_GST_BROKEN;
1637 epoll_remove_socket_INTERNAL(g.m_SndEID, ns);
1638 epoll_remove_socket_INTERNAL(g.m_RcvEID, ns);
1639 }
1640 else
1641 {
1642 f->sndstate = SRT_GST_PENDING;
1643 f->rcvstate = SRT_GST_PENDING;
1644 spawned[sid] = ns;
1645
1646 sid_rloc = sid;
1647 erc_rloc = 0;
1648 retval = sid;
1649 }
1650 }
1651 }
1652
1653 if (retval == -1)
1654 {
1655 HLOGC(aclog.Debug, log << "groupConnect: none succeeded as background-spawn, exit with error");
1656 block_new_opened = false; // Avoid executing further while loop
1657 }
1658
1659 vector<SRTSOCKET> broken;
1660
1661 while (block_new_opened)
1662 {
1663 if (spawned.empty())
1664 {
1665 // All were removed due to errors.
1666 retval = -1;
1667 break;
1668 }
1669 HLOGC(aclog.Debug, log << "groupConnect: first connection, applying EPOLL WAITING.");
1670 int len = (int) spawned.size();
1671 vector<SRTSOCKET> ready(spawned.size());
1672 const int estat = srt_epoll_wait(eid,
1673 NULL, NULL, // IN/ACCEPT
1674 &ready[0], &len, // OUT/CONNECT
1675 -1, // indefinitely (FIXME Check if it needs to REGARD CONNECTION TIMEOUT!)
1676 NULL, NULL,
1677 NULL, NULL
1678 );
1679
1680 // Sanity check. Shouldn't happen if subs are in sync with spawned.
1681 if (estat == -1)
1682 {
1683 #if ENABLE_LOGGING
1684 CUDTException& x = CUDT::getlasterror();
1685 if (x.getErrorCode() != SRT_EPOLLEMPTY)
1686 {
1687 LOGC(aclog.Error, log << "groupConnect: srt_epoll_wait failed not because empty, unexpected IPE:"
1688 << x.getErrorMessage());
1689 }
1690 #endif
1691 HLOGC(aclog.Debug, log << "groupConnect: srt_epoll_wait failed - breaking the wait loop");
1692 retval = -1;
1693 break;
1694 }
1695
1696 // At the moment when you are going to work with real sockets,
1697 // lock the groups so that no one messes up with something here
1698 // in the meantime.
1699
1700 ScopedLock lock (*g.exp_groupLock());
1701
1702 // NOTE: UNDER m_GroupLock, NO API FUNCTION CALLS DARE TO HAPPEN BELOW!
1703
1704 // Check first if a socket wasn't closed in the meantime. It will be
1705 // automatically removed from all EIDs, but there's no sense in keeping
1706 // them in 'spawned' map.
1707 for (map<SRTSOCKET, CUDTSocket*>::iterator y = spawned.begin();
1708 y != spawned.end(); ++y)
1709 {
1710 SRTSOCKET sid = y->first;
1711 if (y->second->getStatus() >= SRTS_BROKEN)
1712 {
1713 HLOGC(aclog.Debug, log << "groupConnect: Socket @" << sid << " got BROKEN in the meantine during the check, remove from candidates");
1714 // Remove from spawned and try again
1715 broken.push_back(sid);
1716
1717 epoll_remove_socket_INTERNAL(eid, y->second);
1718 epoll_remove_socket_INTERNAL(g.m_SndEID, y->second);
1719 epoll_remove_socket_INTERNAL(g.m_RcvEID, y->second);
1720 }
1721 }
1722
1723 // Remove them outside the loop because this can't be done
1724 // while iterating over the same container.
1725 for (size_t i = 0; i < broken.size(); ++i)
1726 spawned.erase(broken[i]);
1727
1728 // Check the sockets if they were reported due
1729 // to have connected or due to have failed.
1730 // Distill successful ones. If distilled nothing, return -1.
1731 // If not all sockets were reported in this instance, repeat
1732 // the call until you get information about all of them.
1733 for (int i = 0; i < len; ++i)
1734 {
1735 map<SRTSOCKET, CUDTSocket*>::iterator x = spawned.find(ready[i]);
1736 if (x == spawned.end())
1737 {
1738 // Might be removed above - ignore it.
1739 continue;
1740 }
1741
1742 SRTSOCKET sid = x->first;
1743 CUDTSocket* s = x->second;
1744
1745 // Check status. If failed, remove from spawned
1746 // and try again.
1747 SRT_SOCKSTATUS st = s->getStatus();
1748 if (st >= SRTS_BROKEN)
1749 {
1750 HLOGC(aclog.Debug, log << "groupConnect: Socket @" << sid << " got BROKEN during background connect, remove & TRY AGAIN");
1751 // Remove from spawned and try again
1752 if (spawned.erase(sid))
1753 broken.push_back(sid);
1754
1755 epoll_remove_socket_INTERNAL(eid, s);
1756 epoll_remove_socket_INTERNAL(g.m_SndEID, s);
1757 epoll_remove_socket_INTERNAL(g.m_RcvEID, s);
1758
1759 continue;
1760 }
1761
1762 if (st == SRTS_CONNECTED)
1763 {
1764 HLOGC(aclog.Debug, log << "groupConnect: Socket @" << sid << " got CONNECTED as first in the group - reporting");
1765 retval = sid;
1766 g.m_bConnected = true;
1767 block_new_opened = false; // Interrupt also rolling epoll (outer loop)
1768
1769 // Remove this socket from SND EID because it doesn't need to
1770 // be connection-tracked anymore. Don't remove from the RCV EID
1771 // however because RCV procedure relies on epoll also for reading
1772 // and when found this socket connected it will "upgrade" it to
1773 // read-ready tracking only.
1774 epoll_remove_socket_INTERNAL(g.m_SndEID, s);
1775 break;
1776 }
1777
1778 // Spurious?
1779 HLOGC(aclog.Debug, log << "groupConnect: Socket @" << sid << " got spurious wakeup in "
1780 << SockStatusStr(st) << " TRY AGAIN");
1781 }
1782 // END of m_GroupLock CS - you can safely use API functions now.
1783 }
1784 // Finished, delete epoll.
1785 if (eid != -1)
1786 {
1787 HLOGC(aclog.Debug, log << "connect FIRST IN THE GROUP finished, removing E" << eid);
1788 srt_epoll_release(eid);
1789 }
1790
1791 for (vector<SRTSOCKET>::iterator b = broken.begin(); b != broken.end(); ++b)
1792 {
1793 CUDTSocket* s = locateSocket(*b, ERH_RETURN);
1794 if (!s)
1795 continue;
1796
1797 // This will also automatically remove it from the group and all eids
1798 close(s);
1799 }
1800
1801 // There's no possibility to report a problem on every connection
1802 // separately in case when every single connection has failed. What
1803 // is more interesting, it's only a matter of luck that all connections
1804 // fail at exactly the same time. OTOH if all are to fail, this
1805 // function will still be polling sockets to determine the last man
1806 // standing. Each one could, however, break by a different reason,
1807 // for example, one by timeout, another by wrong passphrase. Check
1808 // the `errorcode` field to determine the reaon for particular link.
1809 if (retval == -1)
1810 throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
1811
1812 return retval;
1813 }
1814 #endif
1815
1816
connectIn(CUDTSocket * s,const sockaddr_any & target_addr,int32_t forced_isn)1817 int srt::CUDTUnited::connectIn(CUDTSocket* s, const sockaddr_any& target_addr, int32_t forced_isn)
1818 {
1819 ScopedLock cg(s->m_ControlLock);
1820 // a socket can "connect" only if it is in the following states:
1821 // - OPENED: assume the socket binding parameters are configured
1822 // - INIT: configure binding parameters here
1823 // - any other (meaning, already connected): report error
1824
1825 if (s->m_Status == SRTS_INIT)
1826 {
1827 if (s->core().m_config.bRendezvous)
1828 throw CUDTException(MJ_NOTSUP, MN_ISRENDUNBOUND, 0);
1829
1830 // If bind() was done first on this socket, then the
1831 // socket will not perform this step. This actually does the
1832 // same thing as bind() does, just with empty address so that
1833 // the binding parameters are autoselected.
1834
1835 s->core().open();
1836 sockaddr_any autoselect_sa (target_addr.family());
1837 // This will create such a sockaddr_any that
1838 // will return true from empty().
1839 updateMux(s, autoselect_sa); // <<---- updateMux
1840 // -> C(Snd|Rcv)Queue::init
1841 // -> pthread_create(...C(Snd|Rcv)Queue::worker...)
1842 s->m_Status = SRTS_OPENED;
1843 }
1844 else
1845 {
1846 if (s->m_Status != SRTS_OPENED)
1847 throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0);
1848
1849 // status = SRTS_OPENED, so family should be known already.
1850 if (target_addr.family() != s->m_SelfAddr.family())
1851 {
1852 LOGP(cnlog.Error, "srt_connect: socket is bound to a different family than target address");
1853 throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
1854 }
1855 }
1856
1857
1858 // connect_complete() may be called before connect() returns.
1859 // So we need to update the status before connect() is called,
1860 // otherwise the status may be overwritten with wrong value
1861 // (CONNECTED vs. CONNECTING).
1862 s->m_Status = SRTS_CONNECTING;
1863
1864 /*
1865 * In blocking mode, connect can block for up to 30 seconds for
1866 * rendez-vous mode. Holding the s->m_ControlLock prevent close
1867 * from cancelling the connect
1868 */
1869 try
1870 {
1871 // record peer address
1872 s->m_PeerAddr = target_addr;
1873 s->core().startConnect(target_addr, forced_isn);
1874 }
1875 catch (CUDTException& e) // Interceptor, just to change the state.
1876 {
1877 s->m_Status = SRTS_OPENED;
1878 throw e;
1879 }
1880
1881 // ScopedLock destructor will delete cg and unlock s->m_ControlLock
1882
1883 return 0;
1884 }
1885
1886
close(const SRTSOCKET u)1887 int srt::CUDTUnited::close(const SRTSOCKET u)
1888 {
1889 #if ENABLE_EXPERIMENTAL_BONDING
1890 if (u & SRTGROUP_MASK)
1891 {
1892 GroupKeeper k (*this, u, ERH_THROW);
1893 k.group->close();
1894 deleteGroup(k.group);
1895 return 0;
1896 }
1897 #endif
1898 CUDTSocket* s = locateSocket(u);
1899 if (!s)
1900 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
1901
1902 return close(s);
1903 }
1904
1905 #if ENABLE_EXPERIMENTAL_BONDING
deleteGroup(CUDTGroup * g)1906 void srt::CUDTUnited::deleteGroup(CUDTGroup* g)
1907 {
1908 using srt_logging::gmlog;
1909
1910 srt::sync::ScopedLock cg (m_GlobControlLock);
1911 return deleteGroup_LOCKED(g);
1912 }
1913
1914 // [[using locked(m_GlobControlLock)]]
deleteGroup_LOCKED(CUDTGroup * g)1915 void srt::CUDTUnited::deleteGroup_LOCKED(CUDTGroup* g)
1916 {
1917 SRT_ASSERT(g->groupEmpty());
1918
1919 // After that the group is no longer findable by GroupKeeper
1920 m_Groups.erase(g->m_GroupID);
1921 m_ClosedGroups[g->m_GroupID] = g;
1922
1923 // Paranoid check: since the group is in m_ClosedGroups
1924 // it may potentially be deleted. Make sure no socket points
1925 // to it. Actually all sockets should have been already removed
1926 // from the group container, so if any does, it's invalid.
1927 for (sockets_t::iterator i = m_Sockets.begin();
1928 i != m_Sockets.end(); ++ i)
1929 {
1930 CUDTSocket* s = i->second;
1931 if (s->m_GroupOf == g)
1932 {
1933 HLOGC(smlog.Debug, log << "deleteGroup: IPE: existing @" << s->m_SocketID << " points to a dead group!");
1934 s->m_GroupOf = NULL;
1935 s->m_GroupMemberData = NULL;
1936 }
1937 }
1938
1939 // Just in case, do it in closed sockets, too, although this should be
1940 // always done before moving to it.
1941 for (sockets_t::iterator i = m_ClosedSockets.begin();
1942 i != m_ClosedSockets.end(); ++ i)
1943 {
1944 CUDTSocket* s = i->second;
1945 if (s->m_GroupOf == g)
1946 {
1947 HLOGC(smlog.Debug, log << "deleteGroup: IPE: closed @" << s->m_SocketID << " points to a dead group!");
1948 s->m_GroupOf = NULL;
1949 s->m_GroupMemberData = NULL;
1950 }
1951 }
1952 }
1953 #endif
1954
close(CUDTSocket * s)1955 int srt::CUDTUnited::close(CUDTSocket* s)
1956 {
1957 HLOGC(smlog.Debug, log << s->core().CONID() << " CLOSE. Acquiring control lock");
1958 ScopedLock socket_cg(s->m_ControlLock);
1959 HLOGC(smlog.Debug, log << s->core().CONID() << " CLOSING (removing from listening, closing CUDT)");
1960
1961 const bool synch_close_snd = s->core().m_config.bSynSending;
1962
1963 SRTSOCKET u = s->m_SocketID;
1964
1965 if (s->m_Status == SRTS_LISTENING)
1966 {
1967 if (s->core().m_bBroken)
1968 return 0;
1969
1970 s->m_tsClosureTimeStamp = steady_clock::now();
1971 s->core().m_bBroken = true;
1972
1973 // Change towards original UDT:
1974 // Leave all the closing activities for garbageCollect to happen,
1975 // however remove the listener from the RcvQueue IMMEDIATELY.
1976 // Even though garbageCollect would eventually remove the listener
1977 // as well, there would be some time interval between now and the
1978 // moment when it's done, and during this time the application will
1979 // be unable to bind to this port that the about-to-delete listener
1980 // is currently occupying (due to blocked slot in the RcvQueue).
1981
1982 HLOGC(smlog.Debug, log << s->core().CONID() << " CLOSING (removing listener immediately)");
1983 s->core().notListening();
1984
1985 // broadcast all "accept" waiting
1986 CSync::lock_broadcast(s->m_AcceptCond, s->m_AcceptLock);
1987 }
1988 else
1989 {
1990 // Note: this call may be done on a socket that hasn't finished
1991 // sending all packets scheduled for sending, which means, this call
1992 // may block INDEFINITELY. As long as it's acceptable to block the
1993 // call to srt_close(), and all functions in all threads where this
1994 // very socket is used, this shall not block the central database.
1995 s->core().closeInternal();
1996
1997 // synchronize with garbage collection.
1998 HLOGC(smlog.Debug, log << "@" << u << "U::close done. GLOBAL CLOSE: " << s->core().CONID() << ". Acquiring GLOBAL control lock");
1999 ScopedLock manager_cg(m_GlobControlLock);
2000 // since "s" is located before m_GlobControlLock, locate it again in case
2001 // it became invalid
2002 // XXX This is very weird; if we state that the CUDTSocket object
2003 // could not be deleted between locks, then definitely it couldn't
2004 // also change the pointer value. There's no other reason for getting
2005 // this iterator but to obtain the 's' pointer, which is impossible to
2006 // be different than previous 's' (m_Sockets is a map that stores pointers
2007 // transparently). This iterator isn't even later used to delete the socket
2008 // from the container, though it would be more efficient.
2009 // FURTHER RESEARCH REQUIRED.
2010 sockets_t::iterator i = m_Sockets.find(u);
2011 if ((i == m_Sockets.end()) || (i->second->m_Status == SRTS_CLOSED))
2012 {
2013 HLOGC(smlog.Debug, log << "@" << u << "U::close: NOT AN ACTIVE SOCKET, returning.");
2014 return 0;
2015 }
2016 s = i->second;
2017 s->setClosed();
2018
2019 #if ENABLE_EXPERIMENTAL_BONDING
2020 if (s->m_GroupOf)
2021 {
2022 HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
2023 s->removeFromGroup(true);
2024 }
2025 #endif
2026
2027 m_Sockets.erase(s->m_SocketID);
2028 m_ClosedSockets[s->m_SocketID] = s;
2029 HLOGC(smlog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later.");
2030
2031 CGlobEvent::triggerEvent();
2032 }
2033
2034 HLOGC(smlog.Debug, log << "@" << u << ": GLOBAL: CLOSING DONE");
2035
2036 // Check if the ID is still in closed sockets before you access it
2037 // (the last triggerEvent could have deleted it).
2038 if ( synch_close_snd )
2039 {
2040 #if SRT_ENABLE_CLOSE_SYNCH
2041
2042 HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sync-waiting for releasing sender resources...");
2043 for (;;)
2044 {
2045 CSndBuffer* sb = s->core().m_pSndBuffer;
2046
2047 // Disconnected from buffer - nothing more to check.
2048 if (!sb)
2049 {
2050 HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer disconnected. Allowed to close.");
2051 break;
2052 }
2053
2054 // Sender buffer empty
2055 if (sb->getCurrBufSize() == 0)
2056 {
2057 HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer depleted. Allowed to close.");
2058 break;
2059 }
2060
2061 // Ok, now you are keeping GC thread hands off the internal data.
2062 // You can check then if it has already deleted the socket or not.
2063 // The socket is either in m_ClosedSockets or is already gone.
2064
2065 // Done the other way, but still done. You can stop waiting.
2066 bool isgone = false;
2067 {
2068 ScopedLock manager_cg(m_GlobControlLock);
2069 isgone = m_ClosedSockets.count(u) == 0;
2070 }
2071 if (!isgone)
2072 {
2073 isgone = !s->core().m_bOpened;
2074 }
2075 if (isgone)
2076 {
2077 HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: ... gone in the meantime, whatever. Exiting close().");
2078 break;
2079 }
2080
2081 HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: ... still waiting for any update.");
2082 // How to handle a possible error here?
2083 CGlobEvent::waitForEvent();
2084
2085 // Continue waiting in case when an event happened or 1s waiting time passed for checkpoint.
2086 }
2087 #endif
2088 }
2089
2090 /*
2091 This code is PUT ASIDE for now.
2092 Most likely this will be never required.
2093 It had to hold the closing activity until the time when the receiver buffer is depleted.
2094 However the closing of the socket should only happen when the receiver has received
2095 an information about that the reading is no longer possible (error report from recv/recvfile).
2096 When this happens, the receiver buffer is definitely depleted already and there's no need to check
2097 anything.
2098
2099 Should there appear any other conditions in future under which the closing process should be
2100 delayed until the receiver buffer is empty, this code can be filled here.
2101
2102 if ( synch_close_rcv )
2103 {
2104 ...
2105 }
2106 */
2107
2108 return 0;
2109 }
2110
getpeername(const SRTSOCKET u,sockaddr * pw_name,int * pw_namelen)2111 void srt::CUDTUnited::getpeername(const SRTSOCKET u, sockaddr* pw_name, int* pw_namelen)
2112 {
2113 if (!pw_name || !pw_namelen)
2114 throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
2115
2116 if (getStatus(u) != SRTS_CONNECTED)
2117 throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
2118
2119 CUDTSocket* s = locateSocket(u);
2120
2121 if (!s)
2122 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2123
2124 if (!s->core().m_bConnected || s->core().m_bBroken)
2125 throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
2126
2127 const int len = s->m_PeerAddr.size();
2128 if (*pw_namelen < len)
2129 throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
2130
2131 memcpy((pw_name), &s->m_PeerAddr.sa, len);
2132 *pw_namelen = len;
2133 }
2134
getsockname(const SRTSOCKET u,sockaddr * pw_name,int * pw_namelen)2135 void srt::CUDTUnited::getsockname(const SRTSOCKET u, sockaddr* pw_name, int* pw_namelen)
2136 {
2137 if (!pw_name || !pw_namelen)
2138 throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
2139
2140 CUDTSocket* s = locateSocket(u);
2141
2142 if (!s)
2143 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2144
2145 if (s->core().m_bBroken)
2146 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2147
2148 if (s->m_Status == SRTS_INIT)
2149 throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
2150
2151 const int len = s->m_SelfAddr.size();
2152 if (*pw_namelen < len)
2153 throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
2154
2155 memcpy((pw_name), &s->m_SelfAddr.sa, len);
2156 *pw_namelen = len;
2157 }
2158
select(UDT::UDSET * readfds,UDT::UDSET * writefds,UDT::UDSET * exceptfds,const timeval * timeout)2159 int srt::CUDTUnited::select(
2160 UDT::UDSET* readfds, UDT::UDSET* writefds, UDT::UDSET* exceptfds, const timeval* timeout)
2161 {
2162 const steady_clock::time_point entertime = steady_clock::now();
2163
2164 const int64_t timeo_us = timeout
2165 ? static_cast<int64_t>(timeout->tv_sec) * 1000000 + timeout->tv_usec
2166 : -1;
2167 const steady_clock::duration timeo(microseconds_from(timeo_us));
2168
2169 // initialize results
2170 int count = 0;
2171 set<SRTSOCKET> rs, ws, es;
2172
2173 // retrieve related UDT sockets
2174 vector<CUDTSocket*> ru, wu, eu;
2175 CUDTSocket* s;
2176 if (readfds)
2177 for (set<SRTSOCKET>::iterator i1 = readfds->begin();
2178 i1 != readfds->end(); ++ i1)
2179 {
2180 if (getStatus(*i1) == SRTS_BROKEN)
2181 {
2182 rs.insert(*i1);
2183 ++ count;
2184 }
2185 else if (!(s = locateSocket(*i1)))
2186 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2187 else
2188 ru.push_back(s);
2189 }
2190 if (writefds)
2191 for (set<SRTSOCKET>::iterator i2 = writefds->begin();
2192 i2 != writefds->end(); ++ i2)
2193 {
2194 if (getStatus(*i2) == SRTS_BROKEN)
2195 {
2196 ws.insert(*i2);
2197 ++ count;
2198 }
2199 else if (!(s = locateSocket(*i2)))
2200 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2201 else
2202 wu.push_back(s);
2203 }
2204 if (exceptfds)
2205 for (set<SRTSOCKET>::iterator i3 = exceptfds->begin();
2206 i3 != exceptfds->end(); ++ i3)
2207 {
2208 if (getStatus(*i3) == SRTS_BROKEN)
2209 {
2210 es.insert(*i3);
2211 ++ count;
2212 }
2213 else if (!(s = locateSocket(*i3)))
2214 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2215 else
2216 eu.push_back(s);
2217 }
2218
2219 do
2220 {
2221 // query read sockets
2222 for (vector<CUDTSocket*>::iterator j1 = ru.begin(); j1 != ru.end(); ++ j1)
2223 {
2224 s = *j1;
2225
2226 if (s->readReady() || s->m_Status == SRTS_CLOSED)
2227 {
2228 rs.insert(s->m_SocketID);
2229 ++ count;
2230 }
2231 }
2232
2233 // query write sockets
2234 for (vector<CUDTSocket*>::iterator j2 = wu.begin(); j2 != wu.end(); ++ j2)
2235 {
2236 s = *j2;
2237
2238 if (s->writeReady() || s->m_Status == SRTS_CLOSED)
2239 {
2240 ws.insert(s->m_SocketID);
2241 ++ count;
2242 }
2243 }
2244
2245 // query exceptions on sockets
2246 for (vector<CUDTSocket*>::iterator j3 = eu.begin(); j3 != eu.end(); ++ j3)
2247 {
2248 // check connection request status, not supported now
2249 }
2250
2251 if (0 < count)
2252 break;
2253
2254 CGlobEvent::waitForEvent();
2255 } while (timeo > steady_clock::now() - entertime);
2256
2257 if (readfds)
2258 *readfds = rs;
2259
2260 if (writefds)
2261 *writefds = ws;
2262
2263 if (exceptfds)
2264 *exceptfds = es;
2265
2266 return count;
2267 }
2268
selectEx(const vector<SRTSOCKET> & fds,vector<SRTSOCKET> * readfds,vector<SRTSOCKET> * writefds,vector<SRTSOCKET> * exceptfds,int64_t msTimeOut)2269 int srt::CUDTUnited::selectEx(
2270 const vector<SRTSOCKET>& fds,
2271 vector<SRTSOCKET>* readfds,
2272 vector<SRTSOCKET>* writefds,
2273 vector<SRTSOCKET>* exceptfds,
2274 int64_t msTimeOut)
2275 {
2276 const steady_clock::time_point entertime = steady_clock::now();
2277
2278 const int64_t timeo_us = msTimeOut >= 0
2279 ? msTimeOut * 1000
2280 : -1;
2281 const steady_clock::duration timeo(microseconds_from(timeo_us));
2282
2283 // initialize results
2284 int count = 0;
2285 if (readfds)
2286 readfds->clear();
2287 if (writefds)
2288 writefds->clear();
2289 if (exceptfds)
2290 exceptfds->clear();
2291
2292 do
2293 {
2294 for (vector<SRTSOCKET>::const_iterator i = fds.begin();
2295 i != fds.end(); ++ i)
2296 {
2297 CUDTSocket* s = locateSocket(*i);
2298
2299 if ((!s) || s->core().m_bBroken || (s->m_Status == SRTS_CLOSED))
2300 {
2301 if (exceptfds)
2302 {
2303 exceptfds->push_back(*i);
2304 ++ count;
2305 }
2306 continue;
2307 }
2308
2309 if (readfds)
2310 {
2311 if ((s->core().m_bConnected
2312 && s->core().m_pRcvBuffer->isRcvDataReady()
2313 )
2314 || (s->core().m_bListening
2315 && (s->m_QueuedSockets.size() > 0)))
2316 {
2317 readfds->push_back(s->m_SocketID);
2318 ++ count;
2319 }
2320 }
2321
2322 if (writefds)
2323 {
2324 if (s->core().m_bConnected
2325 && (s->core().m_pSndBuffer->getCurrBufSize()
2326 < s->core().m_config.iSndBufSize))
2327 {
2328 writefds->push_back(s->m_SocketID);
2329 ++ count;
2330 }
2331 }
2332 }
2333
2334 if (count > 0)
2335 break;
2336
2337 CGlobEvent::waitForEvent();
2338 } while (timeo > steady_clock::now() - entertime);
2339
2340 return count;
2341 }
2342
epoll_create()2343 int srt::CUDTUnited::epoll_create()
2344 {
2345 return m_EPoll.create();
2346 }
2347
epoll_clear_usocks(int eid)2348 int srt::CUDTUnited::epoll_clear_usocks(int eid)
2349 {
2350 return m_EPoll.clear_usocks(eid);
2351 }
2352
epoll_add_usock(const int eid,const SRTSOCKET u,const int * events)2353 int srt::CUDTUnited::epoll_add_usock(
2354 const int eid, const SRTSOCKET u, const int* events)
2355 {
2356 int ret = -1;
2357 #if ENABLE_EXPERIMENTAL_BONDING
2358 if (u & SRTGROUP_MASK)
2359 {
2360 GroupKeeper k (*this, u, ERH_THROW);
2361 ret = m_EPoll.update_usock(eid, u, events);
2362 k.group->addEPoll(eid);
2363 return 0;
2364 }
2365 #endif
2366
2367 CUDTSocket* s = locateSocket(u);
2368 if (s)
2369 {
2370 ret = epoll_add_usock_INTERNAL(eid, s, events);
2371 }
2372 else
2373 {
2374 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL);
2375 }
2376
2377 return ret;
2378 }
2379
2380 // NOTE: WILL LOCK (serially):
2381 // - CEPoll::m_EPollLock
2382 // - CUDT::m_RecvLock
epoll_add_usock_INTERNAL(const int eid,CUDTSocket * s,const int * events)2383 int srt::CUDTUnited::epoll_add_usock_INTERNAL(const int eid, CUDTSocket* s, const int* events)
2384 {
2385 int ret = m_EPoll.update_usock(eid, s->m_SocketID, events);
2386 s->core().addEPoll(eid);
2387 return ret;
2388 }
2389
epoll_add_ssock(const int eid,const SYSSOCKET s,const int * events)2390 int srt::CUDTUnited::epoll_add_ssock(
2391 const int eid, const SYSSOCKET s, const int* events)
2392 {
2393 return m_EPoll.add_ssock(eid, s, events);
2394 }
2395
epoll_update_ssock(const int eid,const SYSSOCKET s,const int * events)2396 int srt::CUDTUnited::epoll_update_ssock(
2397 const int eid, const SYSSOCKET s, const int* events)
2398 {
2399 return m_EPoll.update_ssock(eid, s, events);
2400 }
2401
2402 template <class EntityType>
epoll_remove_entity(const int eid,EntityType * ent)2403 int srt::CUDTUnited::epoll_remove_entity(const int eid, EntityType* ent)
2404 {
2405 // XXX Not sure if this is anyhow necessary because setting readiness
2406 // to false doesn't actually trigger any action. Further research needed.
2407 HLOGC(ealog.Debug, log << "epoll_remove_usock: CLEARING readiness on E" << eid << " of @" << ent->id());
2408 ent->removeEPollEvents(eid);
2409
2410 // First remove the EID from the subscribed in the socket so that
2411 // a possible call to update_events:
2412 // - if happens before this call, can find the epoll bit update possible
2413 // - if happens after this call, will not strike this EID
2414 HLOGC(ealog.Debug, log << "epoll_remove_usock: REMOVING E" << eid << " from back-subscirbers in @" << ent->id());
2415 ent->removeEPollID(eid);
2416
2417 HLOGC(ealog.Debug, log << "epoll_remove_usock: CLEARING subscription on E" << eid << " of @" << ent->id());
2418 int no_events = 0;
2419 int ret = m_EPoll.update_usock(eid, ent->id(), &no_events);
2420
2421 return ret;
2422 }
2423
2424 // Needed internal access!
epoll_remove_socket_INTERNAL(const int eid,CUDTSocket * s)2425 int srt::CUDTUnited::epoll_remove_socket_INTERNAL(const int eid, CUDTSocket* s)
2426 {
2427 return epoll_remove_entity(eid, &s->core());
2428 }
2429
2430 #if ENABLE_EXPERIMENTAL_BONDING
epoll_remove_group_INTERNAL(const int eid,CUDTGroup * g)2431 int srt::CUDTUnited::epoll_remove_group_INTERNAL(const int eid, CUDTGroup* g)
2432 {
2433 return epoll_remove_entity(eid, g);
2434 }
2435 #endif
2436
epoll_remove_usock(const int eid,const SRTSOCKET u)2437 int srt::CUDTUnited::epoll_remove_usock(const int eid, const SRTSOCKET u)
2438 {
2439 CUDTSocket* s = 0;
2440
2441 #if ENABLE_EXPERIMENTAL_BONDING
2442 CUDTGroup* g = 0;
2443 if (u & SRTGROUP_MASK)
2444 {
2445 GroupKeeper k (*this, u, ERH_THROW);
2446 g = k.group;
2447 return epoll_remove_entity(eid, g);
2448 }
2449 else
2450 #endif
2451 {
2452 s = locateSocket(u);
2453 if (s)
2454 return epoll_remove_entity(eid, &s->core());
2455 }
2456
2457 LOGC(ealog.Error, log << "remove_usock: @" << u
2458 << " not found as either socket or group. Removing only from epoll system.");
2459 int no_events = 0;
2460 return m_EPoll.update_usock(eid, u, &no_events);
2461 }
2462
epoll_remove_ssock(const int eid,const SYSSOCKET s)2463 int srt::CUDTUnited::epoll_remove_ssock(const int eid, const SYSSOCKET s)
2464 {
2465 return m_EPoll.remove_ssock(eid, s);
2466 }
2467
epoll_uwait(const int eid,SRT_EPOLL_EVENT * fdsSet,int fdsSize,int64_t msTimeOut)2468 int srt::CUDTUnited::epoll_uwait(
2469 const int eid,
2470 SRT_EPOLL_EVENT* fdsSet,
2471 int fdsSize,
2472 int64_t msTimeOut)
2473 {
2474 return m_EPoll.uwait(eid, fdsSet, fdsSize, msTimeOut);
2475 }
2476
epoll_set(int eid,int32_t flags)2477 int32_t srt::CUDTUnited::epoll_set(int eid, int32_t flags)
2478 {
2479 return m_EPoll.setflags(eid, flags);
2480 }
2481
epoll_release(const int eid)2482 int srt::CUDTUnited::epoll_release(const int eid)
2483 {
2484 return m_EPoll.release(eid);
2485 }
2486
locateSocket(const SRTSOCKET u,ErrorHandling erh)2487 srt::CUDTSocket* srt::CUDTUnited::locateSocket(const SRTSOCKET u, ErrorHandling erh)
2488 {
2489 ScopedLock cg (m_GlobControlLock);
2490 CUDTSocket* s = locateSocket_LOCKED(u);
2491 if (!s)
2492 {
2493 if (erh == ERH_RETURN)
2494 return NULL;
2495 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2496 }
2497
2498 return s;
2499 }
2500
2501 // [[using locked(m_GlobControlLock)]];
locateSocket_LOCKED(SRTSOCKET u)2502 srt::CUDTSocket* srt::CUDTUnited::locateSocket_LOCKED(SRTSOCKET u)
2503 {
2504 sockets_t::iterator i = m_Sockets.find(u);
2505
2506 if ((i == m_Sockets.end()) || (i->second->m_Status == SRTS_CLOSED))
2507 {
2508 return NULL;
2509 }
2510
2511 return i->second;
2512 }
2513
2514 #if ENABLE_EXPERIMENTAL_BONDING
locateAcquireGroup(SRTSOCKET u,ErrorHandling erh)2515 srt::CUDTGroup* srt::CUDTUnited::locateAcquireGroup(SRTSOCKET u, ErrorHandling erh)
2516 {
2517 ScopedLock cg (m_GlobControlLock);
2518
2519 const groups_t::iterator i = m_Groups.find(u);
2520 if ( i == m_Groups.end() )
2521 {
2522 if (erh == ERH_THROW)
2523 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2524 return NULL;
2525 }
2526
2527 ScopedLock cgroup (*i->second->exp_groupLock());
2528 i->second->apiAcquire();
2529 return i->second;
2530 }
2531
acquireSocketsGroup(CUDTSocket * s)2532 srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s)
2533 {
2534 ScopedLock cg (m_GlobControlLock);
2535 CUDTGroup* g = s->m_GroupOf;
2536 if (!g)
2537 return NULL;
2538
2539 // With m_GlobControlLock locked, we are sure the group
2540 // still exists, if it wasn't removed from this socket.
2541 g->apiAcquire();
2542 return g;
2543 }
2544 #endif
2545
locatePeer(const sockaddr_any & peer,const SRTSOCKET id,int32_t isn)2546 srt::CUDTSocket* srt::CUDTUnited::locatePeer(
2547 const sockaddr_any& peer,
2548 const SRTSOCKET id,
2549 int32_t isn)
2550 {
2551 ScopedLock cg(m_GlobControlLock);
2552
2553 map<int64_t, set<SRTSOCKET> >::iterator i = m_PeerRec.find(
2554 CUDTSocket::getPeerSpec(id, isn));
2555 if (i == m_PeerRec.end())
2556 return NULL;
2557
2558 for (set<SRTSOCKET>::iterator j = i->second.begin();
2559 j != i->second.end(); ++ j)
2560 {
2561 sockets_t::iterator k = m_Sockets.find(*j);
2562 // this socket might have been closed and moved m_ClosedSockets
2563 if (k == m_Sockets.end())
2564 continue;
2565
2566 if (k->second->m_PeerAddr == peer)
2567 {
2568 return k->second;
2569 }
2570 }
2571
2572 return NULL;
2573 }
2574
checkBrokenSockets()2575 void srt::CUDTUnited::checkBrokenSockets()
2576 {
2577 ScopedLock cg(m_GlobControlLock);
2578
2579 // set of sockets To Be Closed and To Be Removed
2580 vector<SRTSOCKET> tbc;
2581 vector<SRTSOCKET> tbr;
2582
2583 #if ENABLE_EXPERIMENTAL_BONDING
2584 vector<SRTSOCKET> delgids;
2585
2586 for (groups_t::iterator i = m_ClosedGroups.begin(); i != m_ClosedGroups.end(); ++i)
2587 {
2588 // isStillBusy requires lock on the group, so only after an API
2589 // function that uses it returns, and so clears the busy flag,
2590 // a new API function won't be called anyway until it can acquire
2591 // GlobControlLock, and all functions that have already seen this
2592 // group as closing will not continue with the API and return.
2593 // If we caught some API function still using the closed group,
2594 // it's not going to wait, will be checked next time.
2595 if (i->second->isStillBusy())
2596 continue;
2597
2598 delgids.push_back(i->first);
2599 delete i->second;
2600 i->second = NULL; // just for a case, avoid a dangling pointer
2601 }
2602
2603 for (vector<SRTSOCKET>::iterator di = delgids.begin(); di != delgids.end(); ++di)
2604 {
2605 m_ClosedGroups.erase(*di);
2606 }
2607
2608 #endif
2609
2610 for (sockets_t::iterator i = m_Sockets.begin();
2611 i != m_Sockets.end(); ++ i)
2612 {
2613 CUDTSocket* s = i->second;
2614
2615 // check broken connection
2616 if (s->core().m_bBroken)
2617 {
2618 if (s->m_Status == SRTS_LISTENING)
2619 {
2620 const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
2621 // for a listening socket, it should wait an extra 3 seconds
2622 // in case a client is connecting
2623 if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
2624 {
2625 continue;
2626 }
2627 }
2628 else if ((s->core().m_pRcvBuffer != NULL)
2629 // FIXED: calling isRcvDataAvailable() just to get the information
2630 // whether there are any data waiting in the buffer,
2631 // NOT WHETHER THEY ARE ALSO READY TO PLAY at the time when
2632 // this function is called (isRcvDataReady also checks if the
2633 // available data is "ready to play").
2634 && s->core().m_pRcvBuffer->isRcvDataAvailable())
2635 {
2636 const int bc = s->core().m_iBrokenCounter.load();
2637 if (bc > 0)
2638 {
2639 // HLOGF(smlog.Debug, "STILL KEEPING socket (still have data):
2640 // %d\n", i->first);
2641 // if there is still data in the receiver buffer, wait longer
2642 s->core().m_iBrokenCounter.store(bc - 1);
2643 continue;
2644 }
2645 }
2646
2647 #if ENABLE_EXPERIMENTAL_BONDING
2648 if (s->m_GroupOf)
2649 {
2650 LOGC(smlog.Note, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
2651 s->removeFromGroup(true);
2652 }
2653 #endif
2654
2655 HLOGC(smlog.Debug, log << "checkBrokenSockets: moving BROKEN socket to CLOSED: @" << i->first);
2656
2657 //close broken connections and start removal timer
2658 s->setClosed();
2659 tbc.push_back(i->first);
2660 m_ClosedSockets[i->first] = s;
2661
2662 // remove from listener's queue
2663 sockets_t::iterator ls = m_Sockets.find(s->m_ListenSocket);
2664 if (ls == m_Sockets.end())
2665 {
2666 ls = m_ClosedSockets.find(s->m_ListenSocket);
2667 if (ls == m_ClosedSockets.end())
2668 continue;
2669 }
2670
2671 enterCS(ls->second->m_AcceptLock);
2672 ls->second->m_QueuedSockets.erase(s->m_SocketID);
2673 leaveCS(ls->second->m_AcceptLock);
2674 }
2675 }
2676
2677 for (sockets_t::iterator j = m_ClosedSockets.begin();
2678 j != m_ClosedSockets.end(); ++ j)
2679 {
2680 // HLOGF(smlog.Debug, "checking CLOSED socket: %d\n", j->first);
2681 if (!is_zero(j->second->core().m_tsLingerExpiration))
2682 {
2683 // asynchronous close:
2684 if ((!j->second->core().m_pSndBuffer)
2685 || (0 == j->second->core().m_pSndBuffer->getCurrBufSize())
2686 || (j->second->core().m_tsLingerExpiration <= steady_clock::now()))
2687 {
2688 HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << j->second->m_SocketID);
2689 j->second->core().m_tsLingerExpiration = steady_clock::time_point();
2690 j->second->core().m_bClosing = true;
2691 j->second->m_tsClosureTimeStamp = steady_clock::now();
2692 }
2693 }
2694
2695 // timeout 1 second to destroy a socket AND it has been removed from
2696 // RcvUList
2697 const steady_clock::time_point now = steady_clock::now();
2698 const steady_clock::duration closed_ago = now - j->second->m_tsClosureTimeStamp;
2699 if (closed_ago > seconds_from(1))
2700 {
2701 CRNode* rnode = j->second->core().m_pRNode;
2702 if (!rnode || !rnode->m_bOnList)
2703 {
2704 HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << j->second->m_SocketID << " closed "
2705 << FormatDuration(closed_ago) << " ago and removed from RcvQ - will remove");
2706
2707 // HLOGF(smlog.Debug, "will unref socket: %d\n", j->first);
2708 tbr.push_back(j->first);
2709 }
2710 }
2711 }
2712
2713 // move closed sockets to the ClosedSockets structure
2714 for (vector<SRTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k)
2715 m_Sockets.erase(*k);
2716
2717 // remove those timeout sockets
2718 for (vector<SRTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)
2719 removeSocket(*l);
2720 }
2721
2722 // [[using locked(m_GlobControlLock)]]
removeSocket(const SRTSOCKET u)2723 void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
2724 {
2725 sockets_t::iterator i = m_ClosedSockets.find(u);
2726
2727 // invalid socket ID
2728 if (i == m_ClosedSockets.end())
2729 return;
2730
2731 CUDTSocket* const s = i->second;
2732
2733 // The socket may be in the trashcan now, but could
2734 // still be under processing in the sender/receiver worker
2735 // threads. If that's the case, SKIP IT THIS TIME. The
2736 // socket will be checked next time the GC rollover starts.
2737 CSNode* sn = s->core().m_pSNode;
2738 if (sn && sn->m_iHeapLoc != -1)
2739 return;
2740
2741 CRNode* rn = s->core().m_pRNode;
2742 if (rn && rn->m_bOnList)
2743 return;
2744
2745
2746 #if ENABLE_EXPERIMENTAL_BONDING
2747 if (s->m_GroupOf)
2748 {
2749 HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
2750 s->removeFromGroup(true);
2751 }
2752 #endif
2753 // decrease multiplexer reference count, and remove it if necessary
2754 const int mid = s->m_iMuxID;
2755
2756 {
2757 ScopedLock cg(s->m_AcceptLock);
2758
2759 // if it is a listener, close all un-accepted sockets in its queue
2760 // and remove them later
2761 for (set<SRTSOCKET>::iterator q = s->m_QueuedSockets.begin();
2762 q != s->m_QueuedSockets.end(); ++ q)
2763 {
2764 sockets_t::iterator si = m_Sockets.find(*q);
2765 if (si == m_Sockets.end())
2766 {
2767 // gone in the meantime
2768 LOGC(smlog.Error, log << "removeSocket: IPE? socket @" << (*q)
2769 << " being queued for listener socket @" << s->m_SocketID
2770 << " is GONE in the meantime ???");
2771 continue;
2772 }
2773
2774 CUDTSocket* as = si->second;
2775
2776 as->breakSocket_LOCKED();
2777 m_ClosedSockets[*q] = as;
2778 m_Sockets.erase(*q);
2779 }
2780 }
2781
2782 // remove from peer rec
2783 map<int64_t, set<SRTSOCKET> >::iterator j = m_PeerRec.find(
2784 s->getPeerSpec());
2785 if (j != m_PeerRec.end())
2786 {
2787 j->second.erase(u);
2788 if (j->second.empty())
2789 m_PeerRec.erase(j);
2790 }
2791
2792 /*
2793 * Socket may be deleted while still having ePoll events set that would
2794 * remains forever causing epoll_wait to unblock continuously for inexistent
2795 * sockets. Get rid of all events for this socket.
2796 */
2797 m_EPoll.update_events(u, s->core().m_sPollID,
2798 SRT_EPOLL_IN|SRT_EPOLL_OUT|SRT_EPOLL_ERR, false);
2799
2800 // delete this one
2801 m_ClosedSockets.erase(i);
2802
2803 HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u);
2804 s->core().closeInternal();
2805 HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u);
2806 delete s;
2807
2808 if (mid == -1)
2809 return;
2810
2811 map<int, CMultiplexer>::iterator m;
2812 m = m_mMultiplexer.find(mid);
2813 if (m == m_mMultiplexer.end())
2814 {
2815 LOGC(smlog.Fatal, log << "IPE: For socket @" << u << " MUXER id=" << mid << " NOT FOUND!");
2816 return;
2817 }
2818
2819 CMultiplexer& mx = m->second;
2820
2821 mx.m_iRefCount --;
2822 // HLOGF(smlog.Debug, "unrefing underlying socket for %u: %u\n",
2823 // u, mx.m_iRefCount);
2824 if (0 == mx.m_iRefCount)
2825 {
2826 HLOGC(smlog.Debug, log << "MUXER id=" << mid << " lost last socket @"
2827 << u << " - deleting muxer bound to port "
2828 << mx.m_pChannel->bindAddressAny().hport());
2829 // The channel has no access to the queues and
2830 // it looks like the multiplexer is the master of all of them.
2831 // The queues must be silenced before closing the channel
2832 // because this will cause error to be returned in any operation
2833 // being currently done in the queues, if any.
2834 mx.m_pSndQueue->setClosing();
2835 mx.m_pRcvQueue->setClosing();
2836 mx.destroy();
2837 m_mMultiplexer.erase(m);
2838 }
2839 }
2840
configureMuxer(CMultiplexer & w_m,const CUDTSocket * s,int af)2841 void srt::CUDTUnited::configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af)
2842 {
2843 w_m.m_mcfg = s->core().m_config;
2844 w_m.m_iIPversion = af;
2845 w_m.m_iRefCount = 1;
2846 w_m.m_iID = s->m_SocketID;
2847 }
2848
installMuxer(CUDTSocket * w_s,CMultiplexer & fw_sm)2849 uint16_t srt::CUDTUnited::installMuxer(CUDTSocket* w_s, CMultiplexer& fw_sm)
2850 {
2851 w_s->core().m_pSndQueue = fw_sm.m_pSndQueue;
2852 w_s->core().m_pRcvQueue = fw_sm.m_pRcvQueue;
2853 w_s->m_iMuxID = fw_sm.m_iID;
2854 sockaddr_any sa;
2855 fw_sm.m_pChannel->getSockAddr((sa));
2856 w_s->m_SelfAddr = sa; // Will be also completed later, but here it's needed for later checks
2857 return sa.hport();
2858 }
2859
channelSettingsMatch(const CMultiplexer & m,const CUDTSocket * s)2860 bool srt::CUDTUnited::channelSettingsMatch(const CMultiplexer& m, const CUDTSocket* s)
2861 {
2862 return m.m_mcfg.bReuseAddr && m.m_mcfg == s->core().m_config;
2863 }
2864
updateMux(CUDTSocket * s,const sockaddr_any & addr,const UDPSOCKET * udpsock)2865 void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* udpsock /*[[nullable]]*/)
2866 {
2867 ScopedLock cg(m_GlobControlLock);
2868
2869 // If udpsock is provided, then this socket will be simply
2870 // taken for binding as a good deal. It would be nice to make
2871 // a sanity check to see if this UDP socket isn't already installed
2872 // in some multiplexer, but we state this UDP socket isn't accessible
2873 // anyway so this wouldn't be possible.
2874 if (!udpsock)
2875 {
2876 // If not, we need to see if there exist already a multiplexer bound
2877 // to the same endpoint.
2878 const int port = addr.hport();
2879
2880 bool reuse_attempt = false;
2881 for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin();
2882 i != m_mMultiplexer.end(); ++ i)
2883 {
2884 CMultiplexer& m = i->second;
2885
2886 // First, we need to find a multiplexer with the same port.
2887 if (m.m_iPort != port)
2888 {
2889 HLOGC(smlog.Debug, log << "bind: muxer @" << m.m_iID << " found, but for port "
2890 << m.m_iPort << " (requested port: " << port << ")");
2891 continue;
2892 }
2893
2894 // If this is bound to the wildcard address, it can be reused if:
2895 // - addr is also a wildcard
2896 // - channel settings match
2897 // Otherwise it's a conflict.
2898 sockaddr_any sa;
2899 m.m_pChannel->getSockAddr((sa));
2900
2901 HLOGC(smlog.Debug, log << "bind: Found existing muxer @" << m.m_iID << " : " << sa.str()
2902 << " - check against " << addr.str());
2903
2904 if (sa.isany())
2905 {
2906 if (!addr.isany())
2907 {
2908 LOGC(smlog.Error, log << "bind: Address: " << addr.str()
2909 << " conflicts with existing wildcard binding: " << sa.str());
2910 throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
2911 }
2912
2913 // Still, for ANY you need either the same family, or open
2914 // for families.
2915 if (m.m_mcfg.iIpV6Only != -1 && m.m_mcfg.iIpV6Only != s->core().m_config.iIpV6Only)
2916 {
2917 LOGC(smlog.Error, log << "bind: Address: " << addr.str()
2918 << " conflicts with existing IPv6 wildcard binding: " << sa.str());
2919 throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
2920 }
2921
2922 if ((m.m_mcfg.iIpV6Only == 0 || s->core().m_config.iIpV6Only == 0) && m.m_iIPversion != addr.family())
2923 {
2924 LOGC(smlog.Error, log << "bind: Address: " << addr.str()
2925 << " conflicts with IPv6 wildcard binding: " << sa.str()
2926 << " : family " << (m.m_iIPversion == AF_INET ? "IPv4" : "IPv6")
2927 << " vs. " << (addr.family() == AF_INET ? "IPv4" : "IPv6"));
2928 throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
2929 }
2930 reuse_attempt = true;
2931 HLOGC(smlog.Debug, log << "bind: wildcard address - multiplexer reusable");
2932 }
2933 else if (addr.isany() && addr.family() == sa.family())
2934 {
2935 LOGC(smlog.Error, log << "bind: Wildcard address: " << addr.str()
2936 << " conflicts with existting IP binding: " << sa.str());
2937 throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
2938 }
2939 // If this is bound to a certain address, AND:
2940 else if (sa.equal_address(addr))
2941 {
2942 // - the address is the same as addr
2943 reuse_attempt = true;
2944 HLOGC(smlog.Debug, log << "bind: same IP address - multiplexer reusable");
2945 }
2946 else
2947 {
2948 HLOGC(smlog.Debug, log << "bind: IP addresses differ - ALLOWED to create a new multiplexer");
2949 }
2950 // Otherwise:
2951 // - the address is different than addr
2952 // - the address can't be reused, but this can go on with new one.
2953
2954 // If this is a reusage attempt:
2955 if (reuse_attempt)
2956 {
2957 // - if the channel settings match, it can be reused
2958 if (channelSettingsMatch(m, s))
2959 {
2960 HLOGC(smlog.Debug, log << "bind: reusing multiplexer for port " << port);
2961 // reuse the existing multiplexer
2962 ++ i->second.m_iRefCount;
2963 installMuxer((s), (i->second));
2964 return;
2965 }
2966 else
2967 {
2968 // - if not, it's a conflict
2969 LOGC(smlog.Error, log << "bind: Address: " << addr.str()
2970 << " conflicts with binding: " << sa.str() << " due to channel settings");
2971 throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
2972 }
2973 }
2974 // If not, proceed to the next one, and when there are no reusage
2975 // candidates, proceed with creating a new multiplexer.
2976
2977 // Note that a binding to a different IP address is not treated
2978 // as a candidate for either reuseage or conflict.
2979 }
2980 }
2981
2982 // a new multiplexer is needed
2983 CMultiplexer m;
2984 configureMuxer((m), s, addr.family());
2985
2986 try
2987 {
2988 m.m_pChannel = new CChannel();
2989 m.m_pChannel->setConfig(m.m_mcfg);
2990
2991 if (udpsock)
2992 {
2993 // In this case, addr contains the address
2994 // that has been extracted already from the
2995 // given socket
2996 m.m_pChannel->attach(*udpsock, addr);
2997 }
2998 else if (addr.empty())
2999 {
3000 // The case of previously used case of a NULL address.
3001 // This here is used to pass family only, in this case
3002 // just automatically bind to the "0" address to autoselect
3003 // everything.
3004 m.m_pChannel->open(addr.family());
3005 }
3006 else
3007 {
3008 // If at least the IP address is specified, then bind to that
3009 // address, but still possibly autoselect the outgoing port, if the
3010 // port was specified as 0.
3011 m.m_pChannel->open(addr);
3012 }
3013
3014 m.m_pTimer = new CTimer;
3015 m.m_pSndQueue = new CSndQueue;
3016 m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
3017 m.m_pRcvQueue = new CRcvQueue;
3018 m.m_pRcvQueue->init(
3019 32, s->core().maxPayloadSize(), m.m_iIPversion, 1024,
3020 m.m_pChannel, m.m_pTimer);
3021
3022 // Rewrite the port here, as it might be only known upon return
3023 // from CChannel::open.
3024 m.m_iPort = installMuxer((s), m);
3025 m_mMultiplexer[m.m_iID] = m;
3026 }
3027 catch (const CUDTException&)
3028 {
3029 m.destroy();
3030 throw;
3031 }
3032 catch (...)
3033 {
3034 m.destroy();
3035 throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
3036 }
3037
3038 HLOGC(smlog.Debug, log << "bind: creating new multiplexer for port " << m.m_iPort);
3039 }
3040
3041 // This function is going to find a multiplexer for the port contained
3042 // in the 'ls' listening socket. The multiplexer must exist when the listener
3043 // exists, otherwise the dispatching procedure wouldn't even call this
3044 // function. By historical reasons there's also a fallback for a case when the
3045 // multiplexer wasn't found by id, the search by port number continues.
updateListenerMux(CUDTSocket * s,const CUDTSocket * ls)3046 bool srt::CUDTUnited::updateListenerMux(CUDTSocket* s, const CUDTSocket* ls)
3047 {
3048 ScopedLock cg(m_GlobControlLock);
3049 const int port = ls->m_SelfAddr.hport();
3050
3051 HLOGC(smlog.Debug, log << "updateListenerMux: finding muxer of listener socket @"
3052 << ls->m_SocketID << " muxid=" << ls->m_iMuxID
3053 << " bound=" << ls->m_SelfAddr.str()
3054 << " FOR @" << s->m_SocketID << " addr="
3055 << s->m_SelfAddr.str() << "_->_" << s->m_PeerAddr.str());
3056
3057 // First thing that should be certain here is that there should exist
3058 // a muxer with the ID written in the listener socket's mux ID.
3059
3060 CMultiplexer* mux = map_getp(m_mMultiplexer, ls->m_iMuxID);
3061
3062 // NOTE:
3063 // THIS BELOW CODE is only for a highly unlikely, and probably buggy,
3064 // situation when the Multiplexer wasn't found by ID recorded in the listener.
3065 CMultiplexer* fallback = NULL;
3066 if (!mux)
3067 {
3068 LOGC(smlog.Error, log << "updateListenerMux: IPE? listener muxer not found by ID, trying by port");
3069
3070 // To be used as first found with different IP version
3071
3072 // find the listener's address
3073 for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin();
3074 i != m_mMultiplexer.end(); ++ i)
3075 {
3076 CMultiplexer& m = i->second;
3077
3078 #if ENABLE_HEAVY_LOGGING
3079 ostringstream that_muxer;
3080 that_muxer << "id=" << m.m_iID
3081 << " port=" << m.m_iPort
3082 << " ip=" << (m.m_iIPversion == AF_INET ? "v4" : "v6");
3083 #endif
3084
3085 if (m.m_iPort == port)
3086 {
3087 HLOGC(smlog.Debug, log << "updateListenerMux: reusing muxer: " << that_muxer.str());
3088 if (m.m_iIPversion == s->m_PeerAddr.family())
3089 {
3090 mux = &m; // best match
3091 break;
3092 }
3093 else
3094 {
3095 fallback = &m;
3096 }
3097 }
3098 else
3099 {
3100 HLOGC(smlog.Debug, log << "updateListenerMux: SKIPPING muxer: " << that_muxer.str());
3101 }
3102 }
3103
3104 if (!mux && fallback)
3105 {
3106 // It is allowed to reuse this multiplexer, but the socket must allow both IPv4 and IPv6
3107 if (fallback->m_mcfg.iIpV6Only == 0)
3108 {
3109 HLOGC(smlog.Warn, log << "updateListenerMux: reusing multiplexer from different family");
3110 mux = fallback;
3111 }
3112 }
3113 }
3114
3115 // Checking again because the above procedure could have set it
3116 if (mux)
3117 {
3118 // reuse the existing multiplexer
3119 ++ mux->m_iRefCount;
3120 s->core().m_pSndQueue = mux->m_pSndQueue;
3121 s->core().m_pRcvQueue = mux->m_pRcvQueue;
3122 s->m_iMuxID = mux->m_iID;
3123 return true;
3124 }
3125
3126 return false;
3127 }
3128
garbageCollect(void * p)3129 void* srt::CUDTUnited::garbageCollect(void* p)
3130 {
3131 CUDTUnited* self = (CUDTUnited*)p;
3132
3133 THREAD_STATE_INIT("SRT:GC");
3134
3135 UniqueLock gclock(self->m_GCStopLock);
3136
3137 while (!self->m_bClosing)
3138 {
3139 INCREMENT_THREAD_ITERATIONS();
3140 self->checkBrokenSockets();
3141
3142 HLOGC(inlog.Debug, log << "GC: sleep 1 s");
3143 self->m_GCStopCond.wait_for(gclock, seconds_from(1));
3144 }
3145
3146 // remove all sockets and multiplexers
3147 HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all pending sockets. Acquring control lock...");
3148
3149 {
3150 ScopedLock glock (self->m_GlobControlLock);
3151
3152 for (sockets_t::iterator i = self->m_Sockets.begin();
3153 i != self->m_Sockets.end(); ++ i)
3154 {
3155 CUDTSocket* s = i->second;
3156 s->breakSocket_LOCKED();
3157
3158 #if ENABLE_EXPERIMENTAL_BONDING
3159 if (s->m_GroupOf)
3160 {
3161 HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " (IPE?) - REMOVING FROM GROUP");
3162 s->removeFromGroup(false);
3163 }
3164 #endif
3165 self->m_ClosedSockets[i->first] = s;
3166
3167 // remove from listener's queue
3168 sockets_t::iterator ls = self->m_Sockets.find(
3169 s->m_ListenSocket);
3170 if (ls == self->m_Sockets.end())
3171 {
3172 ls = self->m_ClosedSockets.find(s->m_ListenSocket);
3173 if (ls == self->m_ClosedSockets.end())
3174 continue;
3175 }
3176
3177 enterCS(ls->second->m_AcceptLock);
3178 ls->second->m_QueuedSockets.erase(s->m_SocketID);
3179 leaveCS(ls->second->m_AcceptLock);
3180 }
3181 self->m_Sockets.clear();
3182
3183 for (sockets_t::iterator j = self->m_ClosedSockets.begin();
3184 j != self->m_ClosedSockets.end(); ++ j)
3185 {
3186 j->second->m_tsClosureTimeStamp = steady_clock::time_point();
3187 }
3188 }
3189
3190 HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all CLOSED sockets.");
3191 while (true)
3192 {
3193 self->checkBrokenSockets();
3194
3195 enterCS(self->m_GlobControlLock);
3196 bool empty = self->m_ClosedSockets.empty();
3197 leaveCS(self->m_GlobControlLock);
3198
3199 if (empty)
3200 break;
3201
3202 srt::sync::this_thread::sleep_for(milliseconds_from(1));
3203 }
3204
3205 THREAD_EXIT();
3206 return NULL;
3207 }
3208
3209 ////////////////////////////////////////////////////////////////////////////////
3210
startup()3211 int srt::CUDT::startup()
3212 {
3213 return s_UDTUnited.startup();
3214 }
3215
cleanup()3216 int srt::CUDT::cleanup()
3217 {
3218 return s_UDTUnited.cleanup();
3219 }
3220
socket()3221 SRTSOCKET srt::CUDT::socket()
3222 {
3223 if (!s_UDTUnited.m_bGCStatus)
3224 s_UDTUnited.startup();
3225
3226 try
3227 {
3228 return s_UDTUnited.newSocket();
3229 }
3230 catch (const CUDTException& e)
3231 {
3232 SetThreadLocalError(e);
3233 return INVALID_SOCK;
3234 }
3235 catch (const bad_alloc&)
3236 {
3237 SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
3238 return INVALID_SOCK;
3239 }
3240 catch (const std::exception& ee)
3241 {
3242 LOGC(aclog.Fatal, log << "socket: UNEXPECTED EXCEPTION: "
3243 << typeid(ee).name()
3244 << ": " << ee.what());
3245 SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
3246 return INVALID_SOCK;
3247 }
3248 }
3249
APIError(const CUDTException & e)3250 srt::CUDT::APIError::APIError(const CUDTException& e)
3251 {
3252 SetThreadLocalError(e);
3253 }
3254
APIError(CodeMajor mj,CodeMinor mn,int syserr)3255 srt::CUDT::APIError::APIError(CodeMajor mj, CodeMinor mn, int syserr)
3256 {
3257 SetThreadLocalError(CUDTException(mj, mn, syserr));
3258 }
3259
3260 #if ENABLE_EXPERIMENTAL_BONDING
3261 // This is an internal function; 'type' should be pre-checked if it has a correct value.
3262 // This doesn't have argument of GroupType due to header file conflicts.
3263
3264 // [[using locked(s_UDTUnited.m_GlobControlLock)]]
newGroup(const int type)3265 srt::CUDTGroup& srt::CUDT::newGroup(const int type)
3266 {
3267 const SRTSOCKET id = s_UDTUnited.generateSocketID(true);
3268
3269 // Now map the group
3270 return s_UDTUnited.addGroup(id, SRT_GROUP_TYPE(type)).set_id(id);
3271 }
3272
createGroup(SRT_GROUP_TYPE gt)3273 SRTSOCKET srt::CUDT::createGroup(SRT_GROUP_TYPE gt)
3274 {
3275 // Doing the same lazy-startup as with srt_create_socket()
3276 if (!s_UDTUnited.m_bGCStatus)
3277 s_UDTUnited.startup();
3278
3279 try
3280 {
3281 srt::sync::ScopedLock globlock (s_UDTUnited.m_GlobControlLock);
3282 return newGroup(gt).id();
3283 // Note: potentially, after this function exits, the group
3284 // could be deleted, immediately, from a separate thread (tho
3285 // unlikely because the other thread would need some handle to
3286 // keep it). But then, the first call to any API function would
3287 // return invalid ID error.
3288 }
3289 catch (const CUDTException& e)
3290 {
3291 return APIError(e);
3292 }
3293 catch (...)
3294 {
3295 return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3296 }
3297
3298 return SRT_INVALID_SOCK;
3299 }
3300
3301
addSocketToGroup(SRTSOCKET socket,SRTSOCKET group)3302 int srt::CUDT::addSocketToGroup(SRTSOCKET socket, SRTSOCKET group)
3303 {
3304 // Check if socket and group have been set correctly.
3305 int32_t sid = socket & ~SRTGROUP_MASK;
3306 int32_t gm = group & SRTGROUP_MASK;
3307
3308 if ( sid != socket || gm == 0 )
3309 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3310
3311 // Find the socket and the group
3312 CUDTSocket* s = s_UDTUnited.locateSocket(socket);
3313 CUDTUnited::GroupKeeper k (s_UDTUnited, group, s_UDTUnited.ERH_RETURN);
3314
3315 if (!s || !k.group)
3316 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3317
3318 // Check if the socket is already IN SOME GROUP.
3319 if (s->m_GroupOf)
3320 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3321
3322 CUDTGroup* g = k.group;
3323 if (g->managed())
3324 {
3325 // This can be changed as long as the group is empty.
3326 if (!g->groupEmpty())
3327 {
3328 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3329 }
3330 g->set_managed(false);
3331 }
3332
3333 ScopedLock cg (s->m_ControlLock);
3334 ScopedLock cglob (s_UDTUnited.m_GlobControlLock);
3335 if (g->closing())
3336 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3337
3338 // Check if the socket already is in the group
3339 srt::groups::SocketData* f;
3340 if (g->contains(socket, (f)))
3341 {
3342 // XXX This is internal error. Report it, but continue
3343 LOGC(aclog.Error, log << "IPE (non-fatal): the socket is in the group, but has no clue about it!");
3344 s->m_GroupMemberData = f;
3345 s->m_GroupOf = g;
3346 return 0;
3347 }
3348 s->m_GroupMemberData = g->add(srt::groups::prepareSocketData(s));
3349 s->m_GroupOf = g;
3350
3351 return 0;
3352 }
3353
3354 // dead function as for now. This is only for non-managed
3355 // groups.
removeSocketFromGroup(SRTSOCKET socket)3356 int srt::CUDT::removeSocketFromGroup(SRTSOCKET socket)
3357 {
3358 CUDTSocket* s = s_UDTUnited.locateSocket(socket);
3359 if (!s)
3360 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3361
3362 if (!s->m_GroupOf)
3363 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3364
3365 ScopedLock cg (s->m_ControlLock);
3366 ScopedLock glob_grd (s_UDTUnited.m_GlobControlLock);
3367 s->removeFromGroup(false);
3368 return 0;
3369 }
3370
3371 // [[using locked(m_ControlLock)]]
3372 // [[using locked(CUDT::s_UDTUnited.m_GlobControlLock)]]
removeFromGroup(bool broken)3373 void srt::CUDTSocket::removeFromGroup(bool broken)
3374 {
3375 CUDTGroup* g = m_GroupOf;
3376 if (g)
3377 {
3378 // Reset group-related fields immediately. They won't be accessed
3379 // in the below calls, while the iterator will be invalidated for
3380 // a short moment between removal from the group container and the end,
3381 // while the GroupLock would be already taken out. It is safer to reset
3382 // it to a NULL iterator before removal.
3383 m_GroupOf = NULL;
3384 m_GroupMemberData = NULL;
3385
3386 bool still_have = g->remove(m_SocketID);
3387 if (broken)
3388 {
3389 // Activate the SRT_EPOLL_UPDATE event on the group
3390 // if it was because of a socket that was earlier connected
3391 // and became broken. This is not to be sent in case when
3392 // it is a failure during connection, or the socket was
3393 // explicitly removed from the group.
3394 g->activateUpdateEvent(still_have);
3395 }
3396
3397 HLOGC(smlog.Debug, log << "removeFromGroup: socket @" << m_SocketID << " NO LONGER A MEMBER of $" << g->id() << "; group is "
3398 << (still_have ? "still ACTIVE" : "now EMPTY"));
3399 }
3400 }
3401
getGroupOfSocket(SRTSOCKET socket)3402 SRTSOCKET srt::CUDT::getGroupOfSocket(SRTSOCKET socket)
3403 {
3404 // Lock this for the whole function as we need the group
3405 // to persist the call.
3406 ScopedLock glock (s_UDTUnited.m_GlobControlLock);
3407 CUDTSocket* s = s_UDTUnited.locateSocket_LOCKED(socket);
3408 if (!s || !s->m_GroupOf)
3409 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3410
3411 return s->m_GroupOf->id();
3412 }
3413
configureGroup(SRTSOCKET groupid,const char * str)3414 int srt::CUDT::configureGroup(SRTSOCKET groupid, const char* str)
3415 {
3416 if ( (groupid & SRTGROUP_MASK) == 0)
3417 {
3418 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3419 }
3420
3421 CUDTUnited::GroupKeeper k (s_UDTUnited, groupid, s_UDTUnited.ERH_RETURN);
3422 if (!k.group)
3423 {
3424 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3425 }
3426
3427 return k.group->configure(str);
3428 }
3429
getGroupData(SRTSOCKET groupid,SRT_SOCKGROUPDATA * pdata,size_t * psize)3430 int srt::CUDT::getGroupData(SRTSOCKET groupid, SRT_SOCKGROUPDATA* pdata, size_t* psize)
3431 {
3432 if ((groupid & SRTGROUP_MASK) == 0 || !psize)
3433 {
3434 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3435 }
3436
3437 CUDTUnited::GroupKeeper k (s_UDTUnited, groupid, s_UDTUnited.ERH_RETURN);
3438 if (!k.group)
3439 {
3440 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3441 }
3442
3443 // To get only the size of the group pdata=NULL can be used
3444 return k.group->getGroupData(pdata, psize);
3445 }
3446 #endif
3447
bind(SRTSOCKET u,const sockaddr * name,int namelen)3448 int srt::CUDT::bind(SRTSOCKET u, const sockaddr* name, int namelen)
3449 {
3450 try
3451 {
3452 sockaddr_any sa (name, namelen);
3453 if (sa.len == 0)
3454 {
3455 // This happens if the namelen check proved it to be
3456 // too small for particular family, or that family is
3457 // not recognized (is none of AF_INET, AF_INET6).
3458 // This is a user error.
3459 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3460 }
3461 CUDTSocket* s = s_UDTUnited.locateSocket(u);
3462 if (!s)
3463 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3464
3465 return s_UDTUnited.bind(s, sa);
3466 }
3467 catch (const CUDTException& e)
3468 {
3469 return APIError(e);
3470 }
3471 catch (bad_alloc&)
3472 {
3473 return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3474 }
3475 catch (const std::exception& ee)
3476 {
3477 LOGC(aclog.Fatal, log << "bind: UNEXPECTED EXCEPTION: "
3478 << typeid(ee).name()
3479 << ": " << ee.what());
3480 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3481 }
3482 }
3483
bind(SRTSOCKET u,UDPSOCKET udpsock)3484 int srt::CUDT::bind(SRTSOCKET u, UDPSOCKET udpsock)
3485 {
3486 try
3487 {
3488 CUDTSocket* s = s_UDTUnited.locateSocket(u);
3489 if (!s)
3490 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3491
3492 return s_UDTUnited.bind(s, udpsock);
3493 }
3494 catch (const CUDTException& e)
3495 {
3496 return APIError(e);
3497 }
3498 catch (bad_alloc&)
3499 {
3500 return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3501 }
3502 catch (const std::exception& ee)
3503 {
3504 LOGC(aclog.Fatal, log << "bind/udp: UNEXPECTED EXCEPTION: "
3505 << typeid(ee).name() << ": " << ee.what());
3506 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3507 }
3508 }
3509
listen(SRTSOCKET u,int backlog)3510 int srt::CUDT::listen(SRTSOCKET u, int backlog)
3511 {
3512 try
3513 {
3514 return s_UDTUnited.listen(u, backlog);
3515 }
3516 catch (const CUDTException& e)
3517 {
3518 return APIError(e);
3519 }
3520 catch (bad_alloc&)
3521 {
3522 return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3523 }
3524 catch (const std::exception& ee)
3525 {
3526 LOGC(aclog.Fatal, log << "listen: UNEXPECTED EXCEPTION: "
3527 << typeid(ee).name() << ": " << ee.what());
3528 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3529 }
3530 }
3531
accept_bond(const SRTSOCKET listeners[],int lsize,int64_t msTimeOut)3532 SRTSOCKET srt::CUDT::accept_bond(const SRTSOCKET listeners [], int lsize, int64_t msTimeOut)
3533 {
3534 try
3535 {
3536 return s_UDTUnited.accept_bond(listeners, lsize, msTimeOut);
3537 }
3538 catch (const CUDTException& e)
3539 {
3540 SetThreadLocalError(e);
3541 return INVALID_SOCK;
3542 }
3543 catch (bad_alloc&)
3544 {
3545 SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
3546 return INVALID_SOCK;
3547 }
3548 catch (const std::exception& ee)
3549 {
3550 LOGC(aclog.Fatal, log << "accept_bond: UNEXPECTED EXCEPTION: "
3551 << typeid(ee).name() << ": " << ee.what());
3552 SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
3553 return INVALID_SOCK;
3554 }
3555 }
3556
accept(SRTSOCKET u,sockaddr * addr,int * addrlen)3557 SRTSOCKET srt::CUDT::accept(SRTSOCKET u, sockaddr* addr, int* addrlen)
3558 {
3559 try
3560 {
3561 return s_UDTUnited.accept(u, addr, addrlen);
3562 }
3563 catch (const CUDTException& e)
3564 {
3565 SetThreadLocalError(e);
3566 return INVALID_SOCK;
3567 }
3568 catch (const bad_alloc&)
3569 {
3570 SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
3571 return INVALID_SOCK;
3572 }
3573 catch (const std::exception& ee)
3574 {
3575 LOGC(aclog.Fatal, log << "accept: UNEXPECTED EXCEPTION: "
3576 << typeid(ee).name() << ": " << ee.what());
3577 SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
3578 return INVALID_SOCK;
3579 }
3580 }
3581
connect(SRTSOCKET u,const sockaddr * name,const sockaddr * tname,int namelen)3582 int srt::CUDT::connect(
3583 SRTSOCKET u, const sockaddr* name, const sockaddr* tname, int namelen)
3584 {
3585 try
3586 {
3587 return s_UDTUnited.connect(u, name, tname, namelen);
3588 }
3589 catch (const CUDTException& e)
3590 {
3591 return APIError(e);
3592 }
3593 catch (bad_alloc&)
3594 {
3595 return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3596 }
3597 catch (std::exception& ee)
3598 {
3599 LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: "
3600 << typeid(ee).name() << ": " << ee.what());
3601 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3602 }
3603 }
3604
3605 #if ENABLE_EXPERIMENTAL_BONDING
connectLinks(SRTSOCKET grp,SRT_SOCKGROUPCONFIG targets[],int arraysize)3606 int srt::CUDT::connectLinks(SRTSOCKET grp,
3607 SRT_SOCKGROUPCONFIG targets [], int arraysize)
3608 {
3609 if (arraysize <= 0)
3610 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3611
3612 if ( (grp & SRTGROUP_MASK) == 0)
3613 {
3614 // connectLinks accepts only GROUP id, not socket id.
3615 return APIError(MJ_NOTSUP, MN_SIDINVAL, 0);
3616 }
3617
3618 try
3619 {
3620 CUDTUnited::GroupKeeper k(s_UDTUnited, grp, s_UDTUnited.ERH_THROW);
3621 return s_UDTUnited.groupConnect(k.group, targets, arraysize);
3622 }
3623 catch (CUDTException& e)
3624 {
3625 return APIError(e);
3626 }
3627 catch (bad_alloc&)
3628 {
3629 return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3630 }
3631 catch (std::exception& ee)
3632 {
3633 LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: "
3634 << typeid(ee).name() << ": " << ee.what());
3635 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3636 }
3637 }
3638 #endif
3639
connect(SRTSOCKET u,const sockaddr * name,int namelen,int32_t forced_isn)3640 int srt::CUDT::connect(
3641 SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn)
3642 {
3643 try
3644 {
3645 return s_UDTUnited.connect(u, name, namelen, forced_isn);
3646 }
3647 catch (const CUDTException &e)
3648 {
3649 return APIError(e);
3650 }
3651 catch (bad_alloc&)
3652 {
3653 return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3654 }
3655 catch (const std::exception& ee)
3656 {
3657 LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: "
3658 << typeid(ee).name() << ": " << ee.what());
3659 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3660 }
3661 }
3662
close(SRTSOCKET u)3663 int srt::CUDT::close(SRTSOCKET u)
3664 {
3665 try
3666 {
3667 return s_UDTUnited.close(u);
3668 }
3669 catch (const CUDTException& e)
3670 {
3671 return APIError(e);
3672 }
3673 catch (const std::exception& ee)
3674 {
3675 LOGC(aclog.Fatal, log << "close: UNEXPECTED EXCEPTION: "
3676 << typeid(ee).name() << ": " << ee.what());
3677 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3678 }
3679 }
3680
getpeername(SRTSOCKET u,sockaddr * name,int * namelen)3681 int srt::CUDT::getpeername(SRTSOCKET u, sockaddr* name, int* namelen)
3682 {
3683 try
3684 {
3685 s_UDTUnited.getpeername(u, name, namelen);
3686 return 0;
3687 }
3688 catch (const CUDTException& e)
3689 {
3690 return APIError(e);
3691 }
3692 catch (const std::exception& ee)
3693 {
3694 LOGC(aclog.Fatal, log << "getpeername: UNEXPECTED EXCEPTION: "
3695 << typeid(ee).name() << ": " << ee.what());
3696 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3697 }
3698 }
3699
getsockname(SRTSOCKET u,sockaddr * name,int * namelen)3700 int srt::CUDT::getsockname(SRTSOCKET u, sockaddr* name, int* namelen)
3701 {
3702 try
3703 {
3704 s_UDTUnited.getsockname(u, name, namelen);
3705 return 0;
3706 }
3707 catch (const CUDTException& e)
3708 {
3709 return APIError(e);
3710 }
3711 catch (const std::exception& ee)
3712 {
3713 LOGC(aclog.Fatal, log << "getsockname: UNEXPECTED EXCEPTION: "
3714 << typeid(ee).name() << ": " << ee.what());
3715 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3716 }
3717 }
3718
getsockopt(SRTSOCKET u,int,SRT_SOCKOPT optname,void * pw_optval,int * pw_optlen)3719 int srt::CUDT::getsockopt(
3720 SRTSOCKET u, int, SRT_SOCKOPT optname, void* pw_optval, int* pw_optlen)
3721 {
3722 if (!pw_optval || !pw_optlen)
3723 {
3724 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3725 }
3726
3727 try
3728 {
3729 #if ENABLE_EXPERIMENTAL_BONDING
3730 if (u & SRTGROUP_MASK)
3731 {
3732 CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW);
3733 k.group->getOpt(optname, (pw_optval), (*pw_optlen));
3734 return 0;
3735 }
3736 #endif
3737
3738 CUDT& udt = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->core();
3739 udt.getOpt(optname, (pw_optval), (*pw_optlen));
3740 return 0;
3741 }
3742 catch (const CUDTException& e)
3743 {
3744 return APIError(e);
3745 }
3746 catch (const std::exception& ee)
3747 {
3748 LOGC(aclog.Fatal, log << "getsockopt: UNEXPECTED EXCEPTION: "
3749 << typeid(ee).name() << ": " << ee.what());
3750 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3751 }
3752 }
3753
setsockopt(SRTSOCKET u,int,SRT_SOCKOPT optname,const void * optval,int optlen)3754 int srt::CUDT::setsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, const void* optval, int optlen)
3755 {
3756 if (!optval)
3757 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3758
3759 try
3760 {
3761 #if ENABLE_EXPERIMENTAL_BONDING
3762 if (u & SRTGROUP_MASK)
3763 {
3764 CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW);
3765 k.group->setOpt(optname, optval, optlen);
3766 return 0;
3767 }
3768 #endif
3769
3770 CUDT& udt = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->core();
3771 udt.setOpt(optname, optval, optlen);
3772 return 0;
3773 }
3774 catch (const CUDTException& e)
3775 {
3776 return APIError(e);
3777 }
3778 catch (const std::exception& ee)
3779 {
3780 LOGC(aclog.Fatal, log << "setsockopt: UNEXPECTED EXCEPTION: "
3781 << typeid(ee).name() << ": " << ee.what());
3782 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3783 }
3784 }
3785
send(SRTSOCKET u,const char * buf,int len,int)3786 int srt::CUDT::send(SRTSOCKET u, const char* buf, int len, int)
3787 {
3788 SRT_MSGCTRL mctrl = srt_msgctrl_default;
3789 return sendmsg2(u, buf, len, (mctrl));
3790 }
3791
3792 // --> CUDT::recv moved down
3793
sendmsg(SRTSOCKET u,const char * buf,int len,int ttl,bool inorder,int64_t srctime)3794 int srt::CUDT::sendmsg(
3795 SRTSOCKET u, const char* buf, int len, int ttl, bool inorder,
3796 int64_t srctime)
3797 {
3798 SRT_MSGCTRL mctrl = srt_msgctrl_default;
3799 mctrl.msgttl = ttl;
3800 mctrl.inorder = inorder;
3801 mctrl.srctime = srctime;
3802 return sendmsg2(u, buf, len, (mctrl));
3803 }
3804
sendmsg2(SRTSOCKET u,const char * buf,int len,SRT_MSGCTRL & w_m)3805 int srt::CUDT::sendmsg2(
3806 SRTSOCKET u, const char* buf, int len, SRT_MSGCTRL& w_m)
3807 {
3808 try
3809 {
3810 #if ENABLE_EXPERIMENTAL_BONDING
3811 if (u & SRTGROUP_MASK)
3812 {
3813 CUDTUnited::GroupKeeper k (s_UDTUnited, u, s_UDTUnited.ERH_THROW);
3814 return k.group->send(buf, len, (w_m));
3815 }
3816 #endif
3817
3818 return s_UDTUnited.locateSocket(u, CUDTUnited::ERH_THROW)->core().sendmsg2(buf, len, (w_m));
3819 }
3820 catch (const CUDTException& e)
3821 {
3822 return APIError(e);
3823 }
3824 catch (bad_alloc&)
3825 {
3826 return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3827 }
3828 catch (const std::exception& ee)
3829 {
3830 LOGC(aclog.Fatal, log << "sendmsg: UNEXPECTED EXCEPTION: "
3831 << typeid(ee).name() << ": " << ee.what());
3832 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3833 }
3834 }
3835
recv(SRTSOCKET u,char * buf,int len,int)3836 int srt::CUDT::recv(SRTSOCKET u, char* buf, int len, int)
3837 {
3838 SRT_MSGCTRL mctrl = srt_msgctrl_default;
3839 int ret = recvmsg2(u, buf, len, (mctrl));
3840 return ret;
3841 }
3842
recvmsg(SRTSOCKET u,char * buf,int len,int64_t & srctime)3843 int srt::CUDT::recvmsg(SRTSOCKET u, char* buf, int len, int64_t& srctime)
3844 {
3845 SRT_MSGCTRL mctrl = srt_msgctrl_default;
3846 int ret = recvmsg2(u, buf, len, (mctrl));
3847 srctime = mctrl.srctime;
3848 return ret;
3849 }
3850
recvmsg2(SRTSOCKET u,char * buf,int len,SRT_MSGCTRL & w_m)3851 int srt::CUDT::recvmsg2(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL& w_m)
3852 {
3853 try
3854 {
3855 #if ENABLE_EXPERIMENTAL_BONDING
3856 if (u & SRTGROUP_MASK)
3857 {
3858 CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW);
3859 return k.group->recv(buf, len, (w_m));
3860 }
3861 #endif
3862
3863 return s_UDTUnited.locateSocket(u, CUDTUnited::ERH_THROW)->core().recvmsg2(buf, len, (w_m));
3864 }
3865 catch (const CUDTException& e)
3866 {
3867 return APIError(e);
3868 }
3869 catch (const std::exception& ee)
3870 {
3871 LOGC(aclog.Fatal, log << "recvmsg: UNEXPECTED EXCEPTION: "
3872 << typeid(ee).name() << ": " << ee.what());
3873 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3874 }
3875 }
3876
sendfile(SRTSOCKET u,fstream & ifs,int64_t & offset,int64_t size,int block)3877 int64_t srt::CUDT::sendfile(
3878 SRTSOCKET u, fstream& ifs, int64_t& offset, int64_t size, int block)
3879 {
3880 try
3881 {
3882 CUDT& udt = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->core();
3883 return udt.sendfile(ifs, offset, size, block);
3884 }
3885 catch (const CUDTException& e)
3886 {
3887 return APIError(e);
3888 }
3889 catch (bad_alloc&)
3890 {
3891 return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3892 }
3893 catch (const std::exception& ee)
3894 {
3895 LOGC(aclog.Fatal, log << "sendfile: UNEXPECTED EXCEPTION: "
3896 << typeid(ee).name() << ": " << ee.what());
3897 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3898 }
3899 }
3900
recvfile(SRTSOCKET u,fstream & ofs,int64_t & offset,int64_t size,int block)3901 int64_t srt::CUDT::recvfile(
3902 SRTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block)
3903 {
3904 try
3905 {
3906 return s_UDTUnited.locateSocket(u, CUDTUnited::ERH_THROW)->core().recvfile(ofs, offset, size, block);
3907 }
3908 catch (const CUDTException& e)
3909 {
3910 return APIError(e);
3911 }
3912 catch (const std::exception& ee)
3913 {
3914 LOGC(aclog.Fatal, log << "recvfile: UNEXPECTED EXCEPTION: "
3915 << typeid(ee).name() << ": " << ee.what());
3916 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3917 }
3918 }
3919
select(int,UDT::UDSET * readfds,UDT::UDSET * writefds,UDT::UDSET * exceptfds,const timeval * timeout)3920 int srt::CUDT::select(
3921 int,
3922 UDT::UDSET* readfds,
3923 UDT::UDSET* writefds,
3924 UDT::UDSET* exceptfds,
3925 const timeval* timeout)
3926 {
3927 if ((!readfds) && (!writefds) && (!exceptfds))
3928 {
3929 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3930 }
3931
3932 try
3933 {
3934 return s_UDTUnited.select(readfds, writefds, exceptfds, timeout);
3935 }
3936 catch (const CUDTException& e)
3937 {
3938 return APIError(e);
3939 }
3940 catch (bad_alloc&)
3941 {
3942 return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3943 }
3944 catch (const std::exception& ee)
3945 {
3946 LOGC(aclog.Fatal, log << "select: UNEXPECTED EXCEPTION: "
3947 << typeid(ee).name() << ": " << ee.what());
3948 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3949 }
3950 }
3951
selectEx(const vector<SRTSOCKET> & fds,vector<SRTSOCKET> * readfds,vector<SRTSOCKET> * writefds,vector<SRTSOCKET> * exceptfds,int64_t msTimeOut)3952 int srt::CUDT::selectEx(
3953 const vector<SRTSOCKET>& fds,
3954 vector<SRTSOCKET>* readfds,
3955 vector<SRTSOCKET>* writefds,
3956 vector<SRTSOCKET>* exceptfds,
3957 int64_t msTimeOut)
3958 {
3959 if ((!readfds) && (!writefds) && (!exceptfds))
3960 {
3961 return APIError(MJ_NOTSUP, MN_INVAL, 0);
3962 }
3963
3964 try
3965 {
3966 return s_UDTUnited.selectEx(fds, readfds, writefds, exceptfds, msTimeOut);
3967 }
3968 catch (const CUDTException& e)
3969 {
3970 return APIError(e);
3971 }
3972 catch (bad_alloc&)
3973 {
3974 return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3975 }
3976 catch (const std::exception& ee)
3977 {
3978 LOGC(aclog.Fatal, log << "selectEx: UNEXPECTED EXCEPTION: "
3979 << typeid(ee).name() << ": " << ee.what());
3980 return APIError(MJ_UNKNOWN);
3981 }
3982 }
3983
epoll_create()3984 int srt::CUDT::epoll_create()
3985 {
3986 try
3987 {
3988 return s_UDTUnited.epoll_create();
3989 }
3990 catch (const CUDTException& e)
3991 {
3992 return APIError(e);
3993 }
3994 catch (const std::exception& ee)
3995 {
3996 LOGC(aclog.Fatal, log << "epoll_create: UNEXPECTED EXCEPTION: "
3997 << typeid(ee).name() << ": " << ee.what());
3998 return APIError(MJ_UNKNOWN, MN_NONE, 0);
3999 }
4000 }
4001
epoll_clear_usocks(int eid)4002 int srt::CUDT::epoll_clear_usocks(int eid)
4003 {
4004 try
4005 {
4006 return s_UDTUnited.epoll_clear_usocks(eid);
4007 }
4008 catch (const CUDTException& e)
4009 {
4010 return APIError(e);
4011 }
4012 catch (std::exception& ee)
4013 {
4014 LOGC(aclog.Fatal, log << "epoll_clear_usocks: UNEXPECTED EXCEPTION: "
4015 << typeid(ee).name() << ": " << ee.what());
4016 return APIError(MJ_UNKNOWN, MN_NONE, 0);
4017 }
4018 }
4019
epoll_add_usock(const int eid,const SRTSOCKET u,const int * events)4020 int srt::CUDT::epoll_add_usock(const int eid, const SRTSOCKET u, const int* events)
4021 {
4022 try
4023 {
4024 return s_UDTUnited.epoll_add_usock(eid, u, events);
4025 }
4026 catch (const CUDTException& e)
4027 {
4028 return APIError(e);
4029 }
4030 catch (const std::exception& ee)
4031 {
4032 LOGC(aclog.Fatal, log << "epoll_add_usock: UNEXPECTED EXCEPTION: "
4033 << typeid(ee).name() << ": " << ee.what());
4034 return APIError(MJ_UNKNOWN, MN_NONE, 0);
4035 }
4036 }
4037
epoll_add_ssock(const int eid,const SYSSOCKET s,const int * events)4038 int srt::CUDT::epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events)
4039 {
4040 try
4041 {
4042 return s_UDTUnited.epoll_add_ssock(eid, s, events);
4043 }
4044 catch (const CUDTException& e)
4045 {
4046 return APIError(e);
4047 }
4048 catch (const std::exception& ee)
4049 {
4050 LOGC(aclog.Fatal, log << "epoll_add_ssock: UNEXPECTED EXCEPTION: "
4051 << typeid(ee).name() << ": " << ee.what());
4052 return APIError(MJ_UNKNOWN, MN_NONE, 0);
4053 }
4054 }
4055
epoll_update_usock(const int eid,const SRTSOCKET u,const int * events)4056 int srt::CUDT::epoll_update_usock(
4057 const int eid, const SRTSOCKET u, const int* events)
4058 {
4059 try
4060 {
4061 return s_UDTUnited.epoll_add_usock(eid, u, events);
4062 }
4063 catch (const CUDTException& e)
4064 {
4065 return APIError(e);
4066 }
4067 catch (const std::exception& ee)
4068 {
4069 LOGC(aclog.Fatal, log << "epoll_update_usock: UNEXPECTED EXCEPTION: "
4070 << typeid(ee).name() << ": " << ee.what());
4071 return APIError(MJ_UNKNOWN, MN_NONE, 0);
4072 }
4073 }
4074
epoll_update_ssock(const int eid,const SYSSOCKET s,const int * events)4075 int srt::CUDT::epoll_update_ssock(
4076 const int eid, const SYSSOCKET s, const int* events)
4077 {
4078 try
4079 {
4080 return s_UDTUnited.epoll_update_ssock(eid, s, events);
4081 }
4082 catch (const CUDTException& e)
4083 {
4084 return APIError(e);
4085 }
4086 catch (const std::exception& ee)
4087 {
4088 LOGC(aclog.Fatal, log << "epoll_update_ssock: UNEXPECTED EXCEPTION: "
4089 << typeid(ee).name() << ": " << ee.what());
4090 return APIError(MJ_UNKNOWN, MN_NONE, 0);
4091 }
4092 }
4093
4094
epoll_remove_usock(const int eid,const SRTSOCKET u)4095 int srt::CUDT::epoll_remove_usock(const int eid, const SRTSOCKET u)
4096 {
4097 try
4098 {
4099 return s_UDTUnited.epoll_remove_usock(eid, u);
4100 }
4101 catch (const CUDTException& e)
4102 {
4103 return APIError(e);
4104 }
4105 catch (const std::exception& ee)
4106 {
4107 LOGC(aclog.Fatal, log << "epoll_remove_usock: UNEXPECTED EXCEPTION: "
4108 << typeid(ee).name() << ": " << ee.what());
4109 return APIError(MJ_UNKNOWN, MN_NONE, 0);
4110 }
4111 }
4112
epoll_remove_ssock(const int eid,const SYSSOCKET s)4113 int srt::CUDT::epoll_remove_ssock(const int eid, const SYSSOCKET s)
4114 {
4115 try
4116 {
4117 return s_UDTUnited.epoll_remove_ssock(eid, s);
4118 }
4119 catch (const CUDTException& e)
4120 {
4121 return APIError(e);
4122 }
4123 catch (const std::exception& ee)
4124 {
4125 LOGC(aclog.Fatal, log << "epoll_remove_ssock: UNEXPECTED EXCEPTION: "
4126 << typeid(ee).name() << ": " << ee.what());
4127 return APIError(MJ_UNKNOWN, MN_NONE, 0);
4128 }
4129 }
4130
epoll_wait(const int eid,set<SRTSOCKET> * readfds,set<SRTSOCKET> * writefds,int64_t msTimeOut,set<SYSSOCKET> * lrfds,set<SYSSOCKET> * lwfds)4131 int srt::CUDT::epoll_wait(
4132 const int eid,
4133 set<SRTSOCKET>* readfds,
4134 set<SRTSOCKET>* writefds,
4135 int64_t msTimeOut,
4136 set<SYSSOCKET>* lrfds,
4137 set<SYSSOCKET>* lwfds)
4138 {
4139 try
4140 {
4141 return s_UDTUnited.epoll_ref().wait(
4142 eid, readfds, writefds, msTimeOut, lrfds, lwfds);
4143 }
4144 catch (const CUDTException& e)
4145 {
4146 return APIError(e);
4147 }
4148 catch (const std::exception& ee)
4149 {
4150 LOGC(aclog.Fatal, log << "epoll_wait: UNEXPECTED EXCEPTION: "
4151 << typeid(ee).name() << ": " << ee.what());
4152 return APIError(MJ_UNKNOWN, MN_NONE, 0);
4153 }
4154 }
4155
epoll_uwait(const int eid,SRT_EPOLL_EVENT * fdsSet,int fdsSize,int64_t msTimeOut)4156 int srt::CUDT::epoll_uwait(
4157 const int eid,
4158 SRT_EPOLL_EVENT* fdsSet,
4159 int fdsSize,
4160 int64_t msTimeOut)
4161 {
4162 try
4163 {
4164 return s_UDTUnited.epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
4165 }
4166 catch (const CUDTException& e)
4167 {
4168 return APIError(e);
4169 }
4170 catch (const std::exception& ee)
4171 {
4172 LOGC(aclog.Fatal, log << "epoll_uwait: UNEXPECTED EXCEPTION: "
4173 << typeid(ee).name() << ": " << ee.what());
4174 return APIError(MJ_UNKNOWN, MN_NONE, 0);
4175 }
4176 }
4177
epoll_set(const int eid,int32_t flags)4178 int32_t srt::CUDT::epoll_set(
4179 const int eid,
4180 int32_t flags)
4181 {
4182 try
4183 {
4184 return s_UDTUnited.epoll_set(eid, flags);
4185 }
4186 catch (const CUDTException& e)
4187 {
4188 return APIError(e);
4189 }
4190 catch (const std::exception& ee)
4191 {
4192 LOGC(aclog.Fatal, log << "epoll_set: UNEXPECTED EXCEPTION: "
4193 << typeid(ee).name() << ": " << ee.what());
4194 return APIError(MJ_UNKNOWN, MN_NONE, 0);
4195 }
4196 }
4197
epoll_release(const int eid)4198 int srt::CUDT::epoll_release(const int eid)
4199 {
4200 try
4201 {
4202 return s_UDTUnited.epoll_release(eid);
4203 }
4204 catch (const CUDTException& e)
4205 {
4206 return APIError(e);
4207 }
4208 catch (const std::exception& ee)
4209 {
4210 LOGC(aclog.Fatal, log << "epoll_release: UNEXPECTED EXCEPTION: "
4211 << typeid(ee).name() << ": " << ee.what());
4212 return APIError(MJ_UNKNOWN, MN_NONE, 0);
4213 }
4214 }
4215
getlasterror()4216 CUDTException& srt::CUDT::getlasterror()
4217 {
4218 return GetThreadLocalError();
4219 }
4220
bstats(SRTSOCKET u,CBytePerfMon * perf,bool clear,bool instantaneous)4221 int srt::CUDT::bstats(SRTSOCKET u, CBytePerfMon* perf, bool clear, bool instantaneous)
4222 {
4223 #if ENABLE_EXPERIMENTAL_BONDING
4224 if (u & SRTGROUP_MASK)
4225 return groupsockbstats(u, perf, clear);
4226 #endif
4227
4228 try
4229 {
4230 CUDT& udt = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->core();
4231 udt.bstats(perf, clear, instantaneous);
4232 return 0;
4233 }
4234 catch (const CUDTException& e)
4235 {
4236 return APIError(e);
4237 }
4238 catch (const std::exception& ee)
4239 {
4240 LOGC(aclog.Fatal, log << "bstats: UNEXPECTED EXCEPTION: "
4241 << typeid(ee).name() << ": " << ee.what());
4242 return APIError(MJ_UNKNOWN, MN_NONE, 0);
4243 }
4244 }
4245
4246 #if ENABLE_EXPERIMENTAL_BONDING
groupsockbstats(SRTSOCKET u,CBytePerfMon * perf,bool clear)4247 int srt::CUDT::groupsockbstats(SRTSOCKET u, CBytePerfMon* perf, bool clear)
4248 {
4249 try
4250 {
4251 CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW);
4252 k.group->bstatsSocket(perf, clear);
4253 return 0;
4254 }
4255 catch (const CUDTException& e)
4256 {
4257 SetThreadLocalError(e);
4258 return ERROR;
4259 }
4260 catch (const std::exception& ee)
4261 {
4262 LOGC(aclog.Fatal, log << "bstats: UNEXPECTED EXCEPTION: "
4263 << typeid(ee).name() << ": " << ee.what());
4264 SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
4265 return ERROR;
4266 }
4267 }
4268 #endif
4269
getUDTHandle(SRTSOCKET u)4270 srt::CUDT* srt::CUDT::getUDTHandle(SRTSOCKET u)
4271 {
4272 try
4273 {
4274 return &s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->core();
4275 }
4276 catch (const CUDTException& e)
4277 {
4278 SetThreadLocalError(e);
4279 return NULL;
4280 }
4281 catch (const std::exception& ee)
4282 {
4283 LOGC(aclog.Fatal, log << "getUDTHandle: UNEXPECTED EXCEPTION: "
4284 << typeid(ee).name() << ": " << ee.what());
4285 SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
4286 return NULL;
4287 }
4288 }
4289
existingSockets()4290 vector<SRTSOCKET> srt::CUDT::existingSockets()
4291 {
4292 vector<SRTSOCKET> out;
4293 for (CUDTUnited::sockets_t::iterator i = s_UDTUnited.m_Sockets.begin();
4294 i != s_UDTUnited.m_Sockets.end(); ++i)
4295 {
4296 out.push_back(i->first);
4297 }
4298 return out;
4299 }
4300
getsockstate(SRTSOCKET u)4301 SRT_SOCKSTATUS srt::CUDT::getsockstate(SRTSOCKET u)
4302 {
4303 try
4304 {
4305 #if ENABLE_EXPERIMENTAL_BONDING
4306 if (isgroup(u))
4307 {
4308 CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW);
4309 return k.group->getStatus();
4310 }
4311 #endif
4312 return s_UDTUnited.getStatus(u);
4313 }
4314 catch (const CUDTException& e)
4315 {
4316 SetThreadLocalError(e);
4317 return SRTS_NONEXIST;
4318 }
4319 catch (const std::exception& ee)
4320 {
4321 LOGC(aclog.Fatal, log << "getsockstate: UNEXPECTED EXCEPTION: "
4322 << typeid(ee).name() << ": " << ee.what());
4323 SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
4324 return SRTS_NONEXIST;
4325 }
4326 }
4327
4328 ////////////////////////////////////////////////////////////////////////////////
4329
4330 namespace UDT
4331 {
4332
startup()4333 int startup()
4334 {
4335 return srt::CUDT::startup();
4336 }
4337
cleanup()4338 int cleanup()
4339 {
4340 return srt::CUDT::cleanup();
4341 }
4342
bind(SRTSOCKET u,const struct sockaddr * name,int namelen)4343 int bind(SRTSOCKET u, const struct sockaddr* name, int namelen)
4344 {
4345 return srt::CUDT::bind(u, name, namelen);
4346 }
4347
bind2(SRTSOCKET u,UDPSOCKET udpsock)4348 int bind2(SRTSOCKET u, UDPSOCKET udpsock)
4349 {
4350 return srt::CUDT::bind(u, udpsock);
4351 }
4352
listen(SRTSOCKET u,int backlog)4353 int listen(SRTSOCKET u, int backlog)
4354 {
4355 return srt::CUDT::listen(u, backlog);
4356 }
4357
accept(SRTSOCKET u,struct sockaddr * addr,int * addrlen)4358 SRTSOCKET accept(SRTSOCKET u, struct sockaddr* addr, int* addrlen)
4359 {
4360 return srt::CUDT::accept(u, addr, addrlen);
4361 }
4362
connect(SRTSOCKET u,const struct sockaddr * name,int namelen)4363 int connect(SRTSOCKET u, const struct sockaddr* name, int namelen)
4364 {
4365 return srt::CUDT::connect(u, name, namelen, SRT_SEQNO_NONE);
4366 }
4367
close(SRTSOCKET u)4368 int close(SRTSOCKET u)
4369 {
4370 return srt::CUDT::close(u);
4371 }
4372
getpeername(SRTSOCKET u,struct sockaddr * name,int * namelen)4373 int getpeername(SRTSOCKET u, struct sockaddr* name, int* namelen)
4374 {
4375 return srt::CUDT::getpeername(u, name, namelen);
4376 }
4377
getsockname(SRTSOCKET u,struct sockaddr * name,int * namelen)4378 int getsockname(SRTSOCKET u, struct sockaddr* name, int* namelen)
4379 {
4380 return srt::CUDT::getsockname(u, name, namelen);
4381 }
4382
getsockopt(SRTSOCKET u,int level,SRT_SOCKOPT optname,void * optval,int * optlen)4383 int getsockopt(
4384 SRTSOCKET u, int level, SRT_SOCKOPT optname, void* optval, int* optlen)
4385 {
4386 return srt::CUDT::getsockopt(u, level, optname, optval, optlen);
4387 }
4388
setsockopt(SRTSOCKET u,int level,SRT_SOCKOPT optname,const void * optval,int optlen)4389 int setsockopt(
4390 SRTSOCKET u, int level, SRT_SOCKOPT optname, const void* optval, int optlen)
4391 {
4392 return srt::CUDT::setsockopt(u, level, optname, optval, optlen);
4393 }
4394
4395 // DEVELOPER API
4396
connect_debug(SRTSOCKET u,const struct sockaddr * name,int namelen,int32_t forced_isn)4397 int connect_debug(
4398 SRTSOCKET u, const struct sockaddr* name, int namelen, int32_t forced_isn)
4399 {
4400 return srt::CUDT::connect(u, name, namelen, forced_isn);
4401 }
4402
send(SRTSOCKET u,const char * buf,int len,int flags)4403 int send(SRTSOCKET u, const char* buf, int len, int flags)
4404 {
4405 return srt::CUDT::send(u, buf, len, flags);
4406 }
4407
recv(SRTSOCKET u,char * buf,int len,int flags)4408 int recv(SRTSOCKET u, char* buf, int len, int flags)
4409 {
4410 return srt::CUDT::recv(u, buf, len, flags);
4411 }
4412
4413
sendmsg(SRTSOCKET u,const char * buf,int len,int ttl,bool inorder,int64_t srctime)4414 int sendmsg(
4415 SRTSOCKET u, const char* buf, int len, int ttl, bool inorder,
4416 int64_t srctime)
4417 {
4418 return srt::CUDT::sendmsg(u, buf, len, ttl, inorder, srctime);
4419 }
4420
recvmsg(SRTSOCKET u,char * buf,int len,int64_t & srctime)4421 int recvmsg(SRTSOCKET u, char* buf, int len, int64_t& srctime)
4422 {
4423 return srt::CUDT::recvmsg(u, buf, len, srctime);
4424 }
4425
recvmsg(SRTSOCKET u,char * buf,int len)4426 int recvmsg(SRTSOCKET u, char* buf, int len)
4427 {
4428 int64_t srctime;
4429 return srt::CUDT::recvmsg(u, buf, len, srctime);
4430 }
4431
sendfile(SRTSOCKET u,fstream & ifs,int64_t & offset,int64_t size,int block)4432 int64_t sendfile(
4433 SRTSOCKET u,
4434 fstream& ifs,
4435 int64_t& offset,
4436 int64_t size,
4437 int block)
4438 {
4439 return srt::CUDT::sendfile(u, ifs, offset, size, block);
4440 }
4441
recvfile(SRTSOCKET u,fstream & ofs,int64_t & offset,int64_t size,int block)4442 int64_t recvfile(
4443 SRTSOCKET u,
4444 fstream& ofs,
4445 int64_t& offset,
4446 int64_t size,
4447 int block)
4448 {
4449 return srt::CUDT::recvfile(u, ofs, offset, size, block);
4450 }
4451
sendfile2(SRTSOCKET u,const char * path,int64_t * offset,int64_t size,int block)4452 int64_t sendfile2(
4453 SRTSOCKET u,
4454 const char* path,
4455 int64_t* offset,
4456 int64_t size,
4457 int block)
4458 {
4459 fstream ifs(path, ios::binary | ios::in);
4460 int64_t ret = srt::CUDT::sendfile(u, ifs, *offset, size, block);
4461 ifs.close();
4462 return ret;
4463 }
4464
recvfile2(SRTSOCKET u,const char * path,int64_t * offset,int64_t size,int block)4465 int64_t recvfile2(
4466 SRTSOCKET u,
4467 const char* path,
4468 int64_t* offset,
4469 int64_t size,
4470 int block)
4471 {
4472 fstream ofs(path, ios::binary | ios::out);
4473 int64_t ret = srt::CUDT::recvfile(u, ofs, *offset, size, block);
4474 ofs.close();
4475 return ret;
4476 }
4477
select(int nfds,UDSET * readfds,UDSET * writefds,UDSET * exceptfds,const struct timeval * timeout)4478 int select(
4479 int nfds,
4480 UDSET* readfds,
4481 UDSET* writefds,
4482 UDSET* exceptfds,
4483 const struct timeval* timeout)
4484 {
4485 return srt::CUDT::select(nfds, readfds, writefds, exceptfds, timeout);
4486 }
4487
selectEx(const vector<SRTSOCKET> & fds,vector<SRTSOCKET> * readfds,vector<SRTSOCKET> * writefds,vector<SRTSOCKET> * exceptfds,int64_t msTimeOut)4488 int selectEx(
4489 const vector<SRTSOCKET>& fds,
4490 vector<SRTSOCKET>* readfds,
4491 vector<SRTSOCKET>* writefds,
4492 vector<SRTSOCKET>* exceptfds,
4493 int64_t msTimeOut)
4494 {
4495 return srt::CUDT::selectEx(fds, readfds, writefds, exceptfds, msTimeOut);
4496 }
4497
epoll_create()4498 int epoll_create()
4499 {
4500 return srt::CUDT::epoll_create();
4501 }
4502
epoll_clear_usocks(int eid)4503 int epoll_clear_usocks(int eid)
4504 {
4505 return srt::CUDT::epoll_clear_usocks(eid);
4506 }
4507
epoll_add_usock(int eid,SRTSOCKET u,const int * events)4508 int epoll_add_usock(int eid, SRTSOCKET u, const int* events)
4509 {
4510 return srt::CUDT::epoll_add_usock(eid, u, events);
4511 }
4512
epoll_add_ssock(int eid,SYSSOCKET s,const int * events)4513 int epoll_add_ssock(int eid, SYSSOCKET s, const int* events)
4514 {
4515 return srt::CUDT::epoll_add_ssock(eid, s, events);
4516 }
4517
epoll_update_usock(int eid,SRTSOCKET u,const int * events)4518 int epoll_update_usock(int eid, SRTSOCKET u, const int* events)
4519 {
4520 return srt::CUDT::epoll_update_usock(eid, u, events);
4521 }
4522
epoll_update_ssock(int eid,SYSSOCKET s,const int * events)4523 int epoll_update_ssock(int eid, SYSSOCKET s, const int* events)
4524 {
4525 return srt::CUDT::epoll_update_ssock(eid, s, events);
4526 }
4527
epoll_remove_usock(int eid,SRTSOCKET u)4528 int epoll_remove_usock(int eid, SRTSOCKET u)
4529 {
4530 return srt::CUDT::epoll_remove_usock(eid, u);
4531 }
4532
epoll_remove_ssock(int eid,SYSSOCKET s)4533 int epoll_remove_ssock(int eid, SYSSOCKET s)
4534 {
4535 return srt::CUDT::epoll_remove_ssock(eid, s);
4536 }
4537
epoll_wait(int eid,set<SRTSOCKET> * readfds,set<SRTSOCKET> * writefds,int64_t msTimeOut,set<SYSSOCKET> * lrfds,set<SYSSOCKET> * lwfds)4538 int epoll_wait(
4539 int eid,
4540 set<SRTSOCKET>* readfds,
4541 set<SRTSOCKET>* writefds,
4542 int64_t msTimeOut,
4543 set<SYSSOCKET>* lrfds,
4544 set<SYSSOCKET>* lwfds)
4545 {
4546 return srt::CUDT::epoll_wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
4547 }
4548
4549 template <class SOCKTYPE>
set_result(set<SOCKTYPE> * val,int * num,SOCKTYPE * fds)4550 inline void set_result(set<SOCKTYPE>* val, int* num, SOCKTYPE* fds)
4551 {
4552 if ( !val || !num || !fds )
4553 return;
4554
4555 if (*num > int(val->size()))
4556 *num = int(val->size()); // will get 0 if val->empty()
4557 int count = 0;
4558
4559 // This loop will run 0 times if val->empty()
4560 for (typename set<SOCKTYPE>::const_iterator it = val->begin(); it != val->end(); ++ it)
4561 {
4562 if (count >= *num)
4563 break;
4564 fds[count ++] = *it;
4565 }
4566 }
4567
epoll_wait2(int eid,SRTSOCKET * readfds,int * rnum,SRTSOCKET * writefds,int * wnum,int64_t msTimeOut,SYSSOCKET * lrfds,int * lrnum,SYSSOCKET * lwfds,int * lwnum)4568 int epoll_wait2(
4569 int eid, SRTSOCKET* readfds,
4570 int* rnum, SRTSOCKET* writefds,
4571 int* wnum,
4572 int64_t msTimeOut,
4573 SYSSOCKET* lrfds,
4574 int* lrnum,
4575 SYSSOCKET* lwfds,
4576 int* lwnum)
4577 {
4578 // This API is an alternative format for epoll_wait, created for
4579 // compatability with other languages. Users need to pass in an array
4580 // for holding the returned sockets, with the maximum array length
4581 // stored in *rnum, etc., which will be updated with returned number
4582 // of sockets.
4583
4584 set<SRTSOCKET> readset;
4585 set<SRTSOCKET> writeset;
4586 set<SYSSOCKET> lrset;
4587 set<SYSSOCKET> lwset;
4588 set<SRTSOCKET>* rval = NULL;
4589 set<SRTSOCKET>* wval = NULL;
4590 set<SYSSOCKET>* lrval = NULL;
4591 set<SYSSOCKET>* lwval = NULL;
4592 if ((readfds != NULL) && (rnum != NULL))
4593 rval = &readset;
4594 if ((writefds != NULL) && (wnum != NULL))
4595 wval = &writeset;
4596 if ((lrfds != NULL) && (lrnum != NULL))
4597 lrval = &lrset;
4598 if ((lwfds != NULL) && (lwnum != NULL))
4599 lwval = &lwset;
4600
4601 int ret = srt::CUDT::epoll_wait(eid, rval, wval, msTimeOut, lrval, lwval);
4602 if (ret > 0)
4603 {
4604 //set<SRTSOCKET>::const_iterator i;
4605 //SET_RESULT(rval, rnum, readfds, i);
4606 set_result(rval, rnum, readfds);
4607 //SET_RESULT(wval, wnum, writefds, i);
4608 set_result(wval, wnum, writefds);
4609
4610 //set<SYSSOCKET>::const_iterator j;
4611 //SET_RESULT(lrval, lrnum, lrfds, j);
4612 set_result(lrval, lrnum, lrfds);
4613 //SET_RESULT(lwval, lwnum, lwfds, j);
4614 set_result(lwval, lwnum, lwfds);
4615 }
4616 return ret;
4617 }
4618
epoll_uwait(int eid,SRT_EPOLL_EVENT * fdsSet,int fdsSize,int64_t msTimeOut)4619 int epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
4620 {
4621 return srt::CUDT::epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
4622 }
4623
epoll_release(int eid)4624 int epoll_release(int eid)
4625 {
4626 return srt::CUDT::epoll_release(eid);
4627 }
4628
getlasterror()4629 ERRORINFO& getlasterror()
4630 {
4631 return srt::CUDT::getlasterror();
4632 }
4633
getlasterror_code()4634 int getlasterror_code()
4635 {
4636 return srt::CUDT::getlasterror().getErrorCode();
4637 }
4638
getlasterror_desc()4639 const char* getlasterror_desc()
4640 {
4641 return srt::CUDT::getlasterror().getErrorMessage();
4642 }
4643
getlasterror_errno()4644 int getlasterror_errno()
4645 {
4646 return srt::CUDT::getlasterror().getErrno();
4647 }
4648
4649 // Get error string of a given error code
geterror_desc(int code,int err)4650 const char* geterror_desc(int code, int err)
4651 {
4652 CUDTException e (CodeMajor(code/1000), CodeMinor(code%1000), err);
4653 return(e.getErrorMessage());
4654 }
4655
bstats(SRTSOCKET u,SRT_TRACEBSTATS * perf,bool clear)4656 int bstats(SRTSOCKET u, SRT_TRACEBSTATS* perf, bool clear)
4657 {
4658 return srt::CUDT::bstats(u, perf, clear);
4659 }
4660
getsockstate(SRTSOCKET u)4661 SRT_SOCKSTATUS getsockstate(SRTSOCKET u)
4662 {
4663 return srt::CUDT::getsockstate(u);
4664 }
4665
4666 } // namespace UDT
4667
4668 namespace srt
4669 {
4670
setloglevel(LogLevel::type ll)4671 void setloglevel(LogLevel::type ll)
4672 {
4673 ScopedLock gg(srt_logger_config.mutex);
4674 srt_logger_config.max_level = ll;
4675 }
4676
addlogfa(LogFA fa)4677 void addlogfa(LogFA fa)
4678 {
4679 ScopedLock gg(srt_logger_config.mutex);
4680 srt_logger_config.enabled_fa.set(fa, true);
4681 }
4682
dellogfa(LogFA fa)4683 void dellogfa(LogFA fa)
4684 {
4685 ScopedLock gg(srt_logger_config.mutex);
4686 srt_logger_config.enabled_fa.set(fa, false);
4687 }
4688
resetlogfa(set<LogFA> fas)4689 void resetlogfa(set<LogFA> fas)
4690 {
4691 ScopedLock gg(srt_logger_config.mutex);
4692 for (int i = 0; i <= SRT_LOGFA_LASTNONE; ++i)
4693 srt_logger_config.enabled_fa.set(i, fas.count(i));
4694 }
4695
resetlogfa(const int * fara,size_t fara_size)4696 void resetlogfa(const int* fara, size_t fara_size)
4697 {
4698 ScopedLock gg(srt_logger_config.mutex);
4699 srt_logger_config.enabled_fa.reset();
4700 for (const int* i = fara; i != fara + fara_size; ++i)
4701 srt_logger_config.enabled_fa.set(*i, true);
4702 }
4703
setlogstream(std::ostream & stream)4704 void setlogstream(std::ostream& stream)
4705 {
4706 ScopedLock gg(srt_logger_config.mutex);
4707 srt_logger_config.log_stream = &stream;
4708 }
4709
setloghandler(void * opaque,SRT_LOG_HANDLER_FN * handler)4710 void setloghandler(void* opaque, SRT_LOG_HANDLER_FN* handler)
4711 {
4712 ScopedLock gg(srt_logger_config.mutex);
4713 srt_logger_config.loghandler_opaque = opaque;
4714 srt_logger_config.loghandler_fn = handler;
4715 }
4716
setlogflags(int flags)4717 void setlogflags(int flags)
4718 {
4719 ScopedLock gg(srt_logger_config.mutex);
4720 srt_logger_config.flags = flags;
4721 }
4722
setstreamid(SRTSOCKET u,const std::string & sid)4723 SRT_API bool setstreamid(SRTSOCKET u, const std::string& sid)
4724 {
4725 return CUDT::setstreamid(u, sid);
4726 }
getstreamid(SRTSOCKET u)4727 SRT_API std::string getstreamid(SRTSOCKET u)
4728 {
4729 return CUDT::getstreamid(u);
4730 }
4731
getrejectreason(SRTSOCKET u)4732 int getrejectreason(SRTSOCKET u)
4733 {
4734 return CUDT::rejectReason(u);
4735 }
4736
setrejectreason(SRTSOCKET u,int value)4737 int setrejectreason(SRTSOCKET u, int value)
4738 {
4739 return CUDT::rejectReason(u, value);
4740 }
4741
4742 } // namespace srt
4743