1 /*
2  * SRT - Secure, Reliable, Transport
3  * Copyright (c) 2018 Haivision Systems Inc.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  *
9  */
10 
11 /*****************************************************************************
12 Copyright (c) 2001 - 2010, The Board of Trustees of the University of Illinois.
13 All rights reserved.
14 
15 Redistribution and use in source and binary forms, with or without
16 modification, are permitted provided that the following conditions are
17 met:
18 
19 * Redistributions of source code must retain the above
20   copyright notice, this list of conditions and the
21   following disclaimer.
22 
23 * Redistributions in binary form must reproduce the
24   above copyright notice, this list of conditions
25   and the following disclaimer in the documentation
26   and/or other materials provided with the distribution.
27 
28 * Neither the name of the University of Illinois
29   nor the names of its contributors may be used to
30   endorse or promote products derived from this
31   software without specific prior written permission.
32 
33 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
34 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
35 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
36 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
37 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
38 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
39 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
40 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
41 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
42 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
43 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
44 *****************************************************************************/
45 
46 /*****************************************************************************
47 written by
48    Yunhong Gu, last updated 09/28/2010
49 modified by
50    Haivision Systems Inc.
51 *****************************************************************************/
52 
53 #ifndef INC_SRT_API_H
54 #define INC_SRT_API_H
55 
56 
#include <map>
#include <set>
#include <string>
#include <vector>
#include "netinet_any.h"
#include "udt.h"
#include "packet.h"
#include "queue.h"
#include "cache.h"
#include "epoll.h"
#include "handshake.h"
#include "core.h"
#if ENABLE_EXPERIMENTAL_BONDING
#include "group.h"
#endif
71 
72 // Please refer to structure and locking information provided in the
73 // docs/dev/low-level-info.md document.
74 
75 namespace srt {
76 
77 class CUDT;
78 
79 /// @brief Class CUDTSocket is a control layer on top of the CUDT core functionality layer.
80 /// CUDTSocket owns CUDT.
81 class CUDTSocket
82 {
83 public:
CUDTSocket()84    CUDTSocket()
85        : m_Status(SRTS_INIT)
86        , m_SocketID(0)
87        , m_ListenSocket(0)
88        , m_PeerID(0)
89 #if ENABLE_EXPERIMENTAL_BONDING
90        , m_GroupMemberData()
91        , m_GroupOf()
92 #endif
93        , m_iISN(0)
94        , m_UDT(this)
95        , m_AcceptCond()
96        , m_AcceptLock()
97        , m_uiBackLog(0)
98        , m_iMuxID(-1)
99    {
100        construct();
101    }
102 
CUDTSocket(const CUDTSocket & ancestor)103    CUDTSocket(const CUDTSocket& ancestor)
104        : m_Status(SRTS_INIT)
105        , m_SocketID(0)
106        , m_ListenSocket(0)
107        , m_PeerID(0)
108 #if ENABLE_EXPERIMENTAL_BONDING
109        , m_GroupMemberData()
110        , m_GroupOf()
111 #endif
112        , m_iISN(0)
113        , m_UDT(this, ancestor.m_UDT)
114        , m_AcceptCond()
115        , m_AcceptLock()
116        , m_uiBackLog(0)
117        , m_iMuxID(-1)
118    {
119        construct();
120    }
121 
122    ~CUDTSocket();
123 
124    void construct();
125 
126    srt::sync::atomic<SRT_SOCKSTATUS> m_Status;                  //< current socket state
127 
128    /// Time when the socket is closed.
129    /// When the socket is closed, it is not removed immediately from the list
130    /// of sockets in order to prevent other methods from accessing invalid address.
131    /// A timer is started and the socket will be removed after approximately
132    /// 1 second (see CUDTUnited::checkBrokenSockets()).
133    sync::steady_clock::time_point m_tsClosureTimeStamp;
134 
135    sockaddr_any m_SelfAddr;                  //< local address of the socket
136    sockaddr_any m_PeerAddr;                  //< peer address of the socket
137 
138    SRTSOCKET m_SocketID;                     //< socket ID
139    SRTSOCKET m_ListenSocket;                 //< ID of the listener socket; 0 means this is an independent socket
140 
141    SRTSOCKET m_PeerID;                       //< peer socket ID
142 #if ENABLE_EXPERIMENTAL_BONDING
143    groups::SocketData* m_GroupMemberData; //< Pointer to group member data, or NULL if not a group member
144    CUDTGroup* m_GroupOf;                       //< Group this socket is a member of, or NULL if it isn't
145 #endif
146 
147    int32_t m_iISN;                           //< initial sequence number, used to tell different connection from same IP:port
148 
149 private:
150    CUDT m_UDT;                               //< internal SRT socket logic
151 
152 public:
153    std::set<SRTSOCKET> m_QueuedSockets;      //< set of connections waiting for accept()
154 
155    sync::Condition m_AcceptCond;             //< used to block "accept" call
156    sync::Mutex m_AcceptLock;                 //< mutex associated to m_AcceptCond
157 
158    unsigned int m_uiBackLog;                 //< maximum number of connections in queue
159 
160    // XXX A refactoring might be needed here.
161 
162    // There are no reasons found why the socket can't contain a list iterator to a
163    // multiplexer INSTEAD of m_iMuxID. There's no danger in this solution because
164    // the multiplexer is never deleted until there's at least one socket using it.
165    //
166    // The multiplexer may even physically be contained in the CUDTUnited object,
167    // just track the multiple users of it (the listener and the accepted sockets).
168    // When deleting, you simply "unsubscribe" yourself from the multiplexer, which
169    // will unref it and remove the list element by the iterator kept by the
170    // socket.
171    int m_iMuxID;                        //< multiplexer ID
172 
173    sync::Mutex m_ControlLock;           //< lock this socket exclusively for control APIs: bind/listen/connect
174 
core()175    CUDT& core() { return m_UDT; }
core()176    const CUDT& core() const { return m_UDT; }
177 
getPeerSpec(SRTSOCKET id,int32_t isn)178    static int64_t getPeerSpec(SRTSOCKET id, int32_t isn)
179    {
180        return (int64_t(id) << 30) + isn;
181    }
getPeerSpec()182    int64_t getPeerSpec()
183    {
184        return getPeerSpec(m_PeerID, m_iISN);
185    }
186 
187    SRT_SOCKSTATUS getStatus();
188 
189    /// This function shall be called always wherever
190    /// you'd like to call cudtsocket->m_pUDT->close(),
191    /// from within the GC thread only (that is, only when
192    /// the socket should be no longer visible in the
193    /// connection, including for sending remaining data).
194    void breakSocket_LOCKED();
195 
196 
197    /// This makes the socket no longer capable of performing any transmission
198    /// operation, but continues to be responsive in the connection in order
199    /// to finish sending the data that were scheduled for sending so far.
200    void setClosed();
201 
202    /// This does the same as setClosed, plus sets the m_bBroken to true.
203    /// Such a socket can still be read from so that remaining data from
204    /// the receiver buffer can be read, but no longer sends anything.
205    void setBrokenClosed();
206    void removeFromGroup(bool broken);
207 
208    // Instrumentally used by select() and also required for non-blocking
209    // mode check in groups
210    bool readReady();
211    bool writeReady() const;
212    bool broken() const;
213 
214 private:
215    CUDTSocket& operator=(const CUDTSocket&);
216 };
217 
218 ////////////////////////////////////////////////////////////////////////////////
219 
220 class CUDTUnited
221 {
222 friend class CUDT;
223 friend class CUDTGroup;
224 friend class CRendezvousQueue;
225 
226 public:
227    CUDTUnited();
228    ~CUDTUnited();
229 
230    // Public constants
231    static const int32_t MAX_SOCKET_VAL = SRTGROUP_MASK - 1;    // maximum value for a regular socket
232 
233 public:
234 
235    enum ErrorHandling { ERH_RETURN, ERH_THROW, ERH_ABORT };
236    static std::string CONID(SRTSOCKET sock);
237 
238       /// initialize the UDT library.
239       /// @return 0 if success, otherwise -1 is returned.
240 
241    int startup();
242 
243       /// release the UDT library.
244       /// @return 0 if success, otherwise -1 is returned.
245 
246    int cleanup();
247 
248       /// Create a new UDT socket.
249       /// @param [out] pps Variable (optional) to which the new socket will be written, if succeeded
250       /// @return The new UDT socket ID, or INVALID_SOCK.
251 
252    SRTSOCKET newSocket(CUDTSocket** pps = NULL);
253 
254       /// Create a new UDT connection.
255       /// @param [in] listen the listening UDT socket;
256       /// @param [in] peer peer address.
257       /// @param [in,out] hs handshake information from peer side (in), negotiated value (out);
258       /// @param [out] w_error error code when failed
259       /// @param [out] w_acpu entity of accepted socket, if connection already exists
260       /// @return If the new connection is successfully created: 1 success, 0 already exist, -1 error.
261 
262    int newConnection(const SRTSOCKET listen, const sockaddr_any& peer, const CPacket& hspkt,
263            CHandShake& w_hs, int& w_error, CUDT*& w_acpu);
264 
265    int installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq);
266    int installConnectHook(const SRTSOCKET lsn, srt_connect_callback_fn* hook, void* opaq);
267 
268       /// Check the status of the UDT socket.
269       /// @param [in] u the UDT socket ID.
270       /// @return UDT socket status, or NONEXIST if not found.
271 
272    SRT_SOCKSTATUS getStatus(const SRTSOCKET u);
273 
274       // socket APIs
275 
276    int bind(CUDTSocket* u, const sockaddr_any& name);
277    int bind(CUDTSocket* u, UDPSOCKET udpsock);
278    int listen(const SRTSOCKET u, int backlog);
279    SRTSOCKET accept(const SRTSOCKET listen, sockaddr* addr, int* addrlen);
280    SRTSOCKET accept_bond(const SRTSOCKET listeners [], int lsize, int64_t msTimeOut);
281    int connect(SRTSOCKET u, const sockaddr* srcname, const sockaddr* tarname, int tarlen);
282    int connect(const SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn);
283    int connectIn(CUDTSocket* s, const sockaddr_any& target, int32_t forced_isn);
284 #if ENABLE_EXPERIMENTAL_BONDING
285    int groupConnect(CUDTGroup* g, SRT_SOCKGROUPCONFIG targets [], int arraysize);
286    int singleMemberConnect(CUDTGroup* g, SRT_SOCKGROUPCONFIG* target);
287 #endif
288    int close(const SRTSOCKET u);
289    int close(CUDTSocket* s);
290    void getpeername(const SRTSOCKET u, sockaddr* name, int* namelen);
291    void getsockname(const SRTSOCKET u, sockaddr* name, int* namelen);
292    int select(UDT::UDSET* readfds, UDT::UDSET* writefds, UDT::UDSET* exceptfds, const timeval* timeout);
293    int selectEx(const std::vector<SRTSOCKET>& fds, std::vector<SRTSOCKET>* readfds, std::vector<SRTSOCKET>* writefds, std::vector<SRTSOCKET>* exceptfds, int64_t msTimeOut);
294    int epoll_create();
295    int epoll_clear_usocks(int eid);
296    int epoll_add_usock(const int eid, const SRTSOCKET u, const int* events = NULL);
297    int epoll_add_usock_INTERNAL(const int eid, CUDTSocket* s, const int* events);
298    int epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events = NULL);
299    int epoll_remove_usock(const int eid, const SRTSOCKET u);
300    template <class EntityType>
301    int epoll_remove_entity(const int eid, EntityType* ent);
302    int epoll_remove_socket_INTERNAL(const int eid, CUDTSocket* ent);
303 #if ENABLE_EXPERIMENTAL_BONDING
304    int epoll_remove_group_INTERNAL(const int eid, CUDTGroup* ent);
305 #endif
306    int epoll_remove_ssock(const int eid, const SYSSOCKET s);
307    int epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events = NULL);
308    int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut);
309    int32_t epoll_set(const int eid, int32_t flags);
310    int epoll_release(const int eid);
311 
312 #if ENABLE_EXPERIMENTAL_BONDING
313    // [[using locked(m_GlobControlLock)]]
addGroup(SRTSOCKET id,SRT_GROUP_TYPE type)314    CUDTGroup& addGroup(SRTSOCKET id, SRT_GROUP_TYPE type)
315    {
316        // This only ensures that the element exists.
317        // If the element was newly added, it will be NULL.
318        CUDTGroup*& g = m_Groups[id];
319        if (!g)
320        {
321            // This is a reference to the cell, so it will
322            // rewrite it into the map.
323            g = new CUDTGroup(type);
324        }
325 
326        // Now we are sure that g is not NULL,
327        // and persistence of this object is in the map.
328        // The reference to the object can be safely returned here.
329        return *g;
330    }
331 
332    void deleteGroup(CUDTGroup* g);
333    void deleteGroup_LOCKED(CUDTGroup* g);
334 
335    // [[using locked(m_GlobControlLock)]]
findPeerGroup_LOCKED(SRTSOCKET peergroup)336    CUDTGroup* findPeerGroup_LOCKED(SRTSOCKET peergroup)
337    {
338        for (groups_t::iterator i = m_Groups.begin();
339                i != m_Groups.end(); ++i)
340        {
341            if (i->second->peerid() == peergroup)
342                return i->second;
343        }
344        return NULL;
345    }
346 #endif
347 
epoll_ref()348    CEPoll& epoll_ref() { return m_EPoll; }
349 
350 private:
351    /// Generates a new socket ID. This function starts from a randomly
352    /// generated value (at initialization time) and goes backward with
353    /// with next calls. The possible values come from the range without
354    /// the SRTGROUP_MASK bit, and the group bit is set when the ID is
355    /// generated for groups. It is also internally checked if the
356    /// newly generated ID isn't already used by an existing socket or group.
357    ///
358    /// Socket ID value range.
359    /// - [0]: reserved for handshake procedure. If the destination Socket ID is 0
360    ///   (destination Socket ID unknown) the packet will be sent to the listening socket
361    ///   or to a socket that is in the rendezvous connection phase.
362    /// - [1; 2 ^ 30): single socket ID range.
363    /// - (2 ^ 30; 2 ^ 31): group socket ID range. Effectively any positive number
364    ///   from [1; 2 ^ 30) with bit 30 set to 1. Bit 31 is zero.
365    /// The most significant bit 31 (sign bit) is left unused so that checking for a value <= 0 identifies an invalid socket ID.
366    ///
367    /// @param group The socket id should be for socket group.
368    /// @return The new socket ID.
369    /// @throw CUDTException if after rolling over all possible ID values nothing can be returned
370    SRTSOCKET generateSocketID(bool group = false);
371 
372 private:
373    typedef std::map<SRTSOCKET, CUDTSocket*> sockets_t;       // stores all the socket structures
374    sockets_t m_Sockets;
375 
376 #if ENABLE_EXPERIMENTAL_BONDING
377    typedef std::map<SRTSOCKET, CUDTGroup*> groups_t;
378    groups_t m_Groups;
379 #endif
380 
381    sync::Mutex m_GlobControlLock;               // used to synchronize UDT API
382 
383    sync::Mutex m_IDLock;                        // used to synchronize ID generation
384 
385    SRTSOCKET m_SocketIDGenerator;               // seed to generate a new unique socket ID
386    SRTSOCKET m_SocketIDGenerator_init;          // Keeps track of the very first one
387 
388    std::map<int64_t, std::set<SRTSOCKET> > m_PeerRec;// record sockets from peers to avoid repeated connection request, int64_t = (socker_id << 30) + isn
389 
390 private:
391    friend struct FLookupSocketWithEvent_LOCKED;
392 
393    CUDTSocket* locateSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);
394    // This function does the same as locateSocket, except that:
395    // - lock on m_GlobControlLock is expected (so that you don't unlock between finding and using)
396    // - only return NULL if not found
397    CUDTSocket* locateSocket_LOCKED(SRTSOCKET u);
398    CUDTSocket* locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn);
399 
400 #if ENABLE_EXPERIMENTAL_BONDING
401    CUDTGroup* locateAcquireGroup(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);
402    CUDTGroup* acquireSocketsGroup(CUDTSocket* s);
403 
404    struct GroupKeeper
405    {
406        CUDTGroup* group;
407 
408        // This is intended for API functions to lock the group's existence
409        // for the lifetime of their call.
GroupKeeperGroupKeeper410        GroupKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh)
411        {
412            group = glob.locateAcquireGroup(id, erh);
413        }
414 
415        // This is intended for TSBPD thread that should lock the group's
416        // existence until it exits.
GroupKeeperGroupKeeper417        GroupKeeper(CUDTUnited& glob, CUDTSocket* s)
418        {
419            group = glob.acquireSocketsGroup(s);
420        }
421 
~GroupKeeperGroupKeeper422        ~GroupKeeper()
423        {
424            if (group)
425            {
426                // We have a guarantee that if `group` was set
427                // as non-NULL here, it is also acquired and will not
428                // be deleted until this busy flag is set back to false.
429                sync::ScopedLock cgroup (*group->exp_groupLock());
430                group->apiRelease();
431                // Only now that the group lock is lifted, can the
432                // group be now deleted and this pointer potentially dangling
433            }
434        }
435    };
436 
437 #endif
438    void updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* = NULL);
439    bool updateListenerMux(CUDTSocket* s, const CUDTSocket* ls);
440 
441    // Utility functions for updateMux
442    void configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af);
443    uint16_t installMuxer(CUDTSocket* w_s, CMultiplexer& sm);
444    bool channelSettingsMatch(const CMultiplexer& m, const CUDTSocket* s);
445 
446 private:
447    std::map<int, CMultiplexer> m_mMultiplexer;		// UDP multiplexer
448    sync::Mutex            m_MultiplexerLock;
449 
450 private:
451    CCache<CInfoBlock>* m_pCache;			// UDT network information cache
452 
453 private:
454    srt::sync::atomic<bool> m_bClosing;
455    sync::Mutex m_GCStopLock;
456    sync::Condition m_GCStopCond;
457 
458    sync::Mutex m_InitLock;
459    int m_iInstanceCount;				// number of startup() called by application
460    bool m_bGCStatus;					// if the GC thread is working (true)
461 
462    sync::CThread m_GCThread;
463    static void* garbageCollect(void*);
464 
465    sockets_t m_ClosedSockets;   // temporarily store closed sockets
466 #if ENABLE_EXPERIMENTAL_BONDING
467    groups_t m_ClosedGroups;
468 #endif
469 
470    void checkBrokenSockets();
471    void removeSocket(const SRTSOCKET u);
472 
473    CEPoll m_EPoll;                                     // handling epoll data structures and events
474 
475 private:
476    CUDTUnited(const CUDTUnited&);
477    CUDTUnited& operator=(const CUDTUnited&);
478 };
479 
480 } // namespace srt
481 
482 #endif
483