1 /*
2  * SRT - Secure, Reliable, Transport
3  * Copyright (c) 2018 Haivision Systems Inc.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  *
9  */
10 
11 /*****************************************************************************
12 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
13 All rights reserved.
14 
15 Redistribution and use in source and binary forms, with or without
16 modification, are permitted provided that the following conditions are
17 met:
18 
19 * Redistributions of source code must retain the above
20   copyright notice, this list of conditions and the
21   following disclaimer.
22 
23 * Redistributions in binary form must reproduce the
24   above copyright notice, this list of conditions
25   and the following disclaimer in the documentation
26   and/or other materials provided with the distribution.
27 
28 * Neither the name of the University of Illinois
29   nor the names of its contributors may be used to
30   endorse or promote products derived from this
31   software without specific prior written permission.
32 
33 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
34 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
35 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
36 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
37 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
38 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
39 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
40 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
41 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
42 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
43 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
44 *****************************************************************************/
45 
46 /*****************************************************************************
47 written by
48    Yunhong Gu, last updated 07/09/2011
49 modified by
50    Haivision Systems Inc.
51 *****************************************************************************/
52 
53 #include "platform_sys.h"
54 
55 #include <exception>
56 #include <stdexcept>
57 #include <typeinfo>
58 #include <iterator>
59 #include <vector>
60 
61 #include <cstring>
62 #include "utilities.h"
63 #include "netinet_any.h"
64 #include "api.h"
65 #include "core.h"
66 #include "epoll.h"
67 #include "logging.h"
68 #include "threadname.h"
69 #include "srt.h"
70 #include "udt.h"
71 
72 #ifdef _WIN32
73    #include <win/wintime.h>
74 #endif
75 
76 #ifdef _MSC_VER
77    #pragma warning(error: 4530)
78 #endif
79 
80 using namespace std;
81 using namespace srt_logging;
82 using namespace srt::sync;
83 extern LogConfig srt_logger_config;
84 
85 
construct()86 void srt::CUDTSocket::construct()
87 {
88 #if ENABLE_EXPERIMENTAL_BONDING
89    m_GroupOf = NULL;
90    m_GroupMemberData = NULL;
91 #endif
92    setupMutex(m_AcceptLock, "Accept");
93    setupCond(m_AcceptCond, "Accept");
94    setupMutex(m_ControlLock, "Control");
95 }
96 
~CUDTSocket()97 srt::CUDTSocket::~CUDTSocket()
98 {
99    releaseMutex(m_AcceptLock);
100    releaseCond(m_AcceptCond);
101    releaseMutex(m_ControlLock);
102 }
103 
104 
getStatus()105 SRT_SOCKSTATUS srt::CUDTSocket::getStatus()
106 {
107     // TTL in CRendezvousQueue::updateConnStatus() will set m_bConnecting to false.
108     // Although m_Status is still SRTS_CONNECTING, the connection is in fact to be closed due to TTL expiry.
109     // In this case m_bConnected is also false. Both checks are required to avoid hitting
110     // a regular state transition from CONNECTING to CONNECTED.
111 
112     if (m_UDT.m_bBroken)
113         return SRTS_BROKEN;
114 
115     // Connecting timed out
116     if ((m_Status == SRTS_CONNECTING) && !m_UDT.m_bConnecting && !m_UDT.m_bConnected)
117         return SRTS_BROKEN;
118 
119     return m_Status;
120 }
121 
122 // [[using locked(m_GlobControlLock)]]
breakSocket_LOCKED()123 void srt::CUDTSocket::breakSocket_LOCKED()
124 {
125     // This function is intended to be called from GC,
126     // under a lock of m_GlobControlLock.
127     m_UDT.m_bBroken = true;
128     m_UDT.m_iBrokenCounter = 0;
129     HLOGC(smlog.Debug, log << "@" << m_SocketID << " CLOSING AS SOCKET");
130     m_UDT.closeInternal();
131     setClosed();
132 }
133 
setClosed()134 void srt::CUDTSocket::setClosed()
135 {
136     m_Status = SRTS_CLOSED;
137 
138     // a socket will not be immediately removed when it is closed
139     // in order to prevent other methods from accessing invalid address
140     // a timer is started and the socket will be removed after approximately
141     // 1 second
142     m_tsClosureTimeStamp = steady_clock::now();
143 }
144 
setBrokenClosed()145 void srt::CUDTSocket::setBrokenClosed()
146 {
147     m_UDT.m_iBrokenCounter = 60;
148     m_UDT.m_bBroken = true;
149     setClosed();
150 }
151 
readReady()152 bool srt::CUDTSocket::readReady()
153 {
154     if (m_UDT.m_bConnected && m_UDT.m_pRcvBuffer->isRcvDataReady())
155         return true;
156     if (m_UDT.m_bListening)
157         return !m_QueuedSockets.empty();
158 
159     return broken();
160 }
161 
writeReady() const162 bool srt::CUDTSocket::writeReady() const
163 {
164     return (m_UDT.m_bConnected
165                 && (m_UDT.m_pSndBuffer->getCurrBufSize() < m_UDT.m_config.iSndBufSize))
166         || broken();
167 }
168 
broken() const169 bool srt::CUDTSocket::broken() const
170 {
171     return m_UDT.m_bBroken || !m_UDT.m_bConnected;
172 }
173 
174 ////////////////////////////////////////////////////////////////////////////////
175 
CUDTUnited()176 srt::CUDTUnited::CUDTUnited():
177     m_Sockets(),
178     m_GlobControlLock(),
179     m_IDLock(),
180     m_mMultiplexer(),
181     m_MultiplexerLock(),
182     m_pCache(NULL),
183     m_bClosing(false),
184     m_GCStopCond(),
185     m_InitLock(),
186     m_iInstanceCount(0),
187     m_bGCStatus(false),
188     m_ClosedSockets()
189 {
190    // Socket ID MUST start from a random value
191    m_SocketIDGenerator = genRandomInt(1, MAX_SOCKET_VAL);
192    m_SocketIDGenerator_init = m_SocketIDGenerator;
193 
194    // XXX An unlikely exception thrown from the below calls
195    // might destroy the application before `main`. This shouldn't
196    // be a problem in general.
197    setupMutex(m_GlobControlLock, "GlobControl");
198    setupMutex(m_IDLock, "ID");
199    setupMutex(m_InitLock, "Init");
200 
201    m_pCache = new CCache<CInfoBlock>;
202 }
203 
~CUDTUnited()204 srt::CUDTUnited::~CUDTUnited()
205 {
206     // Call it if it wasn't called already.
207     // This will happen at the end of main() of the application,
208     // when the user didn't call srt_cleanup().
209     if (m_bGCStatus)
210     {
211         cleanup();
212     }
213 
214     releaseMutex(m_GlobControlLock);
215     releaseMutex(m_IDLock);
216     releaseMutex(m_InitLock);
217 
218     delete m_pCache;
219 }
220 
CONID(SRTSOCKET sock)221 string srt::CUDTUnited::CONID(SRTSOCKET sock)
222 {
223     if ( sock == 0 )
224         return "";
225 
226     std::ostringstream os;
227     os << "@" << sock << ":";
228     return os.str();
229 }
230 
startup()231 int srt::CUDTUnited::startup()
232 {
233    ScopedLock gcinit(m_InitLock);
234 
235    if (m_iInstanceCount++ > 0)
236        return 1;
237 
238    // Global initialization code
239 #ifdef _WIN32
240    WORD wVersionRequested;
241    WSADATA wsaData;
242    wVersionRequested = MAKEWORD(2, 2);
243 
244    if (0 != WSAStartup(wVersionRequested, &wsaData))
245       throw CUDTException(MJ_SETUP, MN_NONE,  WSAGetLastError());
246 #endif
247 
248    PacketFilter::globalInit();
249 
250    if (m_bGCStatus)
251       return 1;
252 
253    m_bClosing = false;
254 
255    try
256    {
257        setupMutex(m_GCStopLock, "GCStop");
258        setupCond(m_GCStopCond, "GCStop");
259    }
260    catch (...)
261    {
262        return -1;
263    }
264    if (!StartThread(m_GCThread, garbageCollect, this, "SRT:GC"))
265       return -1;
266 
267    m_bGCStatus = true;
268 
269    HLOGC(inlog.Debug, log << "SRT Clock Type: " << SRT_SYNC_CLOCK_STR);
270 
271    return 0;
272 }
273 
cleanup()274 int srt::CUDTUnited::cleanup()
275 {
276    // IMPORTANT!!!
277    // In this function there must be NO LOGGING AT ALL.  This function may
278    // potentially be called from within the global program destructor, and
279    // therefore some of the facilities used by the logging system - including
280    // the default std::cerr object bound to it by default, but also a different
281    // stream that the user's app has bound to it, and which got destroyed
282    // together with already exited main() - may be already deleted when
283    // executing this procedure.
284    ScopedLock gcinit(m_InitLock);
285 
286    if (--m_iInstanceCount > 0)
287        return 0;
288 
289    if (!m_bGCStatus)
290        return 0;
291 
292    m_bClosing = true;
293    // NOTE: we can do relaxed signaling here because
294    // waiting on m_GCStopCond has a 1-second timeout,
295    // after which the m_bClosing flag is cheched, which
296    // is set here above. Worst case secenario, this
297    // pthread_join() call will block for 1 second.
298    CSync::signal_relaxed(m_GCStopCond);
299    m_GCThread.join();
300 
301    // XXX There's some weird bug here causing this
302    // to hangup on Windows. This might be either something
303    // bigger, or some problem in pthread-win32. As this is
304    // the application cleanup section, this can be temporarily
305    // tolerated with simply exit the application without cleanup,
306    // counting on that the system will take care of it anyway.
307 #ifndef _WIN32
308    releaseCond(m_GCStopCond);
309 #endif
310 
311    m_bGCStatus = false;
312 
313    // Global destruction code
314 #ifdef _WIN32
315    WSACleanup();
316 #endif
317 
318    return 0;
319 }
320 
generateSocketID(bool for_group)321 SRTSOCKET srt::CUDTUnited::generateSocketID(bool for_group)
322 {
323     ScopedLock guard(m_IDLock);
324 
325     int sockval = m_SocketIDGenerator - 1;
326 
327     // First problem: zero-value should be avoided by various reasons.
328 
329     if (sockval <= 0)
330     {
331         // We have a rollover on the socket value, so
332         // definitely we haven't made the Columbus mistake yet.
333         m_SocketIDGenerator = MAX_SOCKET_VAL;
334     }
335 
336     // Check all sockets if any of them has this value.
337     // Socket IDs are begin created this way:
338     //
339     //                              Initial random
340     //                              |
341     //                             |
342     //                            |
343     //                           |
344     // ...
345     // The only problem might be if the number rolls over
346     // and reaches the same value from the opposite side.
347     // This is still a valid socket value, but this time
348     // we have to check, which sockets have been used already.
349     if ( sockval == m_SocketIDGenerator_init )
350     {
351         // Mark that since this point on the checks for
352         // whether the socket ID is in use must be done.
353         m_SocketIDGenerator_init = 0;
354     }
355 
356     // This is when all socket numbers have been already used once.
357     // This may happen after many years of running an application
358     // constantly when the connection breaks and gets restored often.
359     if ( m_SocketIDGenerator_init == 0 )
360     {
361         int startval = sockval;
362         for (;;) // Roll until an unused value is found
363         {
364             enterCS(m_GlobControlLock);
365             const bool exists =
366 #if ENABLE_EXPERIMENTAL_BONDING
367                 for_group
368                 ? m_Groups.count(sockval | SRTGROUP_MASK)
369                 :
370 #endif
371                 m_Sockets.count(sockval);
372             leaveCS(m_GlobControlLock);
373 
374             if (exists)
375             {
376                 // The socket value is in use.
377                 --sockval;
378                 if (sockval <= 0)
379                     sockval = MAX_SOCKET_VAL;
380 
381                 // Before continuing, check if we haven't rolled back to start again
382                 // This is virtually impossible, so just make an RTI error.
383                 if (sockval == startval)
384                 {
385                     // Of course, we don't lack memory, but actually this is so impossible
386                     // that a complete memory extinction is much more possible than this.
387                     // So treat this rather as a formal fallback for something that "should
388                     // never happen". This should make the socket creation functions, from
389                     // socket_create and accept, return this error.
390 
391                     m_SocketIDGenerator = sockval+1; // so that any next call will cause the same error
392                     throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
393                 }
394 
395                 // try again, if this is a free socket
396                 continue;
397             }
398 
399             // No socket found, this ID is free to use
400             m_SocketIDGenerator = sockval;
401             break;
402         }
403     }
404     else
405     {
406         m_SocketIDGenerator = sockval;
407     }
408 
409     // The socket value counter remains with the value rolled
410     // without the group bit set; only the returned value may have
411     // the group bit set.
412 
413     if (for_group)
414         sockval = m_SocketIDGenerator | SRTGROUP_MASK;
415     else
416         sockval = m_SocketIDGenerator;
417 
418     LOGC(smlog.Debug, log << "generateSocketID: " << (for_group ? "(group)" : "") << ": @" << sockval);
419 
420     return sockval;
421 }
422 
newSocket(CUDTSocket ** pps)423 SRTSOCKET srt::CUDTUnited::newSocket(CUDTSocket** pps)
424 {
425    // XXX consider using some replacement of std::unique_ptr
426    // so that exceptions will clean up the object without the
427    // need for a dedicated code.
428    CUDTSocket* ns = NULL;
429 
430    try
431    {
432       ns = new CUDTSocket;
433    }
434    catch (...)
435    {
436       delete ns;
437       throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
438    }
439 
440    try
441    {
442       ns->m_SocketID = generateSocketID();
443    }
444    catch (...)
445    {
446        delete ns;
447        throw;
448    }
449    ns->m_Status = SRTS_INIT;
450    ns->m_ListenSocket = 0;
451    ns->core().m_SocketID = ns->m_SocketID;
452    ns->core().m_pCache = m_pCache;
453 
454    try
455    {
456       HLOGC(smlog.Debug, log << CONID(ns->m_SocketID)
457          << "newSocket: mapping socket "
458          << ns->m_SocketID);
459 
460       // protect the m_Sockets structure.
461       ScopedLock cs(m_GlobControlLock);
462       m_Sockets[ns->m_SocketID] = ns;
463    }
464    catch (...)
465    {
466       //failure and rollback
467       delete ns;
468       ns = NULL;
469    }
470 
471    if (!ns)
472       throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
473 
474     if (pps)
475         *pps = ns;
476 
477    return ns->m_SocketID;
478 }
479 
newConnection(const SRTSOCKET listen,const sockaddr_any & peer,const CPacket & hspkt,CHandShake & w_hs,int & w_error,CUDT * & w_acpu)480 int srt::CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer, const CPacket& hspkt,
481         CHandShake& w_hs, int& w_error, CUDT*& w_acpu)
482 {
483    CUDTSocket* ns = NULL;
484    w_acpu = NULL;
485 
486    w_error = SRT_REJ_IPE;
487 
488    // Can't manage this error through an exception because this is
489    // running in the listener loop.
490    CUDTSocket* ls = locateSocket(listen);
491    if (!ls)
492    {
493        LOGC(cnlog.Error, log << "IPE: newConnection by listener socket id=" << listen << " which DOES NOT EXIST.");
494        return -1;
495    }
496 
497    HLOGC(cnlog.Debug, log << "newConnection: creating new socket after listener @"
498          << listen << " contacted with backlog=" << ls->m_uiBackLog);
499 
500    // if this connection has already been processed
501    if ((ns = locatePeer(peer, w_hs.m_iID, w_hs.m_iISN)) != NULL)
502    {
503       if (ns->core().m_bBroken)
504       {
505          // last connection from the "peer" address has been broken
506          ns->setClosed();
507 
508          ScopedLock acceptcg(ls->m_AcceptLock);
509          ls->m_QueuedSockets.erase(ns->m_SocketID);
510       }
511       else
512       {
513          // connection already exist, this is a repeated connection request
514          // respond with existing HS information
515          HLOGC(cnlog.Debug, log
516                << "newConnection: located a WORKING peer @"
517                << w_hs.m_iID << " - ADAPTING.");
518 
519          w_hs.m_iISN = ns->core().m_iISN;
520          w_hs.m_iMSS = ns->core().MSS();
521          w_hs.m_iFlightFlagSize = ns->core().m_config.iFlightFlagSize;
522          w_hs.m_iReqType = URQ_CONCLUSION;
523          w_hs.m_iID = ns->m_SocketID;
524 
525          // Report the original UDT because it will be
526          // required to complete the HS data for conclusion response.
527          w_acpu = &ns->core();
528 
529          return 0;
530 
531          //except for this situation a new connection should be started
532       }
533    }
534    else
535    {
536        HLOGC(cnlog.Debug, log << "newConnection: NOT located any peer @"
537                << w_hs.m_iID << " - resuming with initial connection.");
538    }
539 
540    // exceeding backlog, refuse the connection request
541    if (ls->m_QueuedSockets.size() >= ls->m_uiBackLog)
542    {
543        w_error = SRT_REJ_BACKLOG;
544        LOGC(cnlog.Note, log << "newConnection: listen backlog=" << ls->m_uiBackLog << " EXCEEDED");
545        return -1;
546    }
547 
548    try
549    {
550       ns = new CUDTSocket(*ls);
551       // No need to check the peer, this is the address from which the request has come.
552       ns->m_PeerAddr = peer;
553    }
554    catch (...)
555    {
556       w_error = SRT_REJ_RESOURCE;
557       delete ns;
558       LOGC(cnlog.Error, log << "IPE: newConnection: unexpected exception (probably std::bad_alloc)");
559       return -1;
560    }
561 
562    ns->core().m_RejectReason = SRT_REJ_UNKNOWN; // pre-set a universal value
563 
564    try
565    {
566        ns->m_SocketID = generateSocketID();
567    }
568    catch (const CUDTException&)
569    {
570        LOGF(cnlog.Fatal, "newConnection: IPE: all sockets occupied? Last gen=%d", m_SocketIDGenerator);
571        // generateSocketID throws exception, which can be naturally handled
572        // when the call is derived from the API call, but here it's called
573        // internally in response to receiving a handshake. It must be handled
574        // here and turned into an erroneous return value.
575        delete ns;
576        return -1;
577    }
578 
579    ns->m_ListenSocket = listen;
580    ns->core().m_SocketID = ns->m_SocketID;
581    ns->m_PeerID = w_hs.m_iID;
582    ns->m_iISN = w_hs.m_iISN;
583 
584    HLOGC(cnlog.Debug, log << "newConnection: DATA: lsnid=" << listen
585             << " id=" << ns->core().m_SocketID
586             << " peerid=" << ns->core().m_PeerID
587             << " ISN=" << ns->m_iISN);
588 
589    int error = 0;
590    bool should_submit_to_accept = true;
591 
592    // Set the error code for all prospective problems below.
593    // It won't be interpreted when result was successful.
594    w_error = SRT_REJ_RESOURCE;
595 
596    // These can throw exception only when the memory allocation failed.
597    // CUDT::connect() translates exception into CUDTException.
598    // CUDT::open() may only throw original std::bad_alloc from new.
599    // This is only to make the library extra safe (when your machine lacks
600    // memory, it will continue to work, but fail to accept connection).
601 
602    try
603    {
604        // This assignment must happen b4 the call to CUDT::connect() because
605        // this call causes sending the SRT Handshake through this socket.
606        // Without this mapping the socket cannot be found and therefore
607        // the SRT Handshake message would fail.
608        HLOGF(cnlog.Debug,
609                "newConnection: incoming %s, mapping socket %d",
610                peer.str().c_str(), ns->m_SocketID);
611        {
612            ScopedLock cg(m_GlobControlLock);
613            m_Sockets[ns->m_SocketID] = ns;
614        }
615 
616        if (ls->core().m_cbAcceptHook)
617        {
618            if (!ls->core().runAcceptHook(&ns->core(), peer.get(), w_hs, hspkt))
619            {
620                w_error = ns->core().m_RejectReason;
621 
622                error = 1;
623                goto ERR_ROLLBACK;
624            }
625        }
626 
627        // bind to the same addr of listening socket
628        ns->core().open();
629        updateListenerMux(ns, ls);
630 
631        ns->core().acceptAndRespond(ls->m_SelfAddr, peer, hspkt, (w_hs));
632    }
633    catch (...)
634    {
635        // Extract the error that was set in this new failed entity.
636        w_error = ns->core().m_RejectReason;
637        error = 1;
638        goto ERR_ROLLBACK;
639    }
640 
641    ns->m_Status = SRTS_CONNECTED;
642 
643    // copy address information of local node
644    // Precisely, what happens here is:
645    // - Get the IP address and port from the system database
646    ns->core().m_pSndQueue->m_pChannel->getSockAddr((ns->m_SelfAddr));
647    // - OVERWRITE just the IP address itself by a value taken from piSelfIP
648    // (the family is used exactly as the one taken from what has been returned
649    // by getsockaddr)
650    CIPAddress::pton((ns->m_SelfAddr), ns->core().m_piSelfIP, peer);
651 
652    {
653        // protect the m_PeerRec structure (and group existence)
654        ScopedLock glock (m_GlobControlLock);
655        try
656        {
657            HLOGF(cnlog.Debug,
658                    "newConnection: mapping peer %d to that socket (%d)\n",
659                    ns->m_PeerID, ns->m_SocketID);
660            m_PeerRec[ns->getPeerSpec()].insert(ns->m_SocketID);
661        }
662        catch (...)
663        {
664            LOGC(cnlog.Error, log << "newConnection: error when mapping peer!");
665            error = 2;
666        }
667 
668        // The access to m_GroupOf should be also protected, as the group
669        // could be requested deletion in the meantime. This will hold any possible
670        // removal from group and resetting m_GroupOf field.
671 
672 #if ENABLE_EXPERIMENTAL_BONDING
673        if (ns->m_GroupOf)
674        {
675            // XXX this might require another check of group type.
676            // For redundancy group, at least, update the status in the group
677            CUDTGroup* g = ns->m_GroupOf;
678            ScopedLock glock (g->m_GroupLock);
679            if (g->m_bClosing)
680            {
681                error = 1; // "INTERNAL REJECTION"
682                goto ERR_ROLLBACK;
683            }
684 
685            // Check if this is the first socket in the group.
686            // If so, give it up to accept, otherwise just do nothing
687            // The client will be informed about the newly added connection at the
688            // first moment when attempting to get the group status.
689            for (CUDTGroup::gli_t gi = g->m_Group.begin(); gi != g->m_Group.end(); ++gi)
690            {
691                if (gi->laststatus == SRTS_CONNECTED)
692                {
693                    HLOGC(cnlog.Debug, log << "Found another connected socket in the group: $"
694                            << gi->id << " - socket will be NOT given up for accepting");
695                    should_submit_to_accept = false;
696                    break;
697                }
698            }
699 
700            // Update the status in the group so that the next
701            // operation can include the socket in the group operation.
702            CUDTGroup::SocketData* gm = ns->m_GroupMemberData;
703 
704            HLOGC(cnlog.Debug, log << "newConnection(GROUP): Socket @" << ns->m_SocketID << " BELONGS TO $" << g->id()
705                    << " - will " << (should_submit_to_accept? "" : "NOT ") << "report in accept");
706            gm->sndstate = SRT_GST_IDLE;
707            gm->rcvstate = SRT_GST_IDLE;
708            gm->laststatus = SRTS_CONNECTED;
709 
710            if (!g->m_bConnected)
711            {
712                HLOGC(cnlog.Debug, log << "newConnection(GROUP): First socket connected, SETTING GROUP CONNECTED");
713                g->m_bConnected = true;
714            }
715 
716            // XXX PROLBEM!!! These events are subscribed here so that this is done once, lazily,
717            // but groupwise connections could be accepted from multiple listeners for the same group!
718            // m_listener MUST BE A CONTAINER, NOT POINTER!!!
719            // ALSO: Maybe checking "the same listener" is not necessary as subscruption may be done
720            // multiple times anyway?
721            if (!g->m_listener)
722            {
723                // Newly created group from the listener, which hasn't yet
724                // the listener set.
725                g->m_listener = ls;
726 
727                // Listen on both first connected socket and continued sockets.
728                // This might help with jump-over situations, and in regular continued
729                // sockets the IN event won't be reported anyway.
730                int listener_modes = SRT_EPOLL_ACCEPT | SRT_EPOLL_UPDATE;
731                epoll_add_usock_INTERNAL(g->m_RcvEID, ls, &listener_modes);
732 
733                // This listening should be done always when a first connected socket
734                // appears as accepted off the listener. This is for the sake of swait() calls
735                // inside the group receiving and sending functions so that they get
736                // interrupted when a new socket is connected.
737            }
738 
739            // Add also per-direction subscription for the about-to-be-accepted socket.
740            // Both first accepted socket that makes the group-accept and every next
741            // socket that adds a new link.
742            int read_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
743            int write_modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
744            epoll_add_usock_INTERNAL(g->m_RcvEID, ns, &read_modes);
745            epoll_add_usock_INTERNAL(g->m_SndEID, ns, &write_modes);
746 
747            // With app reader, do not set groupPacketArrival (block the
748            // provider array feature completely for now).
749 
750 
751            /* SETUP HERE IF NEEDED
752               ns->core().m_cbPacketArrival.set(ns->m_pUDT, &CUDT::groupPacketArrival);
753             */
754        }
755        else
756        {
757            HLOGC(cnlog.Debug, log << "newConnection: Socket @" << ns->m_SocketID << " is not in a group");
758        }
759 #endif
760    }
761 
762    if (should_submit_to_accept)
763    {
764       enterCS(ls->m_AcceptLock);
765       try
766       {
767          ls->m_QueuedSockets.insert(ns->m_SocketID);
768       }
769       catch (...)
770       {
771          LOGC(cnlog.Error, log << "newConnection: error when queuing socket!");
772          error = 3;
773       }
774       leaveCS(ls->m_AcceptLock);
775 
776       HLOGC(cnlog.Debug, log << "ACCEPT: new socket @" << ns->m_SocketID << " submitted for acceptance");
777       // acknowledge users waiting for new connections on the listening socket
778       m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_ACCEPT, true);
779 
780       CGlobEvent::triggerEvent();
781 
782       // XXX the exact value of 'error' is ignored
783       if (error > 0)
784       {
785          goto ERR_ROLLBACK;
786       }
787 
788       // wake up a waiting accept() call
789       CSync::lock_signal(ls->m_AcceptCond, ls->m_AcceptLock);
790    }
791    else
792    {
793       HLOGC(cnlog.Debug, log << "ACCEPT: new socket @" << ns->m_SocketID
794             << " NOT submitted to acceptance, another socket in the group is already connected");
795 
796       // acknowledge INTERNAL users waiting for new connections on the listening socket
797       // that are reported when a new socket is connected within an already connected group.
798       m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_UPDATE, true);
799       CGlobEvent::triggerEvent();
800    }
801 
802 ERR_ROLLBACK:
803    // XXX the exact value of 'error' is ignored
804    if (error > 0)
805    {
806 #if ENABLE_LOGGING
807        static const char* why [] = {
808            "UNKNOWN ERROR",
809            "INTERNAL REJECTION",
810            "IPE when mapping a socket",
811            "IPE when inserting a socket"
812        };
813        LOGC(cnlog.Warn, log << CONID(ns->m_SocketID) << "newConnection: connection rejected due to: "
814                << why[error] << " - " << RequestTypeStr(URQFailure(w_error)));
815 #endif
816 
817       SRTSOCKET id = ns->m_SocketID;
818       ns->core().closeInternal();
819       ns->setClosed();
820 
821       // The mapped socket should be now unmapped to preserve the situation that
822       // was in the original UDT code.
823       // In SRT additionally the acceptAndRespond() function (it was called probably
824       // connect() in UDT code) may fail, in which case this socket should not be
825       // further processed and should be removed.
826       {
827           ScopedLock cg(m_GlobControlLock);
828 
829 #if ENABLE_EXPERIMENTAL_BONDING
830           if (ns->m_GroupOf)
831           {
832               HLOGC(smlog.Debug, log << "@" << ns->m_SocketID << " IS MEMBER OF $" << ns->m_GroupOf->id() << " - REMOVING FROM GROUP");
833               ns->removeFromGroup(true);
834           }
835 #endif
836           m_Sockets.erase(id);
837           m_ClosedSockets[id] = ns;
838       }
839 
840       return -1;
841    }
842 
843    return 1;
844 }
845 
846 // static forwarder
installAcceptHook(SRTSOCKET lsn,srt_listen_callback_fn * hook,void * opaq)847 int srt::CUDT::installAcceptHook(SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq)
848 {
849     return s_UDTUnited.installAcceptHook(lsn, hook, opaq);
850 }
851 
installAcceptHook(const SRTSOCKET lsn,srt_listen_callback_fn * hook,void * opaq)852 int srt::CUDTUnited::installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq)
853 {
854     try
855     {
856         CUDTSocket* s = locateSocket(lsn, ERH_THROW);
857         s->core().installAcceptHook(hook, opaq);
858     }
859     catch (CUDTException& e)
860     {
861         SetThreadLocalError(e);
862         return SRT_ERROR;
863     }
864 
865     return 0;
866 }
867 
installConnectHook(SRTSOCKET lsn,srt_connect_callback_fn * hook,void * opaq)868 int srt::CUDT::installConnectHook(SRTSOCKET lsn, srt_connect_callback_fn* hook, void* opaq)
869 {
870     return s_UDTUnited.installConnectHook(lsn, hook, opaq);
871 }
872 
installConnectHook(const SRTSOCKET u,srt_connect_callback_fn * hook,void * opaq)873 int srt::CUDTUnited::installConnectHook(const SRTSOCKET u, srt_connect_callback_fn* hook, void* opaq)
874 {
875     try
876     {
877 #if ENABLE_EXPERIMENTAL_BONDING
878         if (u & SRTGROUP_MASK)
879         {
880             GroupKeeper k (*this, u, ERH_THROW);
881             k.group->installConnectHook(hook, opaq);
882             return 0;
883         }
884 #endif
885         CUDTSocket* s = locateSocket(u, ERH_THROW);
886         s->core().installConnectHook(hook, opaq);
887     }
888     catch (CUDTException& e)
889     {
890         SetThreadLocalError(e);
891         return SRT_ERROR;
892     }
893 
894     return 0;
895 }
896 
getStatus(const SRTSOCKET u)897 SRT_SOCKSTATUS srt::CUDTUnited::getStatus(const SRTSOCKET u)
898 {
899     // protects the m_Sockets structure
900     ScopedLock cg(m_GlobControlLock);
901 
902     sockets_t::const_iterator i = m_Sockets.find(u);
903 
904     if (i == m_Sockets.end())
905     {
906         if (m_ClosedSockets.find(u) != m_ClosedSockets.end())
907             return SRTS_CLOSED;
908 
909         return SRTS_NONEXIST;
910     }
911     return i->second->getStatus();
912 }
913 
bind(CUDTSocket * s,const sockaddr_any & name)914 int srt::CUDTUnited::bind(CUDTSocket* s, const sockaddr_any& name)
915 {
916    ScopedLock cg(s->m_ControlLock);
917 
918    // cannot bind a socket more than once
919    if (s->m_Status != SRTS_INIT)
920       throw CUDTException(MJ_NOTSUP, MN_NONE, 0);
921 
922    s->core().open();
923    updateMux(s, name);
924    s->m_Status = SRTS_OPENED;
925 
926    // copy address information of local node
927    s->core().m_pSndQueue->m_pChannel->getSockAddr((s->m_SelfAddr));
928 
929    return 0;
930 }
931 
bind(CUDTSocket * s,UDPSOCKET udpsock)932 int srt::CUDTUnited::bind(CUDTSocket* s, UDPSOCKET udpsock)
933 {
934    ScopedLock cg(s->m_ControlLock);
935 
936    // cannot bind a socket more than once
937    if (s->m_Status != SRTS_INIT)
938       throw CUDTException(MJ_NOTSUP, MN_NONE, 0);
939 
940    sockaddr_any name;
941    socklen_t namelen = sizeof name; // max of inet and inet6
942 
943    // This will preset the sa_family as well; the namelen is given simply large
944    // enough for any family here.
945    if (::getsockname(udpsock, &name.sa, &namelen) == -1)
946       throw CUDTException(MJ_NOTSUP, MN_INVAL);
947 
948    // Successfully extracted, so update the size
949    name.len = namelen;
950 
951    s->core().open();
952    updateMux(s, name, &udpsock);
953    s->m_Status = SRTS_OPENED;
954 
955    // copy address information of local node
956    s->core().m_pSndQueue->m_pChannel->getSockAddr(s->m_SelfAddr);
957 
958    return 0;
959 }
960 
listen(const SRTSOCKET u,int backlog)961 int srt::CUDTUnited::listen(const SRTSOCKET u, int backlog)
962 {
963    if (backlog <= 0)
964       throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
965 
966    // Don't search for the socket if it's already -1;
967    // this never is a valid socket.
968    if (u == UDT::INVALID_SOCK)
969       throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
970 
971    CUDTSocket* s = locateSocket(u);
972    if (!s)
973       throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
974 
975    ScopedLock cg(s->m_ControlLock);
976 
977    // NOTE: since now the socket is protected against simultaneous access.
978    // In the meantime the socket might have been closed, which means that
979    // it could have changed the state. It could be also set listen in another
980    // thread, so check it out.
981 
982    // do nothing if the socket is already listening
983    if (s->m_Status == SRTS_LISTENING)
984       return 0;
985 
986    // a socket can listen only if is in OPENED status
987    if (s->m_Status != SRTS_OPENED)
988       throw CUDTException(MJ_NOTSUP, MN_ISUNBOUND, 0);
989 
990    // [[using assert(s->m_Status == OPENED)]];
991 
992    // listen is not supported in rendezvous connection setup
993    if (s->core().m_config.bRendezvous)
994       throw CUDTException(MJ_NOTSUP, MN_ISRENDEZVOUS, 0);
995 
996    s->m_uiBackLog = backlog;
997 
998    // [[using assert(s->m_Status == OPENED)]]; // (still, unchanged)
999 
1000    s->core().setListenState();  // propagates CUDTException,
1001                                  // if thrown, remains in OPENED state if so.
1002    s->m_Status = SRTS_LISTENING;
1003 
1004    return 0;
1005 }
1006 
accept_bond(const SRTSOCKET listeners[],int lsize,int64_t msTimeOut)1007 SRTSOCKET srt::CUDTUnited::accept_bond(const SRTSOCKET listeners [], int lsize, int64_t msTimeOut)
1008 {
1009     CEPollDesc* ed = 0;
1010     int eid = m_EPoll.create(&ed);
1011 
1012     // Destroy it at return - this function can be interrupted
1013     // by an exception.
1014     struct AtReturn
1015     {
1016         int eid;
1017         CUDTUnited* that;
1018         AtReturn(CUDTUnited* t, int e): eid(e), that(t) {}
1019         ~AtReturn()
1020         {
1021             that->m_EPoll.release(eid);
1022         }
1023     } l_ar(this, eid);
1024 
1025     // Subscribe all of listeners for accept
1026     int events = SRT_EPOLL_ACCEPT;
1027 
1028     for (int i = 0; i < lsize; ++i)
1029     {
1030         srt_epoll_add_usock(eid, listeners[i], &events);
1031     }
1032 
1033     CEPoll::fmap_t st;
1034     m_EPoll.swait(*ed, (st), msTimeOut, true);
1035 
1036     if (st.empty())
1037     {
1038         // Sanity check
1039         throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
1040     }
1041 
1042     // Theoretically we can have a situation that more than one
1043     // listener is ready for accept. In this case simply get
1044     // only the first found.
1045     int lsn = st.begin()->first;
1046     sockaddr_storage dummy;
1047     int outlen = sizeof dummy;
1048     return accept(lsn, ((sockaddr*)&dummy), (&outlen));
1049 }
1050 
accept(const SRTSOCKET listen,sockaddr * pw_addr,int * pw_addrlen)1051 SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_addrlen)
1052 {
1053    if (pw_addr && !pw_addrlen)
1054    {
1055       LOGC(cnlog.Error, log << "srt_accept: provided address, but address length parameter is missing");
1056       throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
1057    }
1058 
1059    CUDTSocket* ls = locateSocket(listen);
1060 
1061    if (ls == NULL)
1062    {
1063       LOGC(cnlog.Error, log << "srt_accept: invalid listener socket ID value: " << listen);
1064       throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
1065    }
1066 
1067    // the "listen" socket must be in LISTENING status
1068    if (ls->m_Status != SRTS_LISTENING)
1069    {
1070       LOGC(cnlog.Error, log << "srt_accept: socket @" << listen << " is not in listening state (forgot srt_listen?)");
1071       throw CUDTException(MJ_NOTSUP, MN_NOLISTEN, 0);
1072    }
1073 
1074    // no "accept" in rendezvous connection setup
1075    if (ls->core().m_config.bRendezvous)
1076    {
1077        LOGC(cnlog.Fatal, log << "CUDTUnited::accept: RENDEZVOUS flag passed through check in srt_listen when it set listen state");
1078        // This problem should never happen because `srt_listen` function should have
1079        // checked this situation before and not set listen state in result.
1080        // Inform the user about the invalid state in the universal way.
1081        throw CUDTException(MJ_NOTSUP, MN_NOLISTEN, 0);
1082    }
1083 
1084    SRTSOCKET u = CUDT::INVALID_SOCK;
1085    bool accepted = false;
1086 
1087    // !!only one conection can be set up each time!!
1088    while (!accepted)
1089    {
1090        UniqueLock accept_lock(ls->m_AcceptLock);
1091        CSync accept_sync(ls->m_AcceptCond, accept_lock);
1092 
1093        if ((ls->m_Status != SRTS_LISTENING) || ls->core().m_bBroken)
1094        {
1095            // This socket has been closed.
1096            accepted = true;
1097        }
1098        else if (ls->m_QueuedSockets.size() > 0)
1099        {
1100            set<SRTSOCKET>::iterator b = ls->m_QueuedSockets.begin();
1101            u = *b;
1102            ls->m_QueuedSockets.erase(b);
1103            accepted = true;
1104        }
1105        else if (!ls->core().m_config.bSynRecving)
1106        {
1107            accepted = true;
1108        }
1109 
1110        if (!accepted && (ls->m_Status == SRTS_LISTENING))
1111            accept_sync.wait();
1112 
1113        if (ls->m_QueuedSockets.empty())
1114            m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_ACCEPT, false);
1115    }
1116 
1117    if (u == CUDT::INVALID_SOCK)
1118    {
1119       // non-blocking receiving, no connection available
1120       if (!ls->core().m_config.bSynRecving)
1121       {
1122          LOGC(cnlog.Error, log << "srt_accept: no pending connection available at the moment");
1123          throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
1124       }
1125 
1126       LOGC(cnlog.Error, log << "srt_accept: listener socket @" << listen << " is already closed");
1127       // listening socket is closed
1128       throw CUDTException(MJ_SETUP, MN_CLOSED, 0);
1129    }
1130 
1131    CUDTSocket* s = locateSocket(u);
1132    if (s == NULL)
1133    {
1134       LOGC(cnlog.Error, log << "srt_accept: pending connection has unexpectedly closed");
1135       throw CUDTException(MJ_SETUP, MN_CLOSED, 0);
1136    }
1137 
1138    // Set properly the SRTO_GROUPCONNECT flag
1139    s->core().m_config.iGroupConnect = 0;
1140 
1141    // Check if LISTENER has the SRTO_GROUPCONNECT flag set,
1142    // and the already accepted socket has successfully joined
1143    // the mirror group. If so, RETURN THE GROUP ID, not the socket ID.
1144 #if ENABLE_EXPERIMENTAL_BONDING
1145    if (ls->core().m_config.iGroupConnect == 1 && s->m_GroupOf)
1146    {
1147        // Put a lock to protect the group against accidental deletion
1148        // in the meantime.
1149        ScopedLock glock (m_GlobControlLock);
1150        // Check again; it's unlikely to happen, but
1151        // it's a theoretically possible scenario
1152        if (s->m_GroupOf)
1153        {
1154            u = s->m_GroupOf->m_GroupID;
1155            s->core().m_config.iGroupConnect = 1; // should be derived from ls, but make sure
1156 
1157            // Mark the beginning of the connection at the moment
1158            // when the group ID is returned to the app caller
1159             s->m_GroupOf->m_stats.tsLastSampleTime = steady_clock::now();
1160        }
1161        else
1162        {
1163            LOGC(smlog.Error, log << "accept: IPE: socket's group deleted in the meantime of accept process???");
1164        }
1165    }
1166 #endif
1167 
1168    ScopedLock cg(s->m_ControlLock);
1169 
1170    if (pw_addr != NULL && pw_addrlen != NULL)
1171    {
1172       // Check if the length of the buffer to fill the name in
1173       // was large enough.
1174       const int len = s->m_PeerAddr.size();
1175       if (*pw_addrlen < len)
1176           throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
1177 
1178       memcpy((pw_addr), &s->m_PeerAddr, len);
1179       *pw_addrlen = len;
1180    }
1181 
1182    return u;
1183 }
1184 
connect(SRTSOCKET u,const sockaddr * srcname,const sockaddr * tarname,int namelen)1185 int srt::CUDTUnited::connect(SRTSOCKET u, const sockaddr* srcname, const sockaddr* tarname, int namelen)
1186 {
1187     // Here both srcname and tarname must be specified
1188     if (!srcname || !tarname || size_t(namelen) < sizeof (sockaddr_in))
1189     {
1190         LOGC(aclog.Error, log << "connect(with source): invalid call: srcname="
1191                 << srcname << " tarname=" << tarname << " namelen=" << namelen);
1192         throw CUDTException(MJ_NOTSUP, MN_INVAL);
1193     }
1194 
1195     sockaddr_any source_addr(srcname, namelen);
1196     if (source_addr.len == 0)
1197         throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
1198     sockaddr_any target_addr(tarname, namelen);
1199     if (target_addr.len == 0)
1200         throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
1201 
1202 #if ENABLE_EXPERIMENTAL_BONDING
1203     // Check affiliation of the socket. It's now allowed for it to be
1204     // a group or socket. For a group, add automatically a socket to
1205     // the group.
1206     if (u & SRTGROUP_MASK)
1207     {
1208         GroupKeeper k (*this, u, ERH_THROW);
1209         // Note: forced_isn is ignored when connecting a group.
1210         // The group manages the ISN by itself ALWAYS, that is,
1211         // it's generated anew for the very first socket, and then
1212         // derived by all sockets in the group.
1213         SRT_SOCKGROUPCONFIG gd[1] = { srt_prepare_endpoint(srcname, tarname, namelen) };
1214 
1215         // When connecting to exactly one target, only this very target
1216         // can be returned as a socket, so rewritten back array can be ignored.
1217         return singleMemberConnect(k.group, gd);
1218     }
1219 #endif
1220 
1221     CUDTSocket* s = locateSocket(u);
1222     if (s == NULL)
1223         throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
1224 
1225     // For a single socket, just do bind, then connect
1226     bind(s, source_addr);
1227     return connectIn(s, target_addr, SRT_SEQNO_NONE);
1228 }
1229 
connect(const SRTSOCKET u,const sockaddr * name,int namelen,int32_t forced_isn)1230 int srt::CUDTUnited::connect(const SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn)
1231 {
1232     sockaddr_any target_addr(name, namelen);
1233     if (target_addr.len == 0)
1234         throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
1235 
1236 #if ENABLE_EXPERIMENTAL_BONDING
1237     // Check affiliation of the socket. It's now allowed for it to be
1238     // a group or socket. For a group, add automatically a socket to
1239     // the group.
1240     if (u & SRTGROUP_MASK)
1241     {
1242         GroupKeeper k (*this, u, ERH_THROW);
1243 
1244         // Note: forced_isn is ignored when connecting a group.
1245         // The group manages the ISN by itself ALWAYS, that is,
1246         // it's generated anew for the very first socket, and then
1247         // derived by all sockets in the group.
1248         SRT_SOCKGROUPCONFIG gd[1] = { srt_prepare_endpoint(NULL, name, namelen) };
1249         return singleMemberConnect(k.group, gd);
1250     }
1251 #endif
1252 
1253     CUDTSocket* s = locateSocket(u);
1254     if (!s)
1255         throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
1256 
1257     return connectIn(s, target_addr, forced_isn);
1258 }
1259 
1260 #if ENABLE_EXPERIMENTAL_BONDING
singleMemberConnect(CUDTGroup * pg,SRT_SOCKGROUPCONFIG * gd)1261 int srt::CUDTUnited::singleMemberConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* gd)
1262 {
1263     int gstat = groupConnect(pg, gd, 1);
1264     if (gstat == -1)
1265     {
1266         // We have only one element here, so refer to it.
1267         // Sanity check
1268         if (gd->errorcode == SRT_SUCCESS)
1269             gd->errorcode = SRT_EINVPARAM;
1270 
1271         CodeMajor mj = CodeMajor(gd->errorcode / 1000);
1272         CodeMinor mn = CodeMinor(gd->errorcode % 1000);
1273 
1274         return CUDT::APIError(mj, mn);
1275     }
1276 
1277     return gstat;
1278 }
1279 
1280 // [[using assert(pg->m_iBusy > 0)]]
groupConnect(CUDTGroup * pg,SRT_SOCKGROUPCONFIG * targets,int arraysize)1281 int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int arraysize)
1282 {
1283     CUDTGroup& g = *pg;
1284     SRT_ASSERT(g.m_iBusy > 0);
1285 
1286     // The group must be managed to use srt_connect on it,
1287     // as it must create particular socket automatically.
1288 
1289     // Non-managed groups can't be "connected" - at best you can connect
1290     // every socket individually.
1291     if (!g.managed())
1292         throw CUDTException(MJ_NOTSUP, MN_INVAL);
1293 
1294     // Check and report errors on data brought in by srt_prepare_endpoint,
1295     // as the latter function has no possibility to report errors.
1296     for (int tii = 0; tii < arraysize; ++tii)
1297     {
1298         if (targets[tii].srcaddr.ss_family != targets[tii].peeraddr.ss_family)
1299         {
1300             LOGC(aclog.Error, log << "srt_connect/group: family differs on source and target address");
1301             throw CUDTException(MJ_NOTSUP, MN_INVAL);
1302         }
1303 
1304         if (targets[tii].weight > CUDT::MAX_WEIGHT)
1305         {
1306             LOGC(aclog.Error, log << "srt_connect/group: weight value must be between 0 and " << (+CUDT::MAX_WEIGHT));
1307             throw CUDTException(MJ_NOTSUP, MN_INVAL);
1308         }
1309     }
1310 
1311     // If the open state switched to OPENED, the blocking mode
1312     // must make it wait for connecting it. Doing connect when the
1313     // group is already OPENED returns immediately, regardless if the
1314     // connection is going to later succeed or fail (this will be
1315     // known in the group state information).
1316     bool block_new_opened = !g.m_bOpened && g.m_bSynRecving;
1317     const bool was_empty = g.groupEmpty();
1318 
1319     // In case the group was retried connection, clear first all epoll readiness.
1320     const int ncleared = m_EPoll.update_events(g.id(), g.m_sPollID, SRT_EPOLL_ERR, false);
1321     if (was_empty || ncleared)
1322     {
1323         HLOGC(aclog.Debug, log << "srt_connect/group: clearing IN/OUT because was_empty=" << was_empty << " || ncleared=" << ncleared);
1324         // IN/OUT only in case when the group is empty, otherwise it would
1325         // clear out correct readiness resulting from earlier calls.
1326         // This also should happen if ERR flag was set, as IN and OUT could be set, too.
1327         m_EPoll.update_events(g.id(), g.m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT, false);
1328     }
1329     SRTSOCKET retval = -1;
1330 
1331     int eid = -1;
1332     int connect_modes = SRT_EPOLL_CONNECT | SRT_EPOLL_ERR;
1333     if (block_new_opened)
1334     {
1335         // Create this eid only to block-wait for the first
1336         // connection.
1337         eid = srt_epoll_create();
1338     }
1339 
1340     // Use private map to avoid searching in the
1341     // overall map.
1342     map<SRTSOCKET, CUDTSocket*> spawned;
1343 
1344     HLOGC(aclog.Debug, log << "groupConnect: will connect " << arraysize << " links and "
1345             << (block_new_opened ? "BLOCK until any is ready" : "leave the process in background"));
1346 
1347     for (int tii = 0; tii < arraysize; ++tii)
1348     {
1349         sockaddr_any target_addr(targets[tii].peeraddr);
1350         sockaddr_any source_addr(targets[tii].srcaddr);
1351         SRTSOCKET& sid_rloc = targets[tii].id;
1352         int& erc_rloc = targets[tii].errorcode;
1353         erc_rloc = SRT_SUCCESS; // preinitialized
1354         HLOGC(aclog.Debug, log << "groupConnect: taking on " << sockaddr_any(targets[tii].peeraddr).str());
1355 
1356         CUDTSocket* ns = 0;
1357 
1358         // NOTE: After calling newSocket, the socket is mapped into m_Sockets.
1359         // It must be MANUALLY removed from this list in case we need it deleted.
1360         SRTSOCKET sid = newSocket(&ns);
1361 
1362         if (pg->m_cbConnectHook)
1363         {
1364             // Derive the connect hook by the socket, if set on the group
1365             ns->core().m_cbConnectHook = pg->m_cbConnectHook;
1366         }
1367 
1368         SRT_SocketOptionObject* config = targets[tii].config;
1369 
1370         // XXX Support non-blocking mode:
1371         // If the group has nonblocking set for connect (SNDSYN),
1372         // then it must set so on the socket. Then, the connection
1373         // process is asynchronous. The socket appears first as
1374         // GST_PENDING state, and only after the socket becomes
1375         // connected does its status in the group turn into GST_IDLE.
1376 
1377         // Set all options that were requested by the options set on a group
1378         // prior to connecting.
1379         string error_reason SRT_ATR_UNUSED;
1380         try
1381         {
1382             for (size_t i = 0; i < g.m_config.size(); ++i)
1383             {
1384                 HLOGC(aclog.Debug, log << "groupConnect: OPTION @" << sid << " #" << g.m_config[i].so);
1385                 error_reason = "setting group-derived option: #" + Sprint(g.m_config[i].so);
1386                 ns->core().setOpt(g.m_config[i].so, &g.m_config[i].value[0], (int) g.m_config[i].value.size());
1387             }
1388 
1389             // Do not try to set a user option if failed already.
1390             if (config)
1391             {
1392                 error_reason = "user option";
1393                 ns->core().applyMemberConfigObject(*config);
1394             }
1395 
1396             error_reason = "bound address";
1397             // We got it. Bind the socket, if the source address was set
1398             if (!source_addr.empty())
1399                 bind(ns, source_addr);
1400 
1401         }
1402         catch (CUDTException& e)
1403         {
1404             // Just notify the problem, but the loop must continue.
1405             // Set the original error as reported.
1406             targets[tii].errorcode = e.getErrorCode();
1407             LOGC(aclog.Error, log << "srt_connect_group: failed to set " << error_reason);
1408         }
1409         catch (...)
1410         {
1411             // Set the general EINVPARAM - this error should never happen
1412             LOGC(aclog.Error, log << "IPE: CUDT::setOpt reported unknown exception");
1413             targets[tii].errorcode = SRT_EINVPARAM;
1414         }
1415 
1416         // Add socket to the group.
1417         // Do it after setting all stored options, as some of them may
1418         // influence some group data.
1419 
1420         srt::groups::SocketData data = srt::groups::prepareSocketData(ns);
1421         if (targets[tii].token != -1)
1422         {
1423             // Reuse the token, if specified by the caller
1424             data.token = targets[tii].token;
1425         }
1426         else
1427         {
1428             // Otherwise generate and write back the token
1429             data.token = CUDTGroup::genToken();
1430             targets[tii].token = data.token;
1431         }
1432 
1433         {
1434             ScopedLock cs(m_GlobControlLock);
1435             if (m_Sockets.count(sid) == 0)
1436             {
1437                 HLOGC(aclog.Debug, log << "srt_connect_group: socket @" << sid << " deleted in process");
1438                 // Someone deleted the socket in the meantime?
1439                 // Unlikely, but possible in theory.
1440                 // Don't delete anyhting - it's alreay done.
1441                 continue;
1442             }
1443 
1444             // There's nothing wrong with preparing the data first
1445             // even if this happens for nothing. But now, under the lock
1446             // and after checking that the socket still exists, check now
1447             // if this succeeded, and then also if the group is still usable.
1448             // The group will surely exist because it's set busy, until the
1449             // end of this function. But it might be simultaneously requested closed.
1450             bool proceed = true;
1451 
1452             if (targets[tii].errorcode != SRT_SUCCESS)
1453             {
1454                 HLOGC(aclog.Debug, log << "srt_connect_group: not processing @" << sid << " due to error in setting options");
1455                 proceed = false;
1456             }
1457 
1458             if (g.m_bClosing)
1459             {
1460                 HLOGC(aclog.Debug, log << "srt_connect_group: not processing @" << sid << " due to CLOSED GROUP $" << g.m_GroupID);
1461                 proceed = false;
1462             }
1463 
1464             if (proceed)
1465             {
1466                 CUDTGroup::SocketData* f = g.add(data);
1467                 ns->m_GroupMemberData = f;
1468                 ns->m_GroupOf = &g;
1469                 f->weight = targets[tii].weight;
1470                 LOGC(aclog.Note, log << "srt_connect_group: socket @" << sid << " added to group $" << g.m_GroupID);
1471             }
1472             else
1473             {
1474                 targets[tii].id = CUDT::INVALID_SOCK;
1475                 delete ns;
1476                 m_Sockets.erase(sid);
1477 
1478                 // If failed to set options, then do not continue
1479                 // neither with binding, nor with connecting.
1480                 continue;
1481             }
1482         }
1483 
1484 
1485         // XXX This should be reenabled later, this should
1486         // be probably still in use to exchange information about
1487         // packets assymetrically lost. But for no other purpose.
1488         /*
1489         ns->core().m_cbPacketArrival.set(ns->m_pUDT, &CUDT::groupPacketArrival);
1490         */
1491 
1492         int isn = g.currentSchedSequence();
1493 
1494         // Don't synchronize ISN in case of synch on msgno. Every link
1495         // may send their own payloads independently.
1496         if (g.synconmsgno())
1497         {
1498             HLOGC(aclog.Debug, log << "groupConnect: NOT synchronizing sequence numbers: will sync on msgno");
1499             isn = -1;
1500         }
1501 
1502         // Set it the groupconnect option, as all in-group sockets should have.
1503         ns->core().m_config.iGroupConnect = 1;
1504 
1505         // Every group member will have always nonblocking
1506         // (this implies also non-blocking connect/accept).
1507         // The group facility functions will block when necessary
1508         // using epoll_wait.
1509         ns->core().m_config.bSynRecving = false;
1510         ns->core().m_config.bSynSending = false;
1511 
1512         HLOGC(aclog.Debug, log << "groupConnect: NOTIFIED AS PENDING @" << sid << " both read and write");
1513         // If this socket is not to block the current connect process,
1514         // it may still be needed for the further check if the redundant
1515         // connection succeeded or failed and whether the new socket is
1516         // ready to use or needs to be closed.
1517         epoll_add_usock_INTERNAL(g.m_SndEID, ns, &connect_modes);
1518         epoll_add_usock_INTERNAL(g.m_RcvEID, ns, &connect_modes);
1519 
1520         // Adding a socket on which we need to block to BOTH these tracking EIDs
1521         // and the blocker EID. We'll simply remove from them later all sockets that
1522         // got connected state or were broken.
1523 
1524         if (block_new_opened)
1525         {
1526             HLOGC(aclog.Debug, log << "groupConnect: WILL BLOCK on @" << sid << " until connected");
1527             epoll_add_usock_INTERNAL(eid, ns, &connect_modes);
1528         }
1529 
1530         // And connect
1531         try
1532         {
1533             HLOGC(aclog.Debug, log << "groupConnect: connecting a new socket with ISN=" << isn);
1534             connectIn(ns, target_addr, isn);
1535         }
1536         catch (const CUDTException& e)
1537         {
1538             LOGC(aclog.Error, log << "groupConnect: socket @" << sid << " in group " << pg->id() << " failed to connect");
1539             // We know it does belong to a group.
1540             // Remove it first because this involves a mutex, and we want
1541             // to avoid locking more than one mutex at a time.
1542             erc_rloc = e.getErrorCode();
1543             targets[tii].errorcode = e.getErrorCode();
1544             targets[tii].id = CUDT::INVALID_SOCK;
1545 
1546             ScopedLock cl (m_GlobControlLock);
1547             ns->removeFromGroup(false);
1548             m_Sockets.erase(ns->m_SocketID);
1549             // Intercept to delete the socket on failure.
1550             delete ns;
1551             continue;
1552         }
1553         catch (...)
1554         {
1555             LOGC(aclog.Fatal, log << "groupConnect: IPE: UNKNOWN EXCEPTION from connectIn");
1556             targets[tii].errorcode = SRT_ESYSOBJ;
1557             targets[tii].id = CUDT::INVALID_SOCK;
1558             ScopedLock cl (m_GlobControlLock);
1559             ns->removeFromGroup(false);
1560             m_Sockets.erase(ns->m_SocketID);
1561             // Intercept to delete the socket on failure.
1562             delete ns;
1563 
1564             // Do not use original exception, it may crash off a C API.
1565             throw CUDTException(MJ_SYSTEMRES, MN_OBJECT);
1566         }
1567 
1568         SRT_SOCKSTATUS st;
1569         {
1570             ScopedLock grd (ns->m_ControlLock);
1571             st = ns->getStatus();
1572         }
1573 
1574         {
1575             // NOTE: Not applying m_GlobControlLock because the group is now
1576             // set busy, so it won't be deleted, even if it was requested to be closed.
1577             ScopedLock grd (g.m_GroupLock);
1578 
1579             if (!ns->m_GroupOf)
1580             {
1581                 // The situation could get changed between the unlock and lock of m_GroupLock.
1582                 // This must be checked again.
1583                 // If a socket has been removed from group, it means that some other thread is
1584                 // currently trying to delete the socket. Therefore it doesn't have, and even shouldn't,
1585                 // be deleted here. Just exit with error report.
1586                 LOGC(aclog.Error, log << "groupConnect: self-created member socket deleted during process, SKIPPING.");
1587 
1588                 // Do not report the error from here, just ignore this socket.
1589                 continue;
1590             }
1591 
1592             // If m_GroupOf is not NULL, the m_IncludedIter is still valid.
1593             CUDTGroup::SocketData* f = ns->m_GroupMemberData;
1594 
1595             // Now under a group lock, we need to make sure the group isn't being closed
1596             // in order not to add a socket to a dead group.
1597             if (g.m_bClosing)
1598             {
1599                 LOGC(aclog.Error, log << "groupConnect: group deleted while connecting; breaking the process");
1600 
1601                 // Set the status as pending so that the socket is taken care of later.
1602                 // Note that all earlier sockets that were processed in this loop were either
1603                 // set BROKEN or PENDING.
1604                 f->sndstate = SRT_GST_PENDING;
1605                 f->rcvstate = SRT_GST_PENDING;
1606                 retval = -1;
1607                 break;
1608             }
1609 
1610             HLOGC(aclog.Debug, log << "groupConnect: @" << sid << " connection successful, setting group OPEN (was "
1611                     << (g.m_bOpened ? "ALREADY" : "NOT") << "), will " << (block_new_opened ? "" : "NOT ")
1612                     << "block the connect call, status:" << SockStatusStr(st));
1613 
1614             // XXX OPEN OR CONNECTED?
1615             // BLOCK IF NOT OPEN OR BLOCK IF NOT CONNECTED?
1616             //
1617             // What happens to blocking when there are 2 connections
1618             // pending, about to be broken, and srt_connect() is called again?
1619             // SHOULD BLOCK the latter, even though is OPEN.
1620             // Or, OPEN should be removed from here and srt_connect(_group)
1621             // should block always if the group doesn't have neither 1 conencted link
1622             g.m_bOpened = true;
1623 
1624             g.m_stats.tsLastSampleTime = steady_clock::now();
1625 
1626             f->laststatus = st;
1627             // Check the socket status and update it.
1628             // Turn the group state of the socket to IDLE only if
1629             // connection is established or in progress
1630             f->agent = source_addr;
1631             f->peer = target_addr;
1632 
1633             if (st >= SRTS_BROKEN)
1634             {
1635                 f->sndstate = SRT_GST_BROKEN;
1636                 f->rcvstate = SRT_GST_BROKEN;
1637                 epoll_remove_socket_INTERNAL(g.m_SndEID, ns);
1638                 epoll_remove_socket_INTERNAL(g.m_RcvEID, ns);
1639             }
1640             else
1641             {
1642                 f->sndstate = SRT_GST_PENDING;
1643                 f->rcvstate = SRT_GST_PENDING;
1644                 spawned[sid] = ns;
1645 
1646                 sid_rloc = sid;
1647                 erc_rloc = 0;
1648                 retval = sid;
1649             }
1650         }
1651     }
1652 
1653     if (retval == -1)
1654     {
1655         HLOGC(aclog.Debug, log << "groupConnect: none succeeded as background-spawn, exit with error");
1656         block_new_opened = false; // Avoid executing further while loop
1657     }
1658 
1659     vector<SRTSOCKET> broken;
1660 
1661     while (block_new_opened)
1662     {
1663         if (spawned.empty())
1664         {
1665             // All were removed due to errors.
1666             retval = -1;
1667             break;
1668         }
1669         HLOGC(aclog.Debug, log << "groupConnect: first connection, applying EPOLL WAITING.");
1670         int len = (int) spawned.size();
1671         vector<SRTSOCKET> ready(spawned.size());
1672         const int estat = srt_epoll_wait(eid,
1673                     NULL, NULL,  // IN/ACCEPT
1674                     &ready[0], &len, // OUT/CONNECT
1675                     -1, // indefinitely (FIXME Check if it needs to REGARD CONNECTION TIMEOUT!)
1676                     NULL, NULL,
1677                     NULL, NULL
1678                     );
1679 
1680         // Sanity check. Shouldn't happen if subs are in sync with spawned.
1681         if (estat == -1)
1682         {
1683 #if ENABLE_LOGGING
1684             CUDTException& x = CUDT::getlasterror();
1685             if (x.getErrorCode() != SRT_EPOLLEMPTY)
1686             {
1687                 LOGC(aclog.Error, log << "groupConnect: srt_epoll_wait failed not because empty, unexpected IPE:"
1688                         << x.getErrorMessage());
1689             }
1690 #endif
1691             HLOGC(aclog.Debug, log << "groupConnect: srt_epoll_wait failed - breaking the wait loop");
1692             retval = -1;
1693             break;
1694         }
1695 
1696         // At the moment when you are going to work with real sockets,
1697         // lock the groups so that no one messes up with something here
1698         // in the meantime.
1699 
1700         ScopedLock lock (*g.exp_groupLock());
1701 
1702         // NOTE: UNDER m_GroupLock, NO API FUNCTION CALLS DARE TO HAPPEN BELOW!
1703 
1704         // Check first if a socket wasn't closed in the meantime. It will be
1705         // automatically removed from all EIDs, but there's no sense in keeping
1706         // them in 'spawned' map.
1707         for (map<SRTSOCKET, CUDTSocket*>::iterator y = spawned.begin();
1708                 y != spawned.end(); ++y)
1709         {
1710             SRTSOCKET sid = y->first;
1711             if (y->second->getStatus() >= SRTS_BROKEN)
1712             {
1713                 HLOGC(aclog.Debug, log << "groupConnect: Socket @" << sid << " got BROKEN in the meantine during the check, remove from candidates");
1714                 // Remove from spawned and try again
1715                 broken.push_back(sid);
1716 
1717                 epoll_remove_socket_INTERNAL(eid, y->second);
1718                 epoll_remove_socket_INTERNAL(g.m_SndEID, y->second);
1719                 epoll_remove_socket_INTERNAL(g.m_RcvEID, y->second);
1720             }
1721         }
1722 
1723         // Remove them outside the loop because this can't be done
1724         // while iterating over the same container.
1725         for (size_t i = 0; i < broken.size(); ++i)
1726             spawned.erase(broken[i]);
1727 
1728         // Check the sockets if they were reported due
1729         // to have connected or due to have failed.
1730         // Distill successful ones. If distilled nothing, return -1.
1731         // If not all sockets were reported in this instance, repeat
1732         // the call until you get information about all of them.
1733         for (int i = 0; i < len; ++i)
1734         {
1735             map<SRTSOCKET, CUDTSocket*>::iterator x = spawned.find(ready[i]);
1736             if (x == spawned.end())
1737             {
1738                 // Might be removed above - ignore it.
1739                 continue;
1740             }
1741 
1742             SRTSOCKET sid = x->first;
1743             CUDTSocket* s = x->second;
1744 
1745             // Check status. If failed, remove from spawned
1746             // and try again.
1747             SRT_SOCKSTATUS st = s->getStatus();
1748             if (st >= SRTS_BROKEN)
1749             {
1750                 HLOGC(aclog.Debug, log << "groupConnect: Socket @" << sid << " got BROKEN during background connect, remove & TRY AGAIN");
1751                 // Remove from spawned and try again
1752                 if (spawned.erase(sid))
1753                     broken.push_back(sid);
1754 
1755                 epoll_remove_socket_INTERNAL(eid, s);
1756                 epoll_remove_socket_INTERNAL(g.m_SndEID, s);
1757                 epoll_remove_socket_INTERNAL(g.m_RcvEID, s);
1758 
1759                 continue;
1760             }
1761 
1762             if (st == SRTS_CONNECTED)
1763             {
1764                 HLOGC(aclog.Debug, log << "groupConnect: Socket @" << sid << " got CONNECTED as first in the group - reporting");
1765                 retval = sid;
1766                 g.m_bConnected = true;
1767                 block_new_opened = false; // Interrupt also rolling epoll (outer loop)
1768 
1769                 // Remove this socket from SND EID because it doesn't need to
1770                 // be connection-tracked anymore. Don't remove from the RCV EID
1771                 // however because RCV procedure relies on epoll also for reading
1772                 // and when found this socket connected it will "upgrade" it to
1773                 // read-ready tracking only.
1774                 epoll_remove_socket_INTERNAL(g.m_SndEID, s);
1775                 break;
1776             }
1777 
1778             // Spurious?
1779             HLOGC(aclog.Debug, log << "groupConnect: Socket @" << sid << " got spurious wakeup in "
1780                     << SockStatusStr(st) << " TRY AGAIN");
1781         }
1782         // END of m_GroupLock CS - you can safely use API functions now.
1783     }
1784     // Finished, delete epoll.
1785     if (eid != -1)
1786     {
1787         HLOGC(aclog.Debug, log << "connect FIRST IN THE GROUP finished, removing E" << eid);
1788         srt_epoll_release(eid);
1789     }
1790 
1791     for (vector<SRTSOCKET>::iterator b = broken.begin(); b != broken.end(); ++b)
1792     {
1793         CUDTSocket* s = locateSocket(*b, ERH_RETURN);
1794         if (!s)
1795             continue;
1796 
1797         // This will also automatically remove it from the group and all eids
1798         close(s);
1799     }
1800 
1801     // There's no possibility to report a problem on every connection
1802     // separately in case when every single connection has failed. What
1803     // is more interesting, it's only a matter of luck that all connections
1804     // fail at exactly the same time. OTOH if all are to fail, this
1805     // function will still be polling sockets to determine the last man
1806     // standing. Each one could, however, break by a different reason,
1807     // for example, one by timeout, another by wrong passphrase. Check
1808     // the `errorcode` field to determine the reaon for particular link.
1809     if (retval == -1)
1810         throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
1811 
1812     return retval;
1813 }
1814 #endif
1815 
1816 
connectIn(CUDTSocket * s,const sockaddr_any & target_addr,int32_t forced_isn)1817 int srt::CUDTUnited::connectIn(CUDTSocket* s, const sockaddr_any& target_addr, int32_t forced_isn)
1818 {
1819    ScopedLock cg(s->m_ControlLock);
1820    // a socket can "connect" only if it is in the following states:
1821    // - OPENED: assume the socket binding parameters are configured
1822    // - INIT: configure binding parameters here
1823    // - any other (meaning, already connected): report error
1824 
1825    if (s->m_Status == SRTS_INIT)
1826    {
1827        if (s->core().m_config.bRendezvous)
1828            throw CUDTException(MJ_NOTSUP, MN_ISRENDUNBOUND, 0);
1829 
1830        // If bind() was done first on this socket, then the
1831        // socket will not perform this step. This actually does the
1832        // same thing as bind() does, just with empty address so that
1833        // the binding parameters are autoselected.
1834 
1835        s->core().open();
1836        sockaddr_any autoselect_sa (target_addr.family());
1837        // This will create such a sockaddr_any that
1838        // will return true from empty().
1839        updateMux(s, autoselect_sa);  // <<---- updateMux
1840        // -> C(Snd|Rcv)Queue::init
1841        // -> pthread_create(...C(Snd|Rcv)Queue::worker...)
1842        s->m_Status = SRTS_OPENED;
1843    }
1844    else
1845    {
1846        if (s->m_Status != SRTS_OPENED)
1847            throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0);
1848 
1849        // status = SRTS_OPENED, so family should be known already.
1850        if (target_addr.family() != s->m_SelfAddr.family())
1851        {
1852            LOGP(cnlog.Error, "srt_connect: socket is bound to a different family than target address");
1853            throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
1854        }
1855    }
1856 
1857 
1858    // connect_complete() may be called before connect() returns.
1859    // So we need to update the status before connect() is called,
1860    // otherwise the status may be overwritten with wrong value
1861    // (CONNECTED vs. CONNECTING).
1862    s->m_Status = SRTS_CONNECTING;
1863 
1864   /*
1865    * In blocking mode, connect can block for up to 30 seconds for
1866    * rendez-vous mode. Holding the s->m_ControlLock prevent close
1867    * from cancelling the connect
1868    */
1869    try
1870    {
1871        // record peer address
1872        s->m_PeerAddr = target_addr;
1873        s->core().startConnect(target_addr, forced_isn);
1874    }
1875    catch (CUDTException& e) // Interceptor, just to change the state.
1876    {
1877       s->m_Status = SRTS_OPENED;
1878       throw e;
1879    }
1880 
1881    // ScopedLock destructor will delete cg and unlock s->m_ControlLock
1882 
1883    return 0;
1884 }
1885 
1886 
close(const SRTSOCKET u)1887 int srt::CUDTUnited::close(const SRTSOCKET u)
1888 {
1889 #if ENABLE_EXPERIMENTAL_BONDING
1890     if (u & SRTGROUP_MASK)
1891     {
1892         GroupKeeper k (*this, u, ERH_THROW);
1893         k.group->close();
1894         deleteGroup(k.group);
1895         return 0;
1896     }
1897 #endif
1898     CUDTSocket* s = locateSocket(u);
1899     if (!s)
1900         throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
1901 
1902     return close(s);
1903 }
1904 
1905 #if ENABLE_EXPERIMENTAL_BONDING
deleteGroup(CUDTGroup * g)1906 void srt::CUDTUnited::deleteGroup(CUDTGroup* g)
1907 {
1908     using srt_logging::gmlog;
1909 
1910     srt::sync::ScopedLock cg (m_GlobControlLock);
1911     return deleteGroup_LOCKED(g);
1912 }
1913 
1914 // [[using locked(m_GlobControlLock)]]
deleteGroup_LOCKED(CUDTGroup * g)1915 void srt::CUDTUnited::deleteGroup_LOCKED(CUDTGroup* g)
1916 {
1917     SRT_ASSERT(g->groupEmpty());
1918 
1919     // After that the group is no longer findable by GroupKeeper
1920     m_Groups.erase(g->m_GroupID);
1921     m_ClosedGroups[g->m_GroupID] = g;
1922 
1923     // Paranoid check: since the group is in m_ClosedGroups
1924     // it may potentially be deleted. Make sure no socket points
1925     // to it. Actually all sockets should have been already removed
1926     // from the group container, so if any does, it's invalid.
1927     for (sockets_t::iterator i = m_Sockets.begin();
1928             i != m_Sockets.end(); ++ i)
1929     {
1930         CUDTSocket* s = i->second;
1931         if (s->m_GroupOf == g)
1932         {
1933             HLOGC(smlog.Debug, log << "deleteGroup: IPE: existing @" << s->m_SocketID << " points to a dead group!");
1934             s->m_GroupOf = NULL;
1935             s->m_GroupMemberData = NULL;
1936         }
1937     }
1938 
1939     // Just in case, do it in closed sockets, too, although this should be
1940     // always done before moving to it.
1941     for (sockets_t::iterator i = m_ClosedSockets.begin();
1942             i != m_ClosedSockets.end(); ++ i)
1943     {
1944         CUDTSocket* s = i->second;
1945         if (s->m_GroupOf == g)
1946         {
1947             HLOGC(smlog.Debug, log << "deleteGroup: IPE: closed @" << s->m_SocketID << " points to a dead group!");
1948             s->m_GroupOf = NULL;
1949             s->m_GroupMemberData = NULL;
1950         }
1951     }
1952 }
1953 #endif
1954 
close(CUDTSocket * s)1955 int srt::CUDTUnited::close(CUDTSocket* s)
1956 {
1957    HLOGC(smlog.Debug, log << s->core().CONID() << " CLOSE. Acquiring control lock");
1958    ScopedLock socket_cg(s->m_ControlLock);
1959    HLOGC(smlog.Debug, log << s->core().CONID() << " CLOSING (removing from listening, closing CUDT)");
1960 
1961    const bool synch_close_snd = s->core().m_config.bSynSending;
1962 
1963    SRTSOCKET u = s->m_SocketID;
1964 
1965    if (s->m_Status == SRTS_LISTENING)
1966    {
1967       if (s->core().m_bBroken)
1968          return 0;
1969 
1970       s->m_tsClosureTimeStamp = steady_clock::now();
1971       s->core().m_bBroken    = true;
1972 
1973       // Change towards original UDT:
1974       // Leave all the closing activities for garbageCollect to happen,
1975       // however remove the listener from the RcvQueue IMMEDIATELY.
1976       // Even though garbageCollect would eventually remove the listener
1977       // as well, there would be some time interval between now and the
1978       // moment when it's done, and during this time the application will
1979       // be unable to bind to this port that the about-to-delete listener
1980       // is currently occupying (due to blocked slot in the RcvQueue).
1981 
1982       HLOGC(smlog.Debug, log << s->core().CONID() << " CLOSING (removing listener immediately)");
1983       s->core().notListening();
1984 
1985       // broadcast all "accept" waiting
1986       CSync::lock_broadcast(s->m_AcceptCond, s->m_AcceptLock);
1987    }
1988    else
1989    {
1990        // Note: this call may be done on a socket that hasn't finished
1991        // sending all packets scheduled for sending, which means, this call
1992        // may block INDEFINITELY. As long as it's acceptable to block the
1993        // call to srt_close(), and all functions in all threads where this
1994        // very socket is used, this shall not block the central database.
1995        s->core().closeInternal();
1996 
1997        // synchronize with garbage collection.
1998        HLOGC(smlog.Debug, log << "@" << u << "U::close done. GLOBAL CLOSE: " << s->core().CONID() << ". Acquiring GLOBAL control lock");
1999        ScopedLock manager_cg(m_GlobControlLock);
2000        // since "s" is located before m_GlobControlLock, locate it again in case
2001        // it became invalid
2002        // XXX This is very weird; if we state that the CUDTSocket object
2003        // could not be deleted between locks, then definitely it couldn't
2004        // also change the pointer value. There's no other reason for getting
2005        // this iterator but to obtain the 's' pointer, which is impossible to
2006        // be different than previous 's' (m_Sockets is a map that stores pointers
2007        // transparently). This iterator isn't even later used to delete the socket
2008        // from the container, though it would be more efficient.
2009        // FURTHER RESEARCH REQUIRED.
2010        sockets_t::iterator i = m_Sockets.find(u);
2011        if ((i == m_Sockets.end()) || (i->second->m_Status == SRTS_CLOSED))
2012        {
2013            HLOGC(smlog.Debug, log << "@" << u << "U::close: NOT AN ACTIVE SOCKET, returning.");
2014            return 0;
2015        }
2016        s = i->second;
2017        s->setClosed();
2018 
2019 #if ENABLE_EXPERIMENTAL_BONDING
2020        if (s->m_GroupOf)
2021        {
2022            HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
2023            s->removeFromGroup(true);
2024        }
2025 #endif
2026 
2027        m_Sockets.erase(s->m_SocketID);
2028        m_ClosedSockets[s->m_SocketID] = s;
2029        HLOGC(smlog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later.");
2030 
2031        CGlobEvent::triggerEvent();
2032    }
2033 
2034    HLOGC(smlog.Debug, log << "@" << u << ": GLOBAL: CLOSING DONE");
2035 
2036    // Check if the ID is still in closed sockets before you access it
2037    // (the last triggerEvent could have deleted it).
2038    if ( synch_close_snd )
2039    {
2040 #if SRT_ENABLE_CLOSE_SYNCH
2041 
2042        HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sync-waiting for releasing sender resources...");
2043        for (;;)
2044        {
2045            CSndBuffer* sb = s->core().m_pSndBuffer;
2046 
2047            // Disconnected from buffer - nothing more to check.
2048            if (!sb)
2049            {
2050                HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer disconnected. Allowed to close.");
2051                break;
2052            }
2053 
2054            // Sender buffer empty
2055            if (sb->getCurrBufSize() == 0)
2056            {
2057                HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer depleted. Allowed to close.");
2058                break;
2059            }
2060 
2061            // Ok, now you are keeping GC thread hands off the internal data.
2062            // You can check then if it has already deleted the socket or not.
2063            // The socket is either in m_ClosedSockets or is already gone.
2064 
2065            // Done the other way, but still done. You can stop waiting.
2066            bool isgone = false;
2067            {
2068                ScopedLock manager_cg(m_GlobControlLock);
2069                isgone = m_ClosedSockets.count(u) == 0;
2070            }
2071            if (!isgone)
2072            {
2073                isgone = !s->core().m_bOpened;
2074            }
2075            if (isgone)
2076            {
2077                HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: ... gone in the meantime, whatever. Exiting close().");
2078                break;
2079            }
2080 
2081            HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: ... still waiting for any update.");
2082            // How to handle a possible error here?
2083            CGlobEvent::waitForEvent();
2084 
2085            // Continue waiting in case when an event happened or 1s waiting time passed for checkpoint.
2086        }
2087 #endif
2088    }
2089 
2090    /*
2091       This code is PUT ASIDE for now.
2092       Most likely this will be never required.
2093       It had to hold the closing activity until the time when the receiver buffer is depleted.
2094       However the closing of the socket should only happen when the receiver has received
2095       an information about that the reading is no longer possible (error report from recv/recvfile).
2096       When this happens, the receiver buffer is definitely depleted already and there's no need to check
2097       anything.
2098 
2099       Should there appear any other conditions in future under which the closing process should be
2100       delayed until the receiver buffer is empty, this code can be filled here.
2101 
2102    if ( synch_close_rcv )
2103    {
2104    ...
2105    }
2106    */
2107 
2108    return 0;
2109 }
2110 
getpeername(const SRTSOCKET u,sockaddr * pw_name,int * pw_namelen)2111 void srt::CUDTUnited::getpeername(const SRTSOCKET u, sockaddr* pw_name, int* pw_namelen)
2112 {
2113    if (!pw_name || !pw_namelen)
2114        throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
2115 
2116    if (getStatus(u) != SRTS_CONNECTED)
2117       throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
2118 
2119    CUDTSocket* s = locateSocket(u);
2120 
2121    if (!s)
2122       throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2123 
2124    if (!s->core().m_bConnected || s->core().m_bBroken)
2125       throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
2126 
2127    const int len = s->m_PeerAddr.size();
2128    if (*pw_namelen < len)
2129        throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
2130 
2131    memcpy((pw_name), &s->m_PeerAddr.sa, len);
2132    *pw_namelen = len;
2133 }
2134 
getsockname(const SRTSOCKET u,sockaddr * pw_name,int * pw_namelen)2135 void srt::CUDTUnited::getsockname(const SRTSOCKET u, sockaddr* pw_name, int* pw_namelen)
2136 {
2137    if (!pw_name || !pw_namelen)
2138        throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
2139 
2140    CUDTSocket* s = locateSocket(u);
2141 
2142    if (!s)
2143       throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2144 
2145    if (s->core().m_bBroken)
2146       throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2147 
2148    if (s->m_Status == SRTS_INIT)
2149       throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
2150 
2151    const int len = s->m_SelfAddr.size();
2152    if (*pw_namelen < len)
2153        throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
2154 
2155    memcpy((pw_name), &s->m_SelfAddr.sa, len);
2156    *pw_namelen = len;
2157 }
2158 
select(UDT::UDSET * readfds,UDT::UDSET * writefds,UDT::UDSET * exceptfds,const timeval * timeout)2159 int srt::CUDTUnited::select(
2160    UDT::UDSET* readfds, UDT::UDSET* writefds, UDT::UDSET* exceptfds, const timeval* timeout)
2161 {
2162    const steady_clock::time_point entertime = steady_clock::now();
2163 
2164    const int64_t timeo_us = timeout
2165        ? static_cast<int64_t>(timeout->tv_sec) * 1000000 + timeout->tv_usec
2166        : -1;
2167    const steady_clock::duration timeo(microseconds_from(timeo_us));
2168 
2169    // initialize results
2170    int count = 0;
2171    set<SRTSOCKET> rs, ws, es;
2172 
2173    // retrieve related UDT sockets
2174    vector<CUDTSocket*> ru, wu, eu;
2175    CUDTSocket* s;
2176    if (readfds)
2177       for (set<SRTSOCKET>::iterator i1 = readfds->begin();
2178          i1 != readfds->end(); ++ i1)
2179       {
2180          if (getStatus(*i1) == SRTS_BROKEN)
2181          {
2182             rs.insert(*i1);
2183             ++ count;
2184          }
2185          else if (!(s = locateSocket(*i1)))
2186             throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2187          else
2188             ru.push_back(s);
2189       }
2190    if (writefds)
2191       for (set<SRTSOCKET>::iterator i2 = writefds->begin();
2192          i2 != writefds->end(); ++ i2)
2193       {
2194          if (getStatus(*i2) == SRTS_BROKEN)
2195          {
2196             ws.insert(*i2);
2197             ++ count;
2198          }
2199          else if (!(s = locateSocket(*i2)))
2200             throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2201          else
2202             wu.push_back(s);
2203       }
2204    if (exceptfds)
2205       for (set<SRTSOCKET>::iterator i3 = exceptfds->begin();
2206          i3 != exceptfds->end(); ++ i3)
2207       {
2208          if (getStatus(*i3) == SRTS_BROKEN)
2209          {
2210             es.insert(*i3);
2211             ++ count;
2212          }
2213          else if (!(s = locateSocket(*i3)))
2214             throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2215          else
2216             eu.push_back(s);
2217       }
2218 
2219    do
2220    {
2221       // query read sockets
2222       for (vector<CUDTSocket*>::iterator j1 = ru.begin(); j1 != ru.end(); ++ j1)
2223       {
2224          s = *j1;
2225 
2226          if (s->readReady() || s->m_Status == SRTS_CLOSED)
2227          {
2228             rs.insert(s->m_SocketID);
2229             ++ count;
2230          }
2231       }
2232 
2233       // query write sockets
2234       for (vector<CUDTSocket*>::iterator j2 = wu.begin(); j2 != wu.end(); ++ j2)
2235       {
2236          s = *j2;
2237 
2238          if (s->writeReady() || s->m_Status == SRTS_CLOSED)
2239          {
2240             ws.insert(s->m_SocketID);
2241             ++ count;
2242          }
2243       }
2244 
2245       // query exceptions on sockets
2246       for (vector<CUDTSocket*>::iterator j3 = eu.begin(); j3 != eu.end(); ++ j3)
2247       {
2248          // check connection request status, not supported now
2249       }
2250 
2251       if (0 < count)
2252          break;
2253 
2254       CGlobEvent::waitForEvent();
2255    } while (timeo > steady_clock::now() - entertime);
2256 
2257    if (readfds)
2258       *readfds = rs;
2259 
2260    if (writefds)
2261       *writefds = ws;
2262 
2263    if (exceptfds)
2264       *exceptfds = es;
2265 
2266    return count;
2267 }
2268 
selectEx(const vector<SRTSOCKET> & fds,vector<SRTSOCKET> * readfds,vector<SRTSOCKET> * writefds,vector<SRTSOCKET> * exceptfds,int64_t msTimeOut)2269 int srt::CUDTUnited::selectEx(
2270    const vector<SRTSOCKET>& fds,
2271    vector<SRTSOCKET>* readfds,
2272    vector<SRTSOCKET>* writefds,
2273    vector<SRTSOCKET>* exceptfds,
2274    int64_t msTimeOut)
2275 {
2276     const steady_clock::time_point entertime = steady_clock::now();
2277 
2278     const int64_t timeo_us = msTimeOut >= 0
2279         ? msTimeOut * 1000
2280         : -1;
2281     const steady_clock::duration timeo(microseconds_from(timeo_us));
2282 
2283    // initialize results
2284    int count = 0;
2285    if (readfds)
2286       readfds->clear();
2287    if (writefds)
2288       writefds->clear();
2289    if (exceptfds)
2290       exceptfds->clear();
2291 
2292    do
2293    {
2294       for (vector<SRTSOCKET>::const_iterator i = fds.begin();
2295          i != fds.end(); ++ i)
2296       {
2297          CUDTSocket* s = locateSocket(*i);
2298 
2299          if ((!s) || s->core().m_bBroken || (s->m_Status == SRTS_CLOSED))
2300          {
2301             if (exceptfds)
2302             {
2303                exceptfds->push_back(*i);
2304                ++ count;
2305             }
2306             continue;
2307          }
2308 
2309          if (readfds)
2310          {
2311             if ((s->core().m_bConnected
2312                   && s->core().m_pRcvBuffer->isRcvDataReady()
2313                )
2314                || (s->core().m_bListening
2315                   && (s->m_QueuedSockets.size() > 0)))
2316             {
2317                readfds->push_back(s->m_SocketID);
2318                ++ count;
2319             }
2320          }
2321 
2322          if (writefds)
2323          {
2324             if (s->core().m_bConnected
2325                && (s->core().m_pSndBuffer->getCurrBufSize()
2326                   < s->core().m_config.iSndBufSize))
2327             {
2328                writefds->push_back(s->m_SocketID);
2329                ++ count;
2330             }
2331          }
2332       }
2333 
2334       if (count > 0)
2335          break;
2336 
2337       CGlobEvent::waitForEvent();
2338    } while (timeo > steady_clock::now() - entertime);
2339 
2340    return count;
2341 }
2342 
epoll_create()2343 int srt::CUDTUnited::epoll_create()
2344 {
2345    return m_EPoll.create();
2346 }
2347 
epoll_clear_usocks(int eid)2348 int srt::CUDTUnited::epoll_clear_usocks(int eid)
2349 {
2350     return m_EPoll.clear_usocks(eid);
2351 }
2352 
epoll_add_usock(const int eid,const SRTSOCKET u,const int * events)2353 int srt::CUDTUnited::epoll_add_usock(
2354    const int eid, const SRTSOCKET u, const int* events)
2355 {
2356    int ret = -1;
2357 #if ENABLE_EXPERIMENTAL_BONDING
2358    if (u & SRTGROUP_MASK)
2359    {
2360        GroupKeeper k (*this, u, ERH_THROW);
2361        ret = m_EPoll.update_usock(eid, u, events);
2362        k.group->addEPoll(eid);
2363        return 0;
2364    }
2365 #endif
2366 
2367    CUDTSocket* s = locateSocket(u);
2368    if (s)
2369    {
2370       ret = epoll_add_usock_INTERNAL(eid, s, events);
2371    }
2372    else
2373    {
2374       throw CUDTException(MJ_NOTSUP, MN_SIDINVAL);
2375    }
2376 
2377    return ret;
2378 }
2379 
2380 // NOTE: WILL LOCK (serially):
2381 // - CEPoll::m_EPollLock
2382 // - CUDT::m_RecvLock
epoll_add_usock_INTERNAL(const int eid,CUDTSocket * s,const int * events)2383 int srt::CUDTUnited::epoll_add_usock_INTERNAL(const int eid, CUDTSocket* s, const int* events)
2384 {
2385     int ret = m_EPoll.update_usock(eid, s->m_SocketID, events);
2386     s->core().addEPoll(eid);
2387     return ret;
2388 }
2389 
epoll_add_ssock(const int eid,const SYSSOCKET s,const int * events)2390 int srt::CUDTUnited::epoll_add_ssock(
2391    const int eid, const SYSSOCKET s, const int* events)
2392 {
2393    return m_EPoll.add_ssock(eid, s, events);
2394 }
2395 
epoll_update_ssock(const int eid,const SYSSOCKET s,const int * events)2396 int srt::CUDTUnited::epoll_update_ssock(
2397    const int eid, const SYSSOCKET s, const int* events)
2398 {
2399    return m_EPoll.update_ssock(eid, s, events);
2400 }
2401 
2402 template <class EntityType>
epoll_remove_entity(const int eid,EntityType * ent)2403 int srt::CUDTUnited::epoll_remove_entity(const int eid, EntityType* ent)
2404 {
2405     // XXX Not sure if this is anyhow necessary because setting readiness
2406     // to false doesn't actually trigger any action. Further research needed.
2407     HLOGC(ealog.Debug, log << "epoll_remove_usock: CLEARING readiness on E" << eid << " of @" << ent->id());
2408     ent->removeEPollEvents(eid);
2409 
2410     // First remove the EID from the subscribed in the socket so that
2411     // a possible call to update_events:
2412     // - if happens before this call, can find the epoll bit update possible
2413     // - if happens after this call, will not strike this EID
2414     HLOGC(ealog.Debug, log << "epoll_remove_usock: REMOVING E" << eid << " from back-subscirbers in @" << ent->id());
2415     ent->removeEPollID(eid);
2416 
2417     HLOGC(ealog.Debug, log << "epoll_remove_usock: CLEARING subscription on E" << eid << " of @" << ent->id());
2418     int no_events = 0;
2419     int ret = m_EPoll.update_usock(eid, ent->id(), &no_events);
2420 
2421     return ret;
2422 }
2423 
2424 // Needed internal access!
epoll_remove_socket_INTERNAL(const int eid,CUDTSocket * s)2425 int srt::CUDTUnited::epoll_remove_socket_INTERNAL(const int eid, CUDTSocket* s)
2426 {
2427     return epoll_remove_entity(eid, &s->core());
2428 }
2429 
2430 #if ENABLE_EXPERIMENTAL_BONDING
epoll_remove_group_INTERNAL(const int eid,CUDTGroup * g)2431 int srt::CUDTUnited::epoll_remove_group_INTERNAL(const int eid, CUDTGroup* g)
2432 {
2433     return epoll_remove_entity(eid, g);
2434 }
2435 #endif
2436 
epoll_remove_usock(const int eid,const SRTSOCKET u)2437 int srt::CUDTUnited::epoll_remove_usock(const int eid, const SRTSOCKET u)
2438 {
2439    CUDTSocket* s = 0;
2440 
2441 #if ENABLE_EXPERIMENTAL_BONDING
2442    CUDTGroup* g = 0;
2443    if (u & SRTGROUP_MASK)
2444    {
2445        GroupKeeper k (*this, u, ERH_THROW);
2446        g = k.group;
2447        return epoll_remove_entity(eid, g);
2448    }
2449    else
2450 #endif
2451    {
2452        s = locateSocket(u);
2453        if (s)
2454            return epoll_remove_entity(eid, &s->core());
2455    }
2456 
2457    LOGC(ealog.Error, log << "remove_usock: @" << u
2458            << " not found as either socket or group. Removing only from epoll system.");
2459    int no_events = 0;
2460    return m_EPoll.update_usock(eid, u, &no_events);
2461 }
2462 
epoll_remove_ssock(const int eid,const SYSSOCKET s)2463 int srt::CUDTUnited::epoll_remove_ssock(const int eid, const SYSSOCKET s)
2464 {
2465    return m_EPoll.remove_ssock(eid, s);
2466 }
2467 
epoll_uwait(const int eid,SRT_EPOLL_EVENT * fdsSet,int fdsSize,int64_t msTimeOut)2468 int srt::CUDTUnited::epoll_uwait(
2469    const int eid,
2470    SRT_EPOLL_EVENT* fdsSet,
2471    int fdsSize,
2472    int64_t msTimeOut)
2473 {
2474    return m_EPoll.uwait(eid, fdsSet, fdsSize, msTimeOut);
2475 }
2476 
epoll_set(int eid,int32_t flags)2477 int32_t srt::CUDTUnited::epoll_set(int eid, int32_t flags)
2478 {
2479     return m_EPoll.setflags(eid, flags);
2480 }
2481 
epoll_release(const int eid)2482 int srt::CUDTUnited::epoll_release(const int eid)
2483 {
2484    return m_EPoll.release(eid);
2485 }
2486 
locateSocket(const SRTSOCKET u,ErrorHandling erh)2487 srt::CUDTSocket* srt::CUDTUnited::locateSocket(const SRTSOCKET u, ErrorHandling erh)
2488 {
2489     ScopedLock cg (m_GlobControlLock);
2490     CUDTSocket* s = locateSocket_LOCKED(u);
2491     if (!s)
2492     {
2493         if (erh == ERH_RETURN)
2494             return NULL;
2495         throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2496     }
2497 
2498     return s;
2499 }
2500 
2501 // [[using locked(m_GlobControlLock)]];
locateSocket_LOCKED(SRTSOCKET u)2502 srt::CUDTSocket* srt::CUDTUnited::locateSocket_LOCKED(SRTSOCKET u)
2503 {
2504     sockets_t::iterator i = m_Sockets.find(u);
2505 
2506     if ((i == m_Sockets.end()) || (i->second->m_Status == SRTS_CLOSED))
2507     {
2508         return NULL;
2509     }
2510 
2511     return i->second;
2512 }
2513 
2514 #if ENABLE_EXPERIMENTAL_BONDING
locateAcquireGroup(SRTSOCKET u,ErrorHandling erh)2515 srt::CUDTGroup* srt::CUDTUnited::locateAcquireGroup(SRTSOCKET u, ErrorHandling erh)
2516 {
2517    ScopedLock cg (m_GlobControlLock);
2518 
2519    const groups_t::iterator i = m_Groups.find(u);
2520    if ( i == m_Groups.end() )
2521    {
2522        if (erh == ERH_THROW)
2523            throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
2524        return NULL;
2525    }
2526 
2527    ScopedLock cgroup (*i->second->exp_groupLock());
2528    i->second->apiAcquire();
2529    return i->second;
2530 }
2531 
acquireSocketsGroup(CUDTSocket * s)2532 srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s)
2533 {
2534    ScopedLock cg (m_GlobControlLock);
2535    CUDTGroup* g = s->m_GroupOf;
2536    if (!g)
2537        return NULL;
2538 
2539    // With m_GlobControlLock locked, we are sure the group
2540    // still exists, if it wasn't removed from this socket.
2541    g->apiAcquire();
2542    return g;
2543 }
2544 #endif
2545 
locatePeer(const sockaddr_any & peer,const SRTSOCKET id,int32_t isn)2546 srt::CUDTSocket* srt::CUDTUnited::locatePeer(
2547    const sockaddr_any& peer,
2548    const SRTSOCKET id,
2549    int32_t isn)
2550 {
2551    ScopedLock cg(m_GlobControlLock);
2552 
2553    map<int64_t, set<SRTSOCKET> >::iterator i = m_PeerRec.find(
2554       CUDTSocket::getPeerSpec(id, isn));
2555    if (i == m_PeerRec.end())
2556       return NULL;
2557 
2558    for (set<SRTSOCKET>::iterator j = i->second.begin();
2559       j != i->second.end(); ++ j)
2560    {
2561       sockets_t::iterator k = m_Sockets.find(*j);
2562       // this socket might have been closed and moved m_ClosedSockets
2563       if (k == m_Sockets.end())
2564          continue;
2565 
2566       if (k->second->m_PeerAddr == peer)
2567       {
2568          return k->second;
2569       }
2570    }
2571 
2572    return NULL;
2573 }
2574 
checkBrokenSockets()2575 void srt::CUDTUnited::checkBrokenSockets()
2576 {
2577    ScopedLock cg(m_GlobControlLock);
2578 
2579    // set of sockets To Be Closed and To Be Removed
2580    vector<SRTSOCKET> tbc;
2581    vector<SRTSOCKET> tbr;
2582 
2583 #if ENABLE_EXPERIMENTAL_BONDING
2584    vector<SRTSOCKET> delgids;
2585 
2586    for (groups_t::iterator i = m_ClosedGroups.begin(); i != m_ClosedGroups.end(); ++i)
2587    {
2588        // isStillBusy requires lock on the group, so only after an API
2589        // function that uses it returns, and so clears the busy flag,
2590        // a new API function won't be called anyway until it can acquire
2591        // GlobControlLock, and all functions that have already seen this
2592        // group as closing will not continue with the API and return.
2593        // If we caught some API function still using the closed group,
2594        // it's not going to wait, will be checked next time.
2595        if (i->second->isStillBusy())
2596            continue;
2597 
2598        delgids.push_back(i->first);
2599        delete i->second;
2600        i->second = NULL; // just for a case, avoid a dangling pointer
2601    }
2602 
2603    for (vector<SRTSOCKET>::iterator di = delgids.begin(); di != delgids.end(); ++di)
2604    {
2605        m_ClosedGroups.erase(*di);
2606    }
2607 
2608 #endif
2609 
2610    for (sockets_t::iterator i = m_Sockets.begin();
2611       i != m_Sockets.end(); ++ i)
2612    {
2613        CUDTSocket* s = i->second;
2614 
2615       // check broken connection
2616       if (s->core().m_bBroken)
2617       {
2618          if (s->m_Status == SRTS_LISTENING)
2619          {
2620             const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
2621             // for a listening socket, it should wait an extra 3 seconds
2622             // in case a client is connecting
2623             if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
2624             {
2625                continue;
2626             }
2627          }
2628          else if ((s->core().m_pRcvBuffer != NULL)
2629             // FIXED: calling isRcvDataAvailable() just to get the information
2630             // whether there are any data waiting in the buffer,
2631             // NOT WHETHER THEY ARE ALSO READY TO PLAY at the time when
2632             // this function is called (isRcvDataReady also checks if the
2633             // available data is "ready to play").
2634             && s->core().m_pRcvBuffer->isRcvDataAvailable())
2635          {
2636              const int bc = s->core().m_iBrokenCounter.load();
2637              if (bc > 0)
2638              {
2639                  // HLOGF(smlog.Debug, "STILL KEEPING socket (still have data):
2640                  // %d\n", i->first);
2641                  // if there is still data in the receiver buffer, wait longer
2642                  s->core().m_iBrokenCounter.store(bc - 1);
2643                  continue;
2644              }
2645          }
2646 
2647 #if ENABLE_EXPERIMENTAL_BONDING
2648          if (s->m_GroupOf)
2649          {
2650              LOGC(smlog.Note, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
2651              s->removeFromGroup(true);
2652          }
2653 #endif
2654 
2655          HLOGC(smlog.Debug, log << "checkBrokenSockets: moving BROKEN socket to CLOSED: @" << i->first);
2656 
2657          //close broken connections and start removal timer
2658          s->setClosed();
2659          tbc.push_back(i->first);
2660          m_ClosedSockets[i->first] = s;
2661 
2662          // remove from listener's queue
2663          sockets_t::iterator ls = m_Sockets.find(s->m_ListenSocket);
2664          if (ls == m_Sockets.end())
2665          {
2666             ls = m_ClosedSockets.find(s->m_ListenSocket);
2667             if (ls == m_ClosedSockets.end())
2668                continue;
2669          }
2670 
2671          enterCS(ls->second->m_AcceptLock);
2672          ls->second->m_QueuedSockets.erase(s->m_SocketID);
2673          leaveCS(ls->second->m_AcceptLock);
2674       }
2675    }
2676 
2677    for (sockets_t::iterator j = m_ClosedSockets.begin();
2678       j != m_ClosedSockets.end(); ++ j)
2679    {
2680       // HLOGF(smlog.Debug, "checking CLOSED socket: %d\n", j->first);
2681       if (!is_zero(j->second->core().m_tsLingerExpiration))
2682       {
2683          // asynchronous close:
2684          if ((!j->second->core().m_pSndBuffer)
2685             || (0 == j->second->core().m_pSndBuffer->getCurrBufSize())
2686             || (j->second->core().m_tsLingerExpiration <= steady_clock::now()))
2687          {
2688             HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << j->second->m_SocketID);
2689             j->second->core().m_tsLingerExpiration = steady_clock::time_point();
2690             j->second->core().m_bClosing = true;
2691             j->second->m_tsClosureTimeStamp = steady_clock::now();
2692          }
2693       }
2694 
2695       // timeout 1 second to destroy a socket AND it has been removed from
2696       // RcvUList
2697       const steady_clock::time_point now = steady_clock::now();
2698       const steady_clock::duration closed_ago = now - j->second->m_tsClosureTimeStamp;
2699       if (closed_ago > seconds_from(1))
2700       {
2701           CRNode* rnode = j->second->core().m_pRNode;
2702           if (!rnode || !rnode->m_bOnList)
2703           {
2704               HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << j->second->m_SocketID << " closed "
2705                       << FormatDuration(closed_ago) << " ago and removed from RcvQ - will remove");
2706 
2707               // HLOGF(smlog.Debug, "will unref socket: %d\n", j->first);
2708               tbr.push_back(j->first);
2709           }
2710       }
2711    }
2712 
2713    // move closed sockets to the ClosedSockets structure
2714    for (vector<SRTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k)
2715       m_Sockets.erase(*k);
2716 
2717    // remove those timeout sockets
2718    for (vector<SRTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)
2719       removeSocket(*l);
2720 }
2721 
2722 // [[using locked(m_GlobControlLock)]]
removeSocket(const SRTSOCKET u)2723 void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
2724 {
2725    sockets_t::iterator i = m_ClosedSockets.find(u);
2726 
2727    // invalid socket ID
2728    if (i == m_ClosedSockets.end())
2729       return;
2730 
2731    CUDTSocket* const s = i->second;
2732 
2733    // The socket may be in the trashcan now, but could
2734    // still be under processing in the sender/receiver worker
2735    // threads. If that's the case, SKIP IT THIS TIME. The
2736    // socket will be checked next time the GC rollover starts.
2737    CSNode* sn = s->core().m_pSNode;
2738    if (sn && sn->m_iHeapLoc != -1)
2739        return;
2740 
2741    CRNode* rn = s->core().m_pRNode;
2742    if (rn && rn->m_bOnList)
2743        return;
2744 
2745 
2746 #if ENABLE_EXPERIMENTAL_BONDING
2747    if (s->m_GroupOf)
2748    {
2749        HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
2750        s->removeFromGroup(true);
2751    }
2752 #endif
2753    // decrease multiplexer reference count, and remove it if necessary
2754    const int mid = s->m_iMuxID;
2755 
2756    {
2757       ScopedLock cg(s->m_AcceptLock);
2758 
2759       // if it is a listener, close all un-accepted sockets in its queue
2760       // and remove them later
2761       for (set<SRTSOCKET>::iterator q = s->m_QueuedSockets.begin();
2762          q != s->m_QueuedSockets.end(); ++ q)
2763       {
2764          sockets_t::iterator si = m_Sockets.find(*q);
2765          if (si == m_Sockets.end())
2766          {
2767             // gone in the meantime
2768             LOGC(smlog.Error, log << "removeSocket: IPE? socket @" << (*q)
2769                     << " being queued for listener socket @" << s->m_SocketID
2770                     << " is GONE in the meantime ???");
2771             continue;
2772          }
2773 
2774          CUDTSocket* as = si->second;
2775 
2776          as->breakSocket_LOCKED();
2777          m_ClosedSockets[*q] = as;
2778          m_Sockets.erase(*q);
2779       }
2780    }
2781 
2782    // remove from peer rec
2783    map<int64_t, set<SRTSOCKET> >::iterator j = m_PeerRec.find(
2784       s->getPeerSpec());
2785    if (j != m_PeerRec.end())
2786    {
2787       j->second.erase(u);
2788       if (j->second.empty())
2789          m_PeerRec.erase(j);
2790    }
2791 
2792    /*
2793    * Socket may be deleted while still having ePoll events set that would
2794    * remains forever causing epoll_wait to unblock continuously for inexistent
2795    * sockets. Get rid of all events for this socket.
2796    */
2797    m_EPoll.update_events(u, s->core().m_sPollID,
2798       SRT_EPOLL_IN|SRT_EPOLL_OUT|SRT_EPOLL_ERR, false);
2799 
2800    // delete this one
2801    m_ClosedSockets.erase(i);
2802 
2803    HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u);
2804    s->core().closeInternal();
2805    HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u);
2806    delete s;
2807 
2808    if (mid == -1)
2809        return;
2810 
2811    map<int, CMultiplexer>::iterator m;
2812    m = m_mMultiplexer.find(mid);
2813    if (m == m_mMultiplexer.end())
2814    {
2815       LOGC(smlog.Fatal, log << "IPE: For socket @" << u << " MUXER id=" << mid << " NOT FOUND!");
2816       return;
2817    }
2818 
2819    CMultiplexer& mx = m->second;
2820 
2821    mx.m_iRefCount --;
2822    // HLOGF(smlog.Debug, "unrefing underlying socket for %u: %u\n",
2823    //    u, mx.m_iRefCount);
2824    if (0 == mx.m_iRefCount)
2825    {
2826        HLOGC(smlog.Debug, log << "MUXER id=" << mid << " lost last socket @"
2827            << u << " - deleting muxer bound to port "
2828            << mx.m_pChannel->bindAddressAny().hport());
2829       // The channel has no access to the queues and
2830       // it looks like the multiplexer is the master of all of them.
2831       // The queues must be silenced before closing the channel
2832       // because this will cause error to be returned in any operation
2833       // being currently done in the queues, if any.
2834       mx.m_pSndQueue->setClosing();
2835       mx.m_pRcvQueue->setClosing();
2836       mx.destroy();
2837       m_mMultiplexer.erase(m);
2838    }
2839 }
2840 
configureMuxer(CMultiplexer & w_m,const CUDTSocket * s,int af)2841 void srt::CUDTUnited::configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af)
2842 {
2843    w_m.m_mcfg = s->core().m_config;
2844    w_m.m_iIPversion = af;
2845    w_m.m_iRefCount = 1;
2846    w_m.m_iID = s->m_SocketID;
2847 }
2848 
installMuxer(CUDTSocket * w_s,CMultiplexer & fw_sm)2849 uint16_t srt::CUDTUnited::installMuxer(CUDTSocket* w_s, CMultiplexer& fw_sm)
2850 {
2851     w_s->core().m_pSndQueue = fw_sm.m_pSndQueue;
2852     w_s->core().m_pRcvQueue = fw_sm.m_pRcvQueue;
2853     w_s->m_iMuxID = fw_sm.m_iID;
2854     sockaddr_any sa;
2855     fw_sm.m_pChannel->getSockAddr((sa));
2856     w_s->m_SelfAddr = sa; // Will be also completed later, but here it's needed for later checks
2857     return sa.hport();
2858 }
2859 
channelSettingsMatch(const CMultiplexer & m,const CUDTSocket * s)2860 bool srt::CUDTUnited::channelSettingsMatch(const CMultiplexer& m, const CUDTSocket* s)
2861 {
2862     return m.m_mcfg.bReuseAddr && m.m_mcfg == s->core().m_config;
2863 }
2864 
updateMux(CUDTSocket * s,const sockaddr_any & addr,const UDPSOCKET * udpsock)2865 void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* udpsock /*[[nullable]]*/)
2866 {
2867     ScopedLock cg(m_GlobControlLock);
2868 
2869     // If udpsock is provided, then this socket will be simply
2870     // taken for binding as a good deal. It would be nice to make
2871     // a sanity check to see if this UDP socket isn't already installed
2872     // in some multiplexer, but we state this UDP socket isn't accessible
2873     // anyway so this wouldn't be possible.
2874     if (!udpsock)
2875     {
2876         // If not, we need to see if there exist already a multiplexer bound
2877         // to the same endpoint.
2878         const int port = addr.hport();
2879 
2880         bool reuse_attempt = false;
2881         for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin();
2882                 i != m_mMultiplexer.end(); ++ i)
2883         {
2884             CMultiplexer& m = i->second;
2885 
2886             // First, we need to find a multiplexer with the same port.
2887             if (m.m_iPort != port)
2888             {
2889                 HLOGC(smlog.Debug, log << "bind: muxer @" << m.m_iID << " found, but for port "
2890                         << m.m_iPort << " (requested port: " << port << ")");
2891                 continue;
2892             }
2893 
2894             // If this is bound to the wildcard address, it can be reused if:
2895             // - addr is also a wildcard
2896             // - channel settings match
2897             // Otherwise it's a conflict.
2898             sockaddr_any sa;
2899             m.m_pChannel->getSockAddr((sa));
2900 
2901             HLOGC(smlog.Debug, log << "bind: Found existing muxer @" << m.m_iID << " : " << sa.str()
2902                     << " - check against " << addr.str());
2903 
2904             if (sa.isany())
2905             {
2906                 if (!addr.isany())
2907                 {
2908                     LOGC(smlog.Error, log << "bind: Address: " << addr.str()
2909                             << " conflicts with existing wildcard binding: " << sa.str());
2910                     throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
2911                 }
2912 
2913                 // Still, for ANY you need either the same family, or open
2914                 // for families.
2915                 if (m.m_mcfg.iIpV6Only != -1 && m.m_mcfg.iIpV6Only != s->core().m_config.iIpV6Only)
2916                 {
2917                     LOGC(smlog.Error, log << "bind: Address: " << addr.str()
2918                             << " conflicts with existing IPv6 wildcard binding: " << sa.str());
2919                     throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
2920                 }
2921 
2922                 if ((m.m_mcfg.iIpV6Only == 0 || s->core().m_config.iIpV6Only == 0) && m.m_iIPversion != addr.family())
2923                 {
2924                     LOGC(smlog.Error, log << "bind: Address: " << addr.str()
2925                             << " conflicts with IPv6 wildcard binding: " << sa.str()
2926                             << " : family " << (m.m_iIPversion == AF_INET ? "IPv4" : "IPv6")
2927                             << " vs. " << (addr.family() == AF_INET ? "IPv4" : "IPv6"));
2928                     throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
2929                 }
2930                 reuse_attempt = true;
2931                 HLOGC(smlog.Debug, log << "bind: wildcard address - multiplexer reusable");
2932             }
2933             else if (addr.isany() && addr.family() == sa.family())
2934             {
2935                 LOGC(smlog.Error, log << "bind: Wildcard address: " << addr.str()
2936                         << " conflicts with existting IP binding: " << sa.str());
2937                 throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
2938             }
2939             // If this is bound to a certain address, AND:
2940             else if (sa.equal_address(addr))
2941             {
2942                 // - the address is the same as addr
2943                 reuse_attempt = true;
2944                 HLOGC(smlog.Debug, log << "bind: same IP address - multiplexer reusable");
2945             }
2946             else
2947             {
2948                 HLOGC(smlog.Debug, log << "bind: IP addresses differ - ALLOWED to create a new multiplexer");
2949             }
2950             // Otherwise:
2951             // - the address is different than addr
2952             //   - the address can't be reused, but this can go on with new one.
2953 
2954             // If this is a reusage attempt:
2955             if (reuse_attempt)
2956             {
2957                 //   - if the channel settings match, it can be reused
2958                 if (channelSettingsMatch(m, s))
2959                 {
2960                     HLOGC(smlog.Debug, log << "bind: reusing multiplexer for port " << port);
2961                     // reuse the existing multiplexer
2962                     ++ i->second.m_iRefCount;
2963                     installMuxer((s), (i->second));
2964                     return;
2965                 }
2966                 else
2967                 {
2968                     //   - if not, it's a conflict
2969                     LOGC(smlog.Error, log << "bind: Address: " << addr.str()
2970                             << " conflicts with binding: " << sa.str() << " due to channel settings");
2971                     throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
2972                 }
2973             }
2974             // If not, proceed to the next one, and when there are no reusage
2975             // candidates, proceed with creating a new multiplexer.
2976 
2977             // Note that a binding to a different IP address is not treated
2978             // as a candidate for either reuseage or conflict.
2979         }
2980     }
2981 
2982    // a new multiplexer is needed
2983    CMultiplexer m;
2984    configureMuxer((m), s, addr.family());
2985 
2986    try
2987    {
2988        m.m_pChannel = new CChannel();
2989        m.m_pChannel->setConfig(m.m_mcfg);
2990 
2991        if (udpsock)
2992        {
2993            // In this case, addr contains the address
2994            // that has been extracted already from the
2995            // given socket
2996            m.m_pChannel->attach(*udpsock, addr);
2997        }
2998        else if (addr.empty())
2999        {
3000            // The case of previously used case of a NULL address.
3001            // This here is used to pass family only, in this case
3002            // just automatically bind to the "0" address to autoselect
3003            // everything.
3004            m.m_pChannel->open(addr.family());
3005        }
3006        else
3007        {
3008            // If at least the IP address is specified, then bind to that
3009            // address, but still possibly autoselect the outgoing port, if the
3010            // port was specified as 0.
3011            m.m_pChannel->open(addr);
3012        }
3013 
3014        m.m_pTimer = new CTimer;
3015        m.m_pSndQueue = new CSndQueue;
3016        m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
3017        m.m_pRcvQueue = new CRcvQueue;
3018        m.m_pRcvQueue->init(
3019                32, s->core().maxPayloadSize(), m.m_iIPversion, 1024,
3020                m.m_pChannel, m.m_pTimer);
3021 
3022        // Rewrite the port here, as it might be only known upon return
3023        // from CChannel::open.
3024        m.m_iPort = installMuxer((s), m);
3025        m_mMultiplexer[m.m_iID] = m;
3026    }
3027    catch (const CUDTException&)
3028    {
3029        m.destroy();
3030        throw;
3031    }
3032    catch (...)
3033    {
3034        m.destroy();
3035        throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
3036    }
3037 
3038    HLOGC(smlog.Debug, log << "bind: creating new multiplexer for port " << m.m_iPort);
3039 }
3040 
3041 // This function is going to find a multiplexer for the port contained
3042 // in the 'ls' listening socket. The multiplexer must exist when the listener
3043 // exists, otherwise the dispatching procedure wouldn't even call this
3044 // function. By historical reasons there's also a fallback for a case when the
3045 // multiplexer wasn't found by id, the search by port number continues.
updateListenerMux(CUDTSocket * s,const CUDTSocket * ls)3046 bool srt::CUDTUnited::updateListenerMux(CUDTSocket* s, const CUDTSocket* ls)
3047 {
3048    ScopedLock cg(m_GlobControlLock);
3049    const int port = ls->m_SelfAddr.hport();
3050 
3051    HLOGC(smlog.Debug, log << "updateListenerMux: finding muxer of listener socket @"
3052            << ls->m_SocketID << " muxid=" << ls->m_iMuxID
3053            << " bound=" << ls->m_SelfAddr.str()
3054            << " FOR @" << s->m_SocketID << " addr="
3055            << s->m_SelfAddr.str() << "_->_" << s->m_PeerAddr.str());
3056 
3057    // First thing that should be certain here is that there should exist
3058    // a muxer with the ID written in the listener socket's mux ID.
3059 
3060    CMultiplexer* mux = map_getp(m_mMultiplexer, ls->m_iMuxID);
3061 
3062    // NOTE:
3063    // THIS BELOW CODE is only for a highly unlikely, and probably buggy,
3064    // situation when the Multiplexer wasn't found by ID recorded in the listener.
3065    CMultiplexer* fallback = NULL;
3066    if (!mux)
3067    {
3068        LOGC(smlog.Error, log << "updateListenerMux: IPE? listener muxer not found by ID, trying by port");
3069 
3070        // To be used as first found with different IP version
3071 
3072        // find the listener's address
3073        for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin();
3074                i != m_mMultiplexer.end(); ++ i)
3075        {
3076            CMultiplexer& m = i->second;
3077 
3078 #if ENABLE_HEAVY_LOGGING
3079            ostringstream that_muxer;
3080            that_muxer << "id=" << m.m_iID
3081                << " port=" << m.m_iPort
3082                << " ip=" << (m.m_iIPversion == AF_INET ? "v4" : "v6");
3083 #endif
3084 
3085            if (m.m_iPort == port)
3086            {
3087                HLOGC(smlog.Debug, log << "updateListenerMux: reusing muxer: " << that_muxer.str());
3088                if (m.m_iIPversion == s->m_PeerAddr.family())
3089                {
3090                    mux = &m; // best match
3091                    break;
3092                }
3093                else
3094                {
3095                    fallback = &m;
3096                }
3097            }
3098            else
3099            {
3100                HLOGC(smlog.Debug, log << "updateListenerMux: SKIPPING muxer: " << that_muxer.str());
3101            }
3102        }
3103 
3104        if (!mux && fallback)
3105        {
3106            // It is allowed to reuse this multiplexer, but the socket must allow both IPv4 and IPv6
3107            if (fallback->m_mcfg.iIpV6Only == 0)
3108            {
3109                HLOGC(smlog.Warn, log << "updateListenerMux: reusing multiplexer from different family");
3110                mux = fallback;
3111            }
3112        }
3113    }
3114 
3115    // Checking again because the above procedure could have set it
3116    if (mux)
3117    {
3118        // reuse the existing multiplexer
3119        ++ mux->m_iRefCount;
3120        s->core().m_pSndQueue = mux->m_pSndQueue;
3121        s->core().m_pRcvQueue = mux->m_pRcvQueue;
3122        s->m_iMuxID = mux->m_iID;
3123        return true;
3124    }
3125 
3126    return false;
3127 }
3128 
garbageCollect(void * p)3129 void* srt::CUDTUnited::garbageCollect(void* p)
3130 {
3131    CUDTUnited* self = (CUDTUnited*)p;
3132 
3133    THREAD_STATE_INIT("SRT:GC");
3134 
3135    UniqueLock gclock(self->m_GCStopLock);
3136 
3137    while (!self->m_bClosing)
3138    {
3139        INCREMENT_THREAD_ITERATIONS();
3140        self->checkBrokenSockets();
3141 
3142        HLOGC(inlog.Debug, log << "GC: sleep 1 s");
3143        self->m_GCStopCond.wait_for(gclock, seconds_from(1));
3144    }
3145 
3146    // remove all sockets and multiplexers
3147    HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all pending sockets. Acquring control lock...");
3148 
3149    {
3150        ScopedLock glock (self->m_GlobControlLock);
3151 
3152        for (sockets_t::iterator i = self->m_Sockets.begin();
3153                i != self->m_Sockets.end(); ++ i)
3154        {
3155            CUDTSocket* s = i->second;
3156            s->breakSocket_LOCKED();
3157 
3158 #if ENABLE_EXPERIMENTAL_BONDING
3159            if (s->m_GroupOf)
3160            {
3161                HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " (IPE?) - REMOVING FROM GROUP");
3162                s->removeFromGroup(false);
3163            }
3164 #endif
3165            self->m_ClosedSockets[i->first] = s;
3166 
3167            // remove from listener's queue
3168            sockets_t::iterator ls = self->m_Sockets.find(
3169                    s->m_ListenSocket);
3170            if (ls == self->m_Sockets.end())
3171            {
3172                ls = self->m_ClosedSockets.find(s->m_ListenSocket);
3173                if (ls == self->m_ClosedSockets.end())
3174                    continue;
3175            }
3176 
3177            enterCS(ls->second->m_AcceptLock);
3178            ls->second->m_QueuedSockets.erase(s->m_SocketID);
3179            leaveCS(ls->second->m_AcceptLock);
3180        }
3181        self->m_Sockets.clear();
3182 
3183        for (sockets_t::iterator j = self->m_ClosedSockets.begin();
3184                j != self->m_ClosedSockets.end(); ++ j)
3185        {
3186            j->second->m_tsClosureTimeStamp = steady_clock::time_point();
3187        }
3188    }
3189 
3190    HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all CLOSED sockets.");
3191    while (true)
3192    {
3193       self->checkBrokenSockets();
3194 
3195       enterCS(self->m_GlobControlLock);
3196       bool empty = self->m_ClosedSockets.empty();
3197       leaveCS(self->m_GlobControlLock);
3198 
3199       if (empty)
3200          break;
3201 
3202       srt::sync::this_thread::sleep_for(milliseconds_from(1));
3203    }
3204 
3205    THREAD_EXIT();
3206    return NULL;
3207 }
3208 
3209 ////////////////////////////////////////////////////////////////////////////////
3210 
startup()3211 int srt::CUDT::startup()
3212 {
3213    return s_UDTUnited.startup();
3214 }
3215 
cleanup()3216 int srt::CUDT::cleanup()
3217 {
3218    return s_UDTUnited.cleanup();
3219 }
3220 
socket()3221 SRTSOCKET srt::CUDT::socket()
3222 {
3223    if (!s_UDTUnited.m_bGCStatus)
3224       s_UDTUnited.startup();
3225 
3226    try
3227    {
3228       return s_UDTUnited.newSocket();
3229    }
3230    catch (const CUDTException& e)
3231    {
3232       SetThreadLocalError(e);
3233       return INVALID_SOCK;
3234    }
3235    catch (const bad_alloc&)
3236    {
3237       SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
3238       return INVALID_SOCK;
3239    }
3240    catch (const std::exception& ee)
3241    {
3242       LOGC(aclog.Fatal, log << "socket: UNEXPECTED EXCEPTION: "
3243          << typeid(ee).name()
3244          << ": " << ee.what());
3245       SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
3246       return INVALID_SOCK;
3247    }
3248 }
3249 
APIError(const CUDTException & e)3250 srt::CUDT::APIError::APIError(const CUDTException& e)
3251 {
3252     SetThreadLocalError(e);
3253 }
3254 
APIError(CodeMajor mj,CodeMinor mn,int syserr)3255 srt::CUDT::APIError::APIError(CodeMajor mj, CodeMinor mn, int syserr)
3256 {
3257     SetThreadLocalError(CUDTException(mj, mn, syserr));
3258 }
3259 
3260 #if ENABLE_EXPERIMENTAL_BONDING
3261 // This is an internal function; 'type' should be pre-checked if it has a correct value.
3262 // This doesn't have argument of GroupType due to header file conflicts.
3263 
3264 // [[using locked(s_UDTUnited.m_GlobControlLock)]]
newGroup(const int type)3265 srt::CUDTGroup& srt::CUDT::newGroup(const int type)
3266 {
3267     const SRTSOCKET id = s_UDTUnited.generateSocketID(true);
3268 
3269     // Now map the group
3270     return s_UDTUnited.addGroup(id, SRT_GROUP_TYPE(type)).set_id(id);
3271 }
3272 
createGroup(SRT_GROUP_TYPE gt)3273 SRTSOCKET srt::CUDT::createGroup(SRT_GROUP_TYPE gt)
3274 {
3275     // Doing the same lazy-startup as with srt_create_socket()
3276     if (!s_UDTUnited.m_bGCStatus)
3277         s_UDTUnited.startup();
3278 
3279     try
3280     {
3281         srt::sync::ScopedLock globlock (s_UDTUnited.m_GlobControlLock);
3282         return newGroup(gt).id();
3283         // Note: potentially, after this function exits, the group
3284         // could be deleted, immediately, from a separate thread (tho
3285         // unlikely because the other thread would need some handle to
3286         // keep it). But then, the first call to any API function would
3287         // return invalid ID error.
3288     }
3289     catch (const CUDTException& e)
3290     {
3291         return APIError(e);
3292     }
3293     catch (...)
3294     {
3295         return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3296     }
3297 
3298     return SRT_INVALID_SOCK;
3299 }
3300 
3301 
addSocketToGroup(SRTSOCKET socket,SRTSOCKET group)3302 int srt::CUDT::addSocketToGroup(SRTSOCKET socket, SRTSOCKET group)
3303 {
3304     // Check if socket and group have been set correctly.
3305     int32_t sid = socket & ~SRTGROUP_MASK;
3306     int32_t gm = group & SRTGROUP_MASK;
3307 
3308     if ( sid != socket || gm == 0 )
3309         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3310 
3311     // Find the socket and the group
3312     CUDTSocket* s = s_UDTUnited.locateSocket(socket);
3313     CUDTUnited::GroupKeeper k (s_UDTUnited, group, s_UDTUnited.ERH_RETURN);
3314 
3315     if (!s || !k.group)
3316         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3317 
3318     // Check if the socket is already IN SOME GROUP.
3319     if (s->m_GroupOf)
3320         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3321 
3322     CUDTGroup* g = k.group;
3323     if (g->managed())
3324     {
3325         // This can be changed as long as the group is empty.
3326         if (!g->groupEmpty())
3327         {
3328             return APIError(MJ_NOTSUP, MN_INVAL, 0);
3329         }
3330         g->set_managed(false);
3331     }
3332 
3333     ScopedLock cg (s->m_ControlLock);
3334     ScopedLock cglob (s_UDTUnited.m_GlobControlLock);
3335     if (g->closing())
3336         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3337 
3338     // Check if the socket already is in the group
3339     srt::groups::SocketData* f;
3340     if (g->contains(socket, (f)))
3341     {
3342         // XXX This is internal error. Report it, but continue
3343         LOGC(aclog.Error, log << "IPE (non-fatal): the socket is in the group, but has no clue about it!");
3344         s->m_GroupMemberData = f;
3345         s->m_GroupOf = g;
3346         return 0;
3347     }
3348     s->m_GroupMemberData = g->add(srt::groups::prepareSocketData(s));
3349     s->m_GroupOf = g;
3350 
3351     return 0;
3352 }
3353 
3354 // dead function as for now. This is only for non-managed
3355 // groups.
removeSocketFromGroup(SRTSOCKET socket)3356 int srt::CUDT::removeSocketFromGroup(SRTSOCKET socket)
3357 {
3358     CUDTSocket* s = s_UDTUnited.locateSocket(socket);
3359     if (!s)
3360         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3361 
3362     if (!s->m_GroupOf)
3363         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3364 
3365     ScopedLock cg (s->m_ControlLock);
3366     ScopedLock glob_grd (s_UDTUnited.m_GlobControlLock);
3367     s->removeFromGroup(false);
3368     return 0;
3369 }
3370 
3371 // [[using locked(m_ControlLock)]]
3372 // [[using locked(CUDT::s_UDTUnited.m_GlobControlLock)]]
removeFromGroup(bool broken)3373 void srt::CUDTSocket::removeFromGroup(bool broken)
3374 {
3375     CUDTGroup* g = m_GroupOf;
3376     if (g)
3377     {
3378         // Reset group-related fields immediately. They won't be accessed
3379         // in the below calls, while the iterator will be invalidated for
3380         // a short moment between removal from the group container and the end,
3381         // while the GroupLock would be already taken out. It is safer to reset
3382         // it to a NULL iterator before removal.
3383         m_GroupOf = NULL;
3384         m_GroupMemberData = NULL;
3385 
3386         bool still_have = g->remove(m_SocketID);
3387         if (broken)
3388         {
3389             // Activate the SRT_EPOLL_UPDATE event on the group
3390             // if it was because of a socket that was earlier connected
3391             // and became broken. This is not to be sent in case when
3392             // it is a failure during connection, or the socket was
3393             // explicitly removed from the group.
3394             g->activateUpdateEvent(still_have);
3395         }
3396 
3397         HLOGC(smlog.Debug, log << "removeFromGroup: socket @" << m_SocketID << " NO LONGER A MEMBER of $" << g->id() << "; group is "
3398                 << (still_have ? "still ACTIVE" : "now EMPTY"));
3399     }
3400 }
3401 
getGroupOfSocket(SRTSOCKET socket)3402 SRTSOCKET srt::CUDT::getGroupOfSocket(SRTSOCKET socket)
3403 {
3404     // Lock this for the whole function as we need the group
3405     // to persist the call.
3406     ScopedLock glock (s_UDTUnited.m_GlobControlLock);
3407     CUDTSocket* s = s_UDTUnited.locateSocket_LOCKED(socket);
3408     if (!s || !s->m_GroupOf)
3409         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3410 
3411     return s->m_GroupOf->id();
3412 }
3413 
configureGroup(SRTSOCKET groupid,const char * str)3414 int srt::CUDT::configureGroup(SRTSOCKET groupid, const char* str)
3415 {
3416     if ( (groupid & SRTGROUP_MASK) == 0)
3417     {
3418         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3419     }
3420 
3421     CUDTUnited::GroupKeeper k (s_UDTUnited, groupid, s_UDTUnited.ERH_RETURN);
3422     if (!k.group)
3423     {
3424         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3425     }
3426 
3427     return k.group->configure(str);
3428 }
3429 
getGroupData(SRTSOCKET groupid,SRT_SOCKGROUPDATA * pdata,size_t * psize)3430 int srt::CUDT::getGroupData(SRTSOCKET groupid, SRT_SOCKGROUPDATA* pdata, size_t* psize)
3431 {
3432     if ((groupid & SRTGROUP_MASK) == 0 || !psize)
3433     {
3434         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3435     }
3436 
3437     CUDTUnited::GroupKeeper k (s_UDTUnited, groupid, s_UDTUnited.ERH_RETURN);
3438     if (!k.group)
3439     {
3440         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3441     }
3442 
3443     // To get only the size of the group pdata=NULL can be used
3444     return k.group->getGroupData(pdata, psize);
3445 }
3446 #endif
3447 
bind(SRTSOCKET u,const sockaddr * name,int namelen)3448 int srt::CUDT::bind(SRTSOCKET u, const sockaddr* name, int namelen)
3449 {
3450    try
3451    {
3452        sockaddr_any sa (name, namelen);
3453        if (sa.len == 0)
3454        {
3455            // This happens if the namelen check proved it to be
3456            // too small for particular family, or that family is
3457            // not recognized (is none of AF_INET, AF_INET6).
3458            // This is a user error.
3459            return APIError(MJ_NOTSUP, MN_INVAL, 0);
3460        }
3461        CUDTSocket* s = s_UDTUnited.locateSocket(u);
3462        if (!s)
3463            return APIError(MJ_NOTSUP, MN_INVAL, 0);
3464 
3465        return s_UDTUnited.bind(s, sa);
3466    }
3467    catch (const CUDTException& e)
3468    {
3469        return APIError(e);
3470    }
3471    catch (bad_alloc&)
3472    {
3473        return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3474    }
3475    catch (const std::exception& ee)
3476    {
3477       LOGC(aclog.Fatal, log << "bind: UNEXPECTED EXCEPTION: "
3478          << typeid(ee).name()
3479          << ": " << ee.what());
3480       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3481    }
3482 }
3483 
bind(SRTSOCKET u,UDPSOCKET udpsock)3484 int srt::CUDT::bind(SRTSOCKET u, UDPSOCKET udpsock)
3485 {
3486     try
3487     {
3488         CUDTSocket* s = s_UDTUnited.locateSocket(u);
3489         if (!s)
3490             return APIError(MJ_NOTSUP, MN_INVAL, 0);
3491 
3492         return s_UDTUnited.bind(s, udpsock);
3493     }
3494     catch (const CUDTException& e)
3495     {
3496         return APIError(e);
3497     }
3498     catch (bad_alloc&)
3499     {
3500         return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3501     }
3502     catch (const std::exception& ee)
3503     {
3504         LOGC(aclog.Fatal, log << "bind/udp: UNEXPECTED EXCEPTION: "
3505                 << typeid(ee).name() << ": " << ee.what());
3506         return APIError(MJ_UNKNOWN, MN_NONE, 0);
3507     }
3508 }
3509 
listen(SRTSOCKET u,int backlog)3510 int srt::CUDT::listen(SRTSOCKET u, int backlog)
3511 {
3512    try
3513    {
3514       return s_UDTUnited.listen(u, backlog);
3515    }
3516    catch (const CUDTException& e)
3517    {
3518       return APIError(e);
3519    }
3520    catch (bad_alloc&)
3521    {
3522       return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3523    }
3524    catch (const std::exception& ee)
3525    {
3526       LOGC(aclog.Fatal, log << "listen: UNEXPECTED EXCEPTION: "
3527          << typeid(ee).name() << ": " << ee.what());
3528       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3529    }
3530 }
3531 
accept_bond(const SRTSOCKET listeners[],int lsize,int64_t msTimeOut)3532 SRTSOCKET srt::CUDT::accept_bond(const SRTSOCKET listeners [], int lsize, int64_t msTimeOut)
3533 {
3534    try
3535    {
3536       return s_UDTUnited.accept_bond(listeners, lsize, msTimeOut);
3537    }
3538    catch (const CUDTException& e)
3539    {
3540       SetThreadLocalError(e);
3541       return INVALID_SOCK;
3542    }
3543    catch (bad_alloc&)
3544    {
3545       SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
3546       return INVALID_SOCK;
3547    }
3548    catch (const std::exception& ee)
3549    {
3550       LOGC(aclog.Fatal, log << "accept_bond: UNEXPECTED EXCEPTION: "
3551          << typeid(ee).name() << ": " << ee.what());
3552       SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
3553       return INVALID_SOCK;
3554    }
3555 }
3556 
accept(SRTSOCKET u,sockaddr * addr,int * addrlen)3557 SRTSOCKET srt::CUDT::accept(SRTSOCKET u, sockaddr* addr, int* addrlen)
3558 {
3559    try
3560    {
3561       return s_UDTUnited.accept(u, addr, addrlen);
3562    }
3563    catch (const CUDTException& e)
3564    {
3565       SetThreadLocalError(e);
3566       return INVALID_SOCK;
3567    }
3568    catch (const bad_alloc&)
3569    {
3570       SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
3571       return INVALID_SOCK;
3572    }
3573    catch (const std::exception& ee)
3574    {
3575       LOGC(aclog.Fatal, log << "accept: UNEXPECTED EXCEPTION: "
3576          << typeid(ee).name() << ": " << ee.what());
3577       SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
3578       return INVALID_SOCK;
3579    }
3580 }
3581 
connect(SRTSOCKET u,const sockaddr * name,const sockaddr * tname,int namelen)3582 int srt::CUDT::connect(
3583     SRTSOCKET u, const sockaddr* name, const sockaddr* tname, int namelen)
3584 {
3585    try
3586    {
3587       return s_UDTUnited.connect(u, name, tname, namelen);
3588    }
3589    catch (const CUDTException& e)
3590    {
3591       return APIError(e);
3592    }
3593    catch (bad_alloc&)
3594    {
3595       return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3596    }
3597    catch (std::exception& ee)
3598    {
3599       LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: "
3600          << typeid(ee).name() << ": " << ee.what());
3601       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3602    }
3603 }
3604 
3605 #if ENABLE_EXPERIMENTAL_BONDING
connectLinks(SRTSOCKET grp,SRT_SOCKGROUPCONFIG targets[],int arraysize)3606 int srt::CUDT::connectLinks(SRTSOCKET grp,
3607         SRT_SOCKGROUPCONFIG targets [], int arraysize)
3608 {
3609     if (arraysize <= 0)
3610         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3611 
3612     if ( (grp & SRTGROUP_MASK) == 0)
3613     {
3614         // connectLinks accepts only GROUP id, not socket id.
3615         return APIError(MJ_NOTSUP, MN_SIDINVAL, 0);
3616     }
3617 
3618     try
3619     {
3620         CUDTUnited::GroupKeeper k(s_UDTUnited, grp, s_UDTUnited.ERH_THROW);
3621         return s_UDTUnited.groupConnect(k.group, targets, arraysize);
3622     }
3623     catch (CUDTException& e)
3624     {
3625         return APIError(e);
3626     }
3627     catch (bad_alloc&)
3628     {
3629         return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3630     }
3631     catch (std::exception& ee)
3632     {
3633         LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: "
3634                 << typeid(ee).name() << ": " << ee.what());
3635         return APIError(MJ_UNKNOWN, MN_NONE, 0);
3636     }
3637 }
3638 #endif
3639 
connect(SRTSOCKET u,const sockaddr * name,int namelen,int32_t forced_isn)3640 int srt::CUDT::connect(
3641    SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn)
3642 {
3643    try
3644    {
3645       return s_UDTUnited.connect(u, name, namelen, forced_isn);
3646    }
3647    catch (const CUDTException &e)
3648    {
3649       return APIError(e);
3650    }
3651    catch (bad_alloc&)
3652    {
3653       return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3654    }
3655    catch (const std::exception& ee)
3656    {
3657       LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: "
3658          << typeid(ee).name() << ": " << ee.what());
3659       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3660    }
3661 }
3662 
close(SRTSOCKET u)3663 int srt::CUDT::close(SRTSOCKET u)
3664 {
3665    try
3666    {
3667       return s_UDTUnited.close(u);
3668    }
3669    catch (const CUDTException& e)
3670    {
3671       return APIError(e);
3672    }
3673    catch (const std::exception& ee)
3674    {
3675       LOGC(aclog.Fatal, log << "close: UNEXPECTED EXCEPTION: "
3676          << typeid(ee).name() << ": " << ee.what());
3677       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3678    }
3679 }
3680 
getpeername(SRTSOCKET u,sockaddr * name,int * namelen)3681 int srt::CUDT::getpeername(SRTSOCKET u, sockaddr* name, int* namelen)
3682 {
3683    try
3684    {
3685       s_UDTUnited.getpeername(u, name, namelen);
3686       return 0;
3687    }
3688    catch (const CUDTException& e)
3689    {
3690       return APIError(e);
3691    }
3692    catch (const std::exception& ee)
3693    {
3694       LOGC(aclog.Fatal, log << "getpeername: UNEXPECTED EXCEPTION: "
3695          << typeid(ee).name() << ": " << ee.what());
3696       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3697    }
3698 }
3699 
getsockname(SRTSOCKET u,sockaddr * name,int * namelen)3700 int srt::CUDT::getsockname(SRTSOCKET u, sockaddr* name, int* namelen)
3701 {
3702    try
3703    {
3704       s_UDTUnited.getsockname(u, name, namelen);
3705       return 0;
3706    }
3707    catch (const CUDTException& e)
3708    {
3709       return APIError(e);
3710    }
3711    catch (const std::exception& ee)
3712    {
3713       LOGC(aclog.Fatal, log << "getsockname: UNEXPECTED EXCEPTION: "
3714          << typeid(ee).name() << ": " << ee.what());
3715       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3716    }
3717 }
3718 
getsockopt(SRTSOCKET u,int,SRT_SOCKOPT optname,void * pw_optval,int * pw_optlen)3719 int srt::CUDT::getsockopt(
3720    SRTSOCKET u, int, SRT_SOCKOPT optname, void* pw_optval, int* pw_optlen)
3721 {
3722     if (!pw_optval || !pw_optlen)
3723     {
3724         return APIError(MJ_NOTSUP, MN_INVAL, 0);
3725     }
3726 
3727     try
3728     {
3729 #if ENABLE_EXPERIMENTAL_BONDING
3730         if (u & SRTGROUP_MASK)
3731         {
3732             CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW);
3733             k.group->getOpt(optname, (pw_optval), (*pw_optlen));
3734             return 0;
3735         }
3736 #endif
3737 
3738         CUDT& udt = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->core();
3739         udt.getOpt(optname, (pw_optval), (*pw_optlen));
3740         return 0;
3741     }
3742     catch (const CUDTException& e)
3743     {
3744         return APIError(e);
3745     }
3746     catch (const std::exception& ee)
3747     {
3748         LOGC(aclog.Fatal, log << "getsockopt: UNEXPECTED EXCEPTION: "
3749                 << typeid(ee).name() << ": " << ee.what());
3750         return APIError(MJ_UNKNOWN, MN_NONE, 0);
3751     }
3752 }
3753 
setsockopt(SRTSOCKET u,int,SRT_SOCKOPT optname,const void * optval,int optlen)3754 int srt::CUDT::setsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, const void* optval, int optlen)
3755 {
3756    if (!optval)
3757        return APIError(MJ_NOTSUP, MN_INVAL, 0);
3758 
3759    try
3760    {
3761 #if ENABLE_EXPERIMENTAL_BONDING
3762        if (u & SRTGROUP_MASK)
3763        {
3764            CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW);
3765            k.group->setOpt(optname, optval, optlen);
3766            return 0;
3767        }
3768 #endif
3769 
3770        CUDT& udt = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->core();
3771        udt.setOpt(optname, optval, optlen);
3772        return 0;
3773    }
3774    catch (const CUDTException& e)
3775    {
3776        return APIError(e);
3777    }
3778    catch (const std::exception& ee)
3779    {
3780        LOGC(aclog.Fatal, log << "setsockopt: UNEXPECTED EXCEPTION: "
3781                << typeid(ee).name() << ": " << ee.what());
3782        return APIError(MJ_UNKNOWN, MN_NONE, 0);
3783    }
3784 }
3785 
send(SRTSOCKET u,const char * buf,int len,int)3786 int srt::CUDT::send(SRTSOCKET u, const char* buf, int len, int)
3787 {
3788     SRT_MSGCTRL mctrl = srt_msgctrl_default;
3789     return sendmsg2(u, buf, len, (mctrl));
3790 }
3791 
3792 // --> CUDT::recv moved down
3793 
sendmsg(SRTSOCKET u,const char * buf,int len,int ttl,bool inorder,int64_t srctime)3794 int srt::CUDT::sendmsg(
3795    SRTSOCKET u, const char* buf, int len, int ttl, bool inorder,
3796    int64_t srctime)
3797 {
3798     SRT_MSGCTRL mctrl = srt_msgctrl_default;
3799     mctrl.msgttl = ttl;
3800     mctrl.inorder = inorder;
3801     mctrl.srctime = srctime;
3802     return sendmsg2(u, buf, len, (mctrl));
3803 }
3804 
sendmsg2(SRTSOCKET u,const char * buf,int len,SRT_MSGCTRL & w_m)3805 int srt::CUDT::sendmsg2(
3806    SRTSOCKET u, const char* buf, int len, SRT_MSGCTRL& w_m)
3807 {
3808    try
3809    {
3810 #if ENABLE_EXPERIMENTAL_BONDING
3811        if (u & SRTGROUP_MASK)
3812        {
3813            CUDTUnited::GroupKeeper k (s_UDTUnited, u, s_UDTUnited.ERH_THROW);
3814            return k.group->send(buf, len, (w_m));
3815        }
3816 #endif
3817 
3818        return s_UDTUnited.locateSocket(u, CUDTUnited::ERH_THROW)->core().sendmsg2(buf, len, (w_m));
3819    }
3820    catch (const CUDTException& e)
3821    {
3822       return APIError(e);
3823    }
3824    catch (bad_alloc&)
3825    {
3826       return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3827    }
3828    catch (const std::exception& ee)
3829    {
3830       LOGC(aclog.Fatal, log << "sendmsg: UNEXPECTED EXCEPTION: "
3831          << typeid(ee).name() << ": " << ee.what());
3832       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3833    }
3834 }
3835 
recv(SRTSOCKET u,char * buf,int len,int)3836 int srt::CUDT::recv(SRTSOCKET u, char* buf, int len, int)
3837 {
3838     SRT_MSGCTRL mctrl = srt_msgctrl_default;
3839     int ret = recvmsg2(u, buf, len, (mctrl));
3840     return ret;
3841 }
3842 
recvmsg(SRTSOCKET u,char * buf,int len,int64_t & srctime)3843 int srt::CUDT::recvmsg(SRTSOCKET u, char* buf, int len, int64_t& srctime)
3844 {
3845     SRT_MSGCTRL mctrl = srt_msgctrl_default;
3846     int ret = recvmsg2(u, buf, len, (mctrl));
3847     srctime = mctrl.srctime;
3848     return ret;
3849 }
3850 
recvmsg2(SRTSOCKET u,char * buf,int len,SRT_MSGCTRL & w_m)3851 int srt::CUDT::recvmsg2(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL& w_m)
3852 {
3853    try
3854    {
3855 #if ENABLE_EXPERIMENTAL_BONDING
3856       if (u & SRTGROUP_MASK)
3857       {
3858           CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW);
3859           return k.group->recv(buf, len, (w_m));
3860       }
3861 #endif
3862 
3863       return s_UDTUnited.locateSocket(u, CUDTUnited::ERH_THROW)->core().recvmsg2(buf, len, (w_m));
3864    }
3865    catch (const CUDTException& e)
3866    {
3867       return APIError(e);
3868    }
3869    catch (const std::exception& ee)
3870    {
3871       LOGC(aclog.Fatal, log << "recvmsg: UNEXPECTED EXCEPTION: "
3872          << typeid(ee).name() << ": " << ee.what());
3873       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3874    }
3875 }
3876 
sendfile(SRTSOCKET u,fstream & ifs,int64_t & offset,int64_t size,int block)3877 int64_t srt::CUDT::sendfile(
3878    SRTSOCKET u, fstream& ifs, int64_t& offset, int64_t size, int block)
3879 {
3880    try
3881    {
3882       CUDT& udt = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->core();
3883       return udt.sendfile(ifs, offset, size, block);
3884    }
3885    catch (const CUDTException& e)
3886    {
3887       return APIError(e);
3888    }
3889    catch (bad_alloc&)
3890    {
3891       return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3892    }
3893    catch (const std::exception& ee)
3894    {
3895       LOGC(aclog.Fatal, log << "sendfile: UNEXPECTED EXCEPTION: "
3896          << typeid(ee).name() << ": " << ee.what());
3897       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3898    }
3899 }
3900 
recvfile(SRTSOCKET u,fstream & ofs,int64_t & offset,int64_t size,int block)3901 int64_t srt::CUDT::recvfile(
3902    SRTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block)
3903 {
3904    try
3905    {
3906        return s_UDTUnited.locateSocket(u, CUDTUnited::ERH_THROW)->core().recvfile(ofs, offset, size, block);
3907    }
3908    catch (const CUDTException& e)
3909    {
3910       return APIError(e);
3911    }
3912    catch (const std::exception& ee)
3913    {
3914       LOGC(aclog.Fatal, log << "recvfile: UNEXPECTED EXCEPTION: "
3915          << typeid(ee).name() << ": " << ee.what());
3916       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3917    }
3918 }
3919 
select(int,UDT::UDSET * readfds,UDT::UDSET * writefds,UDT::UDSET * exceptfds,const timeval * timeout)3920 int srt::CUDT::select(
3921    int,
3922    UDT::UDSET* readfds,
3923    UDT::UDSET* writefds,
3924    UDT::UDSET* exceptfds,
3925    const timeval* timeout)
3926 {
3927    if ((!readfds) && (!writefds) && (!exceptfds))
3928    {
3929       return APIError(MJ_NOTSUP, MN_INVAL, 0);
3930    }
3931 
3932    try
3933    {
3934       return s_UDTUnited.select(readfds, writefds, exceptfds, timeout);
3935    }
3936    catch (const CUDTException& e)
3937    {
3938       return APIError(e);
3939    }
3940    catch (bad_alloc&)
3941    {
3942       return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3943    }
3944    catch (const std::exception& ee)
3945    {
3946       LOGC(aclog.Fatal, log << "select: UNEXPECTED EXCEPTION: "
3947          << typeid(ee).name() << ": " << ee.what());
3948       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3949    }
3950 }
3951 
selectEx(const vector<SRTSOCKET> & fds,vector<SRTSOCKET> * readfds,vector<SRTSOCKET> * writefds,vector<SRTSOCKET> * exceptfds,int64_t msTimeOut)3952 int srt::CUDT::selectEx(
3953    const vector<SRTSOCKET>& fds,
3954    vector<SRTSOCKET>* readfds,
3955    vector<SRTSOCKET>* writefds,
3956    vector<SRTSOCKET>* exceptfds,
3957    int64_t msTimeOut)
3958 {
3959    if ((!readfds) && (!writefds) && (!exceptfds))
3960    {
3961       return APIError(MJ_NOTSUP, MN_INVAL, 0);
3962    }
3963 
3964    try
3965    {
3966       return s_UDTUnited.selectEx(fds, readfds, writefds, exceptfds, msTimeOut);
3967    }
3968    catch (const CUDTException& e)
3969    {
3970       return APIError(e);
3971    }
3972    catch (bad_alloc&)
3973    {
3974       return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
3975    }
3976    catch (const std::exception& ee)
3977    {
3978       LOGC(aclog.Fatal, log << "selectEx: UNEXPECTED EXCEPTION: "
3979          << typeid(ee).name() << ": " << ee.what());
3980       return APIError(MJ_UNKNOWN);
3981    }
3982 }
3983 
epoll_create()3984 int srt::CUDT::epoll_create()
3985 {
3986    try
3987    {
3988       return s_UDTUnited.epoll_create();
3989    }
3990    catch (const CUDTException& e)
3991    {
3992       return APIError(e);
3993    }
3994    catch (const std::exception& ee)
3995    {
3996       LOGC(aclog.Fatal, log << "epoll_create: UNEXPECTED EXCEPTION: "
3997          << typeid(ee).name() << ": " << ee.what());
3998       return APIError(MJ_UNKNOWN, MN_NONE, 0);
3999    }
4000 }
4001 
epoll_clear_usocks(int eid)4002 int srt::CUDT::epoll_clear_usocks(int eid)
4003 {
4004    try
4005    {
4006       return s_UDTUnited.epoll_clear_usocks(eid);
4007    }
4008    catch (const CUDTException& e)
4009    {
4010       return APIError(e);
4011    }
4012    catch (std::exception& ee)
4013    {
4014       LOGC(aclog.Fatal, log << "epoll_clear_usocks: UNEXPECTED EXCEPTION: "
4015          << typeid(ee).name() << ": " << ee.what());
4016       return APIError(MJ_UNKNOWN, MN_NONE, 0);
4017    }
4018 }
4019 
epoll_add_usock(const int eid,const SRTSOCKET u,const int * events)4020 int srt::CUDT::epoll_add_usock(const int eid, const SRTSOCKET u, const int* events)
4021 {
4022    try
4023    {
4024       return s_UDTUnited.epoll_add_usock(eid, u, events);
4025    }
4026    catch (const CUDTException& e)
4027    {
4028       return APIError(e);
4029    }
4030    catch (const std::exception& ee)
4031    {
4032       LOGC(aclog.Fatal, log << "epoll_add_usock: UNEXPECTED EXCEPTION: "
4033          << typeid(ee).name() << ": " << ee.what());
4034       return APIError(MJ_UNKNOWN, MN_NONE, 0);
4035    }
4036 }
4037 
epoll_add_ssock(const int eid,const SYSSOCKET s,const int * events)4038 int srt::CUDT::epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events)
4039 {
4040    try
4041    {
4042       return s_UDTUnited.epoll_add_ssock(eid, s, events);
4043    }
4044    catch (const CUDTException& e)
4045    {
4046       return APIError(e);
4047    }
4048    catch (const std::exception& ee)
4049    {
4050       LOGC(aclog.Fatal, log << "epoll_add_ssock: UNEXPECTED EXCEPTION: "
4051          << typeid(ee).name() << ": " << ee.what());
4052       return APIError(MJ_UNKNOWN, MN_NONE, 0);
4053    }
4054 }
4055 
epoll_update_usock(const int eid,const SRTSOCKET u,const int * events)4056 int srt::CUDT::epoll_update_usock(
4057    const int eid, const SRTSOCKET u, const int* events)
4058 {
4059    try
4060    {
4061       return s_UDTUnited.epoll_add_usock(eid, u, events);
4062    }
4063    catch (const CUDTException& e)
4064    {
4065       return APIError(e);
4066    }
4067    catch (const std::exception& ee)
4068    {
4069       LOGC(aclog.Fatal, log << "epoll_update_usock: UNEXPECTED EXCEPTION: "
4070          << typeid(ee).name() << ": " << ee.what());
4071       return APIError(MJ_UNKNOWN, MN_NONE, 0);
4072    }
4073 }
4074 
epoll_update_ssock(const int eid,const SYSSOCKET s,const int * events)4075 int srt::CUDT::epoll_update_ssock(
4076    const int eid, const SYSSOCKET s, const int* events)
4077 {
4078    try
4079    {
4080       return s_UDTUnited.epoll_update_ssock(eid, s, events);
4081    }
4082    catch (const CUDTException& e)
4083    {
4084       return APIError(e);
4085    }
4086    catch (const std::exception& ee)
4087    {
4088       LOGC(aclog.Fatal, log << "epoll_update_ssock: UNEXPECTED EXCEPTION: "
4089          << typeid(ee).name() << ": " << ee.what());
4090       return APIError(MJ_UNKNOWN, MN_NONE, 0);
4091    }
4092 }
4093 
4094 
epoll_remove_usock(const int eid,const SRTSOCKET u)4095 int srt::CUDT::epoll_remove_usock(const int eid, const SRTSOCKET u)
4096 {
4097    try
4098    {
4099       return s_UDTUnited.epoll_remove_usock(eid, u);
4100    }
4101    catch (const CUDTException& e)
4102    {
4103       return APIError(e);
4104    }
4105    catch (const std::exception& ee)
4106    {
4107       LOGC(aclog.Fatal, log << "epoll_remove_usock: UNEXPECTED EXCEPTION: "
4108          << typeid(ee).name() << ": " << ee.what());
4109       return APIError(MJ_UNKNOWN, MN_NONE, 0);
4110    }
4111 }
4112 
epoll_remove_ssock(const int eid,const SYSSOCKET s)4113 int srt::CUDT::epoll_remove_ssock(const int eid, const SYSSOCKET s)
4114 {
4115    try
4116    {
4117       return s_UDTUnited.epoll_remove_ssock(eid, s);
4118    }
4119    catch (const CUDTException& e)
4120    {
4121       return APIError(e);
4122    }
4123    catch (const std::exception& ee)
4124    {
4125       LOGC(aclog.Fatal, log << "epoll_remove_ssock: UNEXPECTED EXCEPTION: "
4126          << typeid(ee).name() << ": " << ee.what());
4127       return APIError(MJ_UNKNOWN, MN_NONE, 0);
4128    }
4129 }
4130 
epoll_wait(const int eid,set<SRTSOCKET> * readfds,set<SRTSOCKET> * writefds,int64_t msTimeOut,set<SYSSOCKET> * lrfds,set<SYSSOCKET> * lwfds)4131 int srt::CUDT::epoll_wait(
4132    const int eid,
4133    set<SRTSOCKET>* readfds,
4134    set<SRTSOCKET>* writefds,
4135    int64_t msTimeOut,
4136    set<SYSSOCKET>* lrfds,
4137    set<SYSSOCKET>* lwfds)
4138 {
4139    try
4140    {
4141       return s_UDTUnited.epoll_ref().wait(
4142               eid, readfds, writefds, msTimeOut, lrfds, lwfds);
4143    }
4144    catch (const CUDTException& e)
4145    {
4146       return APIError(e);
4147    }
4148    catch (const std::exception& ee)
4149    {
4150       LOGC(aclog.Fatal, log << "epoll_wait: UNEXPECTED EXCEPTION: "
4151          << typeid(ee).name() << ": " << ee.what());
4152       return APIError(MJ_UNKNOWN, MN_NONE, 0);
4153    }
4154 }
4155 
epoll_uwait(const int eid,SRT_EPOLL_EVENT * fdsSet,int fdsSize,int64_t msTimeOut)4156 int srt::CUDT::epoll_uwait(
4157    const int eid,
4158    SRT_EPOLL_EVENT* fdsSet,
4159    int fdsSize,
4160    int64_t msTimeOut)
4161 {
4162    try
4163    {
4164       return s_UDTUnited.epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
4165    }
4166    catch (const CUDTException& e)
4167    {
4168       return APIError(e);
4169    }
4170    catch (const std::exception& ee)
4171    {
4172       LOGC(aclog.Fatal, log << "epoll_uwait: UNEXPECTED EXCEPTION: "
4173          << typeid(ee).name() << ": " << ee.what());
4174       return APIError(MJ_UNKNOWN, MN_NONE, 0);
4175    }
4176 }
4177 
epoll_set(const int eid,int32_t flags)4178 int32_t srt::CUDT::epoll_set(
4179    const int eid,
4180    int32_t flags)
4181 {
4182    try
4183    {
4184       return s_UDTUnited.epoll_set(eid, flags);
4185    }
4186    catch (const CUDTException& e)
4187    {
4188       return APIError(e);
4189    }
4190    catch (const std::exception& ee)
4191    {
4192       LOGC(aclog.Fatal, log << "epoll_set: UNEXPECTED EXCEPTION: "
4193          << typeid(ee).name() << ": " << ee.what());
4194       return APIError(MJ_UNKNOWN, MN_NONE, 0);
4195    }
4196 }
4197 
epoll_release(const int eid)4198 int srt::CUDT::epoll_release(const int eid)
4199 {
4200    try
4201    {
4202       return s_UDTUnited.epoll_release(eid);
4203    }
4204    catch (const CUDTException& e)
4205    {
4206       return APIError(e);
4207    }
4208    catch (const std::exception& ee)
4209    {
4210       LOGC(aclog.Fatal, log << "epoll_release: UNEXPECTED EXCEPTION: "
4211          << typeid(ee).name() << ": " << ee.what());
4212       return APIError(MJ_UNKNOWN, MN_NONE, 0);
4213    }
4214 }
4215 
getlasterror()4216 CUDTException& srt::CUDT::getlasterror()
4217 {
4218    return GetThreadLocalError();
4219 }
4220 
bstats(SRTSOCKET u,CBytePerfMon * perf,bool clear,bool instantaneous)4221 int srt::CUDT::bstats(SRTSOCKET u, CBytePerfMon* perf, bool clear, bool instantaneous)
4222 {
4223 #if ENABLE_EXPERIMENTAL_BONDING
4224    if (u & SRTGROUP_MASK)
4225        return groupsockbstats(u, perf, clear);
4226 #endif
4227 
4228    try
4229    {
4230       CUDT& udt = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->core();
4231       udt.bstats(perf, clear, instantaneous);
4232       return 0;
4233    }
4234    catch (const CUDTException& e)
4235    {
4236       return APIError(e);
4237    }
4238    catch (const std::exception& ee)
4239    {
4240       LOGC(aclog.Fatal, log << "bstats: UNEXPECTED EXCEPTION: "
4241          << typeid(ee).name() << ": " << ee.what());
4242       return APIError(MJ_UNKNOWN, MN_NONE, 0);
4243    }
4244 }
4245 
4246 #if ENABLE_EXPERIMENTAL_BONDING
groupsockbstats(SRTSOCKET u,CBytePerfMon * perf,bool clear)4247 int srt::CUDT::groupsockbstats(SRTSOCKET u, CBytePerfMon* perf, bool clear)
4248 {
4249    try
4250    {
4251       CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW);
4252       k.group->bstatsSocket(perf, clear);
4253       return 0;
4254    }
4255    catch (const CUDTException& e)
4256    {
4257       SetThreadLocalError(e);
4258       return ERROR;
4259    }
4260    catch (const std::exception& ee)
4261    {
4262       LOGC(aclog.Fatal, log << "bstats: UNEXPECTED EXCEPTION: "
4263          << typeid(ee).name() << ": " << ee.what());
4264       SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
4265       return ERROR;
4266    }
4267 }
4268 #endif
4269 
getUDTHandle(SRTSOCKET u)4270 srt::CUDT* srt::CUDT::getUDTHandle(SRTSOCKET u)
4271 {
4272    try
4273    {
4274       return &s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->core();
4275    }
4276    catch (const CUDTException& e)
4277    {
4278       SetThreadLocalError(e);
4279       return NULL;
4280    }
4281    catch (const std::exception& ee)
4282    {
4283       LOGC(aclog.Fatal, log << "getUDTHandle: UNEXPECTED EXCEPTION: "
4284          << typeid(ee).name() << ": " << ee.what());
4285       SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
4286       return NULL;
4287    }
4288 }
4289 
existingSockets()4290 vector<SRTSOCKET> srt::CUDT::existingSockets()
4291 {
4292     vector<SRTSOCKET> out;
4293     for (CUDTUnited::sockets_t::iterator i = s_UDTUnited.m_Sockets.begin();
4294             i != s_UDTUnited.m_Sockets.end(); ++i)
4295     {
4296         out.push_back(i->first);
4297     }
4298     return out;
4299 }
4300 
getsockstate(SRTSOCKET u)4301 SRT_SOCKSTATUS srt::CUDT::getsockstate(SRTSOCKET u)
4302 {
4303    try
4304    {
4305 #if ENABLE_EXPERIMENTAL_BONDING
4306       if (isgroup(u))
4307       {
4308           CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW);
4309           return k.group->getStatus();
4310       }
4311 #endif
4312       return s_UDTUnited.getStatus(u);
4313    }
4314    catch (const CUDTException& e)
4315    {
4316       SetThreadLocalError(e);
4317       return SRTS_NONEXIST;
4318    }
4319    catch (const std::exception& ee)
4320    {
4321       LOGC(aclog.Fatal, log << "getsockstate: UNEXPECTED EXCEPTION: "
4322          << typeid(ee).name() << ": " << ee.what());
4323       SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
4324       return SRTS_NONEXIST;
4325    }
4326 }
4327 
4328 ////////////////////////////////////////////////////////////////////////////////
4329 
4330 namespace UDT
4331 {
4332 
startup()4333 int startup()
4334 {
4335    return srt::CUDT::startup();
4336 }
4337 
cleanup()4338 int cleanup()
4339 {
4340    return srt::CUDT::cleanup();
4341 }
4342 
bind(SRTSOCKET u,const struct sockaddr * name,int namelen)4343 int bind(SRTSOCKET u, const struct sockaddr* name, int namelen)
4344 {
4345    return srt::CUDT::bind(u, name, namelen);
4346 }
4347 
bind2(SRTSOCKET u,UDPSOCKET udpsock)4348 int bind2(SRTSOCKET u, UDPSOCKET udpsock)
4349 {
4350    return srt::CUDT::bind(u, udpsock);
4351 }
4352 
listen(SRTSOCKET u,int backlog)4353 int listen(SRTSOCKET u, int backlog)
4354 {
4355    return srt::CUDT::listen(u, backlog);
4356 }
4357 
accept(SRTSOCKET u,struct sockaddr * addr,int * addrlen)4358 SRTSOCKET accept(SRTSOCKET u, struct sockaddr* addr, int* addrlen)
4359 {
4360    return srt::CUDT::accept(u, addr, addrlen);
4361 }
4362 
connect(SRTSOCKET u,const struct sockaddr * name,int namelen)4363 int connect(SRTSOCKET u, const struct sockaddr* name, int namelen)
4364 {
4365    return srt::CUDT::connect(u, name, namelen, SRT_SEQNO_NONE);
4366 }
4367 
close(SRTSOCKET u)4368 int close(SRTSOCKET u)
4369 {
4370    return srt::CUDT::close(u);
4371 }
4372 
getpeername(SRTSOCKET u,struct sockaddr * name,int * namelen)4373 int getpeername(SRTSOCKET u, struct sockaddr* name, int* namelen)
4374 {
4375    return srt::CUDT::getpeername(u, name, namelen);
4376 }
4377 
getsockname(SRTSOCKET u,struct sockaddr * name,int * namelen)4378 int getsockname(SRTSOCKET u, struct sockaddr* name, int* namelen)
4379 {
4380    return srt::CUDT::getsockname(u, name, namelen);
4381 }
4382 
getsockopt(SRTSOCKET u,int level,SRT_SOCKOPT optname,void * optval,int * optlen)4383 int getsockopt(
4384    SRTSOCKET u, int level, SRT_SOCKOPT optname, void* optval, int* optlen)
4385 {
4386    return srt::CUDT::getsockopt(u, level, optname, optval, optlen);
4387 }
4388 
setsockopt(SRTSOCKET u,int level,SRT_SOCKOPT optname,const void * optval,int optlen)4389 int setsockopt(
4390    SRTSOCKET u, int level, SRT_SOCKOPT optname, const void* optval, int optlen)
4391 {
4392    return srt::CUDT::setsockopt(u, level, optname, optval, optlen);
4393 }
4394 
4395 // DEVELOPER API
4396 
connect_debug(SRTSOCKET u,const struct sockaddr * name,int namelen,int32_t forced_isn)4397 int connect_debug(
4398    SRTSOCKET u, const struct sockaddr* name, int namelen, int32_t forced_isn)
4399 {
4400    return srt::CUDT::connect(u, name, namelen, forced_isn);
4401 }
4402 
send(SRTSOCKET u,const char * buf,int len,int flags)4403 int send(SRTSOCKET u, const char* buf, int len, int flags)
4404 {
4405    return srt::CUDT::send(u, buf, len, flags);
4406 }
4407 
recv(SRTSOCKET u,char * buf,int len,int flags)4408 int recv(SRTSOCKET u, char* buf, int len, int flags)
4409 {
4410    return srt::CUDT::recv(u, buf, len, flags);
4411 }
4412 
4413 
sendmsg(SRTSOCKET u,const char * buf,int len,int ttl,bool inorder,int64_t srctime)4414 int sendmsg(
4415    SRTSOCKET u, const char* buf, int len, int ttl, bool inorder,
4416    int64_t srctime)
4417 {
4418    return srt::CUDT::sendmsg(u, buf, len, ttl, inorder, srctime);
4419 }
4420 
recvmsg(SRTSOCKET u,char * buf,int len,int64_t & srctime)4421 int recvmsg(SRTSOCKET u, char* buf, int len, int64_t& srctime)
4422 {
4423    return srt::CUDT::recvmsg(u, buf, len, srctime);
4424 }
4425 
recvmsg(SRTSOCKET u,char * buf,int len)4426 int recvmsg(SRTSOCKET u, char* buf, int len)
4427 {
4428    int64_t srctime;
4429    return srt::CUDT::recvmsg(u, buf, len, srctime);
4430 }
4431 
sendfile(SRTSOCKET u,fstream & ifs,int64_t & offset,int64_t size,int block)4432 int64_t sendfile(
4433    SRTSOCKET u,
4434    fstream& ifs,
4435    int64_t& offset,
4436    int64_t size,
4437    int block)
4438 {
4439    return srt::CUDT::sendfile(u, ifs, offset, size, block);
4440 }
4441 
recvfile(SRTSOCKET u,fstream & ofs,int64_t & offset,int64_t size,int block)4442 int64_t recvfile(
4443    SRTSOCKET u,
4444    fstream& ofs,
4445    int64_t& offset,
4446    int64_t size,
4447    int block)
4448 {
4449    return srt::CUDT::recvfile(u, ofs, offset, size, block);
4450 }
4451 
sendfile2(SRTSOCKET u,const char * path,int64_t * offset,int64_t size,int block)4452 int64_t sendfile2(
4453    SRTSOCKET u,
4454    const char* path,
4455    int64_t* offset,
4456    int64_t size,
4457    int block)
4458 {
4459    fstream ifs(path, ios::binary | ios::in);
4460    int64_t ret = srt::CUDT::sendfile(u, ifs, *offset, size, block);
4461    ifs.close();
4462    return ret;
4463 }
4464 
recvfile2(SRTSOCKET u,const char * path,int64_t * offset,int64_t size,int block)4465 int64_t recvfile2(
4466    SRTSOCKET u,
4467    const char* path,
4468    int64_t* offset,
4469    int64_t size,
4470    int block)
4471 {
4472    fstream ofs(path, ios::binary | ios::out);
4473    int64_t ret = srt::CUDT::recvfile(u, ofs, *offset, size, block);
4474    ofs.close();
4475    return ret;
4476 }
4477 
select(int nfds,UDSET * readfds,UDSET * writefds,UDSET * exceptfds,const struct timeval * timeout)4478 int select(
4479    int nfds,
4480    UDSET* readfds,
4481    UDSET* writefds,
4482    UDSET* exceptfds,
4483    const struct timeval* timeout)
4484 {
4485    return srt::CUDT::select(nfds, readfds, writefds, exceptfds, timeout);
4486 }
4487 
selectEx(const vector<SRTSOCKET> & fds,vector<SRTSOCKET> * readfds,vector<SRTSOCKET> * writefds,vector<SRTSOCKET> * exceptfds,int64_t msTimeOut)4488 int selectEx(
4489    const vector<SRTSOCKET>& fds,
4490    vector<SRTSOCKET>* readfds,
4491    vector<SRTSOCKET>* writefds,
4492    vector<SRTSOCKET>* exceptfds,
4493    int64_t msTimeOut)
4494 {
4495    return srt::CUDT::selectEx(fds, readfds, writefds, exceptfds, msTimeOut);
4496 }
4497 
epoll_create()4498 int epoll_create()
4499 {
4500    return srt::CUDT::epoll_create();
4501 }
4502 
epoll_clear_usocks(int eid)4503 int epoll_clear_usocks(int eid)
4504 {
4505     return srt::CUDT::epoll_clear_usocks(eid);
4506 }
4507 
epoll_add_usock(int eid,SRTSOCKET u,const int * events)4508 int epoll_add_usock(int eid, SRTSOCKET u, const int* events)
4509 {
4510    return srt::CUDT::epoll_add_usock(eid, u, events);
4511 }
4512 
epoll_add_ssock(int eid,SYSSOCKET s,const int * events)4513 int epoll_add_ssock(int eid, SYSSOCKET s, const int* events)
4514 {
4515    return srt::CUDT::epoll_add_ssock(eid, s, events);
4516 }
4517 
epoll_update_usock(int eid,SRTSOCKET u,const int * events)4518 int epoll_update_usock(int eid, SRTSOCKET u, const int* events)
4519 {
4520    return srt::CUDT::epoll_update_usock(eid, u, events);
4521 }
4522 
epoll_update_ssock(int eid,SYSSOCKET s,const int * events)4523 int epoll_update_ssock(int eid, SYSSOCKET s, const int* events)
4524 {
4525    return srt::CUDT::epoll_update_ssock(eid, s, events);
4526 }
4527 
epoll_remove_usock(int eid,SRTSOCKET u)4528 int epoll_remove_usock(int eid, SRTSOCKET u)
4529 {
4530    return srt::CUDT::epoll_remove_usock(eid, u);
4531 }
4532 
epoll_remove_ssock(int eid,SYSSOCKET s)4533 int epoll_remove_ssock(int eid, SYSSOCKET s)
4534 {
4535    return srt::CUDT::epoll_remove_ssock(eid, s);
4536 }
4537 
epoll_wait(int eid,set<SRTSOCKET> * readfds,set<SRTSOCKET> * writefds,int64_t msTimeOut,set<SYSSOCKET> * lrfds,set<SYSSOCKET> * lwfds)4538 int epoll_wait(
4539    int eid,
4540    set<SRTSOCKET>* readfds,
4541    set<SRTSOCKET>* writefds,
4542    int64_t msTimeOut,
4543    set<SYSSOCKET>* lrfds,
4544    set<SYSSOCKET>* lwfds)
4545 {
4546    return srt::CUDT::epoll_wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
4547 }
4548 
4549 template <class SOCKTYPE>
set_result(set<SOCKTYPE> * val,int * num,SOCKTYPE * fds)4550 inline void set_result(set<SOCKTYPE>* val, int* num, SOCKTYPE* fds)
4551 {
4552     if ( !val || !num || !fds )
4553         return;
4554 
4555     if (*num > int(val->size()))
4556         *num = int(val->size()); // will get 0 if val->empty()
4557     int count = 0;
4558 
4559     // This loop will run 0 times if val->empty()
4560     for (typename set<SOCKTYPE>::const_iterator it = val->begin(); it != val->end(); ++ it)
4561     {
4562         if (count >= *num)
4563             break;
4564         fds[count ++] = *it;
4565     }
4566 }
4567 
epoll_wait2(int eid,SRTSOCKET * readfds,int * rnum,SRTSOCKET * writefds,int * wnum,int64_t msTimeOut,SYSSOCKET * lrfds,int * lrnum,SYSSOCKET * lwfds,int * lwnum)4568 int epoll_wait2(
4569    int eid, SRTSOCKET* readfds,
4570    int* rnum, SRTSOCKET* writefds,
4571    int* wnum,
4572    int64_t msTimeOut,
4573    SYSSOCKET* lrfds,
4574    int* lrnum,
4575    SYSSOCKET* lwfds,
4576    int* lwnum)
4577 {
4578    // This API is an alternative format for epoll_wait, created for
4579    // compatability with other languages. Users need to pass in an array
4580    // for holding the returned sockets, with the maximum array length
4581    // stored in *rnum, etc., which will be updated with returned number
4582    // of sockets.
4583 
4584    set<SRTSOCKET> readset;
4585    set<SRTSOCKET> writeset;
4586    set<SYSSOCKET> lrset;
4587    set<SYSSOCKET> lwset;
4588    set<SRTSOCKET>* rval = NULL;
4589    set<SRTSOCKET>* wval = NULL;
4590    set<SYSSOCKET>* lrval = NULL;
4591    set<SYSSOCKET>* lwval = NULL;
4592    if ((readfds != NULL) && (rnum != NULL))
4593       rval = &readset;
4594    if ((writefds != NULL) && (wnum != NULL))
4595       wval = &writeset;
4596    if ((lrfds != NULL) && (lrnum != NULL))
4597       lrval = &lrset;
4598    if ((lwfds != NULL) && (lwnum != NULL))
4599       lwval = &lwset;
4600 
4601    int ret = srt::CUDT::epoll_wait(eid, rval, wval, msTimeOut, lrval, lwval);
4602    if (ret > 0)
4603    {
4604       //set<SRTSOCKET>::const_iterator i;
4605       //SET_RESULT(rval, rnum, readfds, i);
4606       set_result(rval, rnum, readfds);
4607       //SET_RESULT(wval, wnum, writefds, i);
4608       set_result(wval, wnum, writefds);
4609 
4610       //set<SYSSOCKET>::const_iterator j;
4611       //SET_RESULT(lrval, lrnum, lrfds, j);
4612       set_result(lrval, lrnum, lrfds);
4613       //SET_RESULT(lwval, lwnum, lwfds, j);
4614       set_result(lwval, lwnum, lwfds);
4615    }
4616    return ret;
4617 }
4618 
epoll_uwait(int eid,SRT_EPOLL_EVENT * fdsSet,int fdsSize,int64_t msTimeOut)4619 int epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
4620 {
4621    return srt::CUDT::epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
4622 }
4623 
epoll_release(int eid)4624 int epoll_release(int eid)
4625 {
4626    return srt::CUDT::epoll_release(eid);
4627 }
4628 
getlasterror()4629 ERRORINFO& getlasterror()
4630 {
4631    return srt::CUDT::getlasterror();
4632 }
4633 
getlasterror_code()4634 int getlasterror_code()
4635 {
4636    return srt::CUDT::getlasterror().getErrorCode();
4637 }
4638 
getlasterror_desc()4639 const char* getlasterror_desc()
4640 {
4641    return srt::CUDT::getlasterror().getErrorMessage();
4642 }
4643 
getlasterror_errno()4644 int getlasterror_errno()
4645 {
4646    return srt::CUDT::getlasterror().getErrno();
4647 }
4648 
4649 // Get error string of a given error code
geterror_desc(int code,int err)4650 const char* geterror_desc(int code, int err)
4651 {
4652    CUDTException e (CodeMajor(code/1000), CodeMinor(code%1000), err);
4653    return(e.getErrorMessage());
4654 }
4655 
bstats(SRTSOCKET u,SRT_TRACEBSTATS * perf,bool clear)4656 int bstats(SRTSOCKET u, SRT_TRACEBSTATS* perf, bool clear)
4657 {
4658    return srt::CUDT::bstats(u, perf, clear);
4659 }
4660 
getsockstate(SRTSOCKET u)4661 SRT_SOCKSTATUS getsockstate(SRTSOCKET u)
4662 {
4663    return srt::CUDT::getsockstate(u);
4664 }
4665 
4666 } // namespace UDT
4667 
4668 namespace srt
4669 {
4670 
setloglevel(LogLevel::type ll)4671 void setloglevel(LogLevel::type ll)
4672 {
4673     ScopedLock gg(srt_logger_config.mutex);
4674     srt_logger_config.max_level = ll;
4675 }
4676 
addlogfa(LogFA fa)4677 void addlogfa(LogFA fa)
4678 {
4679     ScopedLock gg(srt_logger_config.mutex);
4680     srt_logger_config.enabled_fa.set(fa, true);
4681 }
4682 
dellogfa(LogFA fa)4683 void dellogfa(LogFA fa)
4684 {
4685     ScopedLock gg(srt_logger_config.mutex);
4686     srt_logger_config.enabled_fa.set(fa, false);
4687 }
4688 
resetlogfa(set<LogFA> fas)4689 void resetlogfa(set<LogFA> fas)
4690 {
4691     ScopedLock gg(srt_logger_config.mutex);
4692     for (int i = 0; i <= SRT_LOGFA_LASTNONE; ++i)
4693         srt_logger_config.enabled_fa.set(i, fas.count(i));
4694 }
4695 
resetlogfa(const int * fara,size_t fara_size)4696 void resetlogfa(const int* fara, size_t fara_size)
4697 {
4698     ScopedLock gg(srt_logger_config.mutex);
4699     srt_logger_config.enabled_fa.reset();
4700     for (const int* i = fara; i != fara + fara_size; ++i)
4701         srt_logger_config.enabled_fa.set(*i, true);
4702 }
4703 
setlogstream(std::ostream & stream)4704 void setlogstream(std::ostream& stream)
4705 {
4706     ScopedLock gg(srt_logger_config.mutex);
4707     srt_logger_config.log_stream = &stream;
4708 }
4709 
setloghandler(void * opaque,SRT_LOG_HANDLER_FN * handler)4710 void setloghandler(void* opaque, SRT_LOG_HANDLER_FN* handler)
4711 {
4712     ScopedLock gg(srt_logger_config.mutex);
4713     srt_logger_config.loghandler_opaque = opaque;
4714     srt_logger_config.loghandler_fn = handler;
4715 }
4716 
setlogflags(int flags)4717 void setlogflags(int flags)
4718 {
4719     ScopedLock gg(srt_logger_config.mutex);
4720     srt_logger_config.flags = flags;
4721 }
4722 
setstreamid(SRTSOCKET u,const std::string & sid)4723 SRT_API bool setstreamid(SRTSOCKET u, const std::string& sid)
4724 {
4725     return CUDT::setstreamid(u, sid);
4726 }
getstreamid(SRTSOCKET u)4727 SRT_API std::string getstreamid(SRTSOCKET u)
4728 {
4729     return CUDT::getstreamid(u);
4730 }
4731 
getrejectreason(SRTSOCKET u)4732 int getrejectreason(SRTSOCKET u)
4733 {
4734     return CUDT::rejectReason(u);
4735 }
4736 
setrejectreason(SRTSOCKET u,int value)4737 int setrejectreason(SRTSOCKET u, int value)
4738 {
4739     return CUDT::rejectReason(u, value);
4740 }
4741 
4742 }  // namespace srt
4743