1 /*
2  * SRT - Secure, Reliable, Transport
3  * Copyright (c) 2020 Haivision Systems Inc.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  *
9  */
10 
11 /*****************************************************************************
12 Written by
13    Haivision Systems Inc.
14 *****************************************************************************/
15 
16 #ifndef INC_SRT_GROUP_H
17 #define INC_SRT_GROUP_H
18 
#include <cstring>

#include "srt.h"
#include "common.h"
#include "packet.h"
#include "group_common.h"
#include "group_backup.h"
24 
25 namespace srt
26 {
27 
28 #if ENABLE_HEAVY_LOGGING
// Printable state names; this table is compiled only when ENABLE_HEAVY_LOGGING is set.
// NOTE(review): presumably indexed by the numeric SRT_MEMBERSTATUS value — confirm against srt.h.
const char* const srt_log_grp_state[] = {"PENDING", "IDLE", "RUNNING", "BROKEN"};
30 #endif
31 
32 
33 class CUDTGroup
34 {
35     friend class CUDTUnited;
36 
37     typedef sync::steady_clock::time_point time_point;
38     typedef sync::steady_clock::duration   duration;
39     typedef sync::steady_clock             steady_clock;
40     typedef groups::SocketData SocketData;
41     typedef groups::SendBackupCtx SendBackupCtx;
42     typedef groups::BackupMemberState BackupMemberState;
43 
44 public:
45     typedef SRT_MEMBERSTATUS GroupState;
46 
47     // Note that the use of states may differ in particular group types:
48     //
49     // Broadcast: links that are freshly connected become PENDING and then IDLE only
50     // for a short moment to be activated immediately at the nearest sending operation.
51     //
52     // Balancing: like with broadcast, just that the link activation gets its shared percentage
53     // of traffic balancing
54     //
55     // Multicast: The link is never idle. The data are always sent over the UDP multicast link
56     // and the receiver simply gets subscribed and reads packets once it's ready.
57     //
58     // Backup: The link stays idle until it's activated, and the activation can only happen
59     // at the moment when the currently active link is "suspected of being likely broken"
60     // (the current active link fails to receive ACK in a time when two ACKs should already
61     // be received). After a while when the current active link is confirmed broken, it turns
62     // into broken state.
63 
64     static const char* StateStr(GroupState);
65 
66     static int32_t s_tokenGen;
genToken()67     static int32_t genToken() { ++s_tokenGen; if (s_tokenGen < 0) s_tokenGen = 0; return s_tokenGen;}
68 
69     struct ConfigItem
70     {
71         SRT_SOCKOPT                so;
72         std::vector<unsigned char> value;
73 
74         template <class T>
getConfigItem75         bool get(T& refr)
76         {
77             if (sizeof(T) > value.size())
78                 return false;
79             refr = *(T*)&value[0];
80             return true;
81         }
82 
ConfigItemConfigItem83         ConfigItem(SRT_SOCKOPT o, const void* val, int size)
84             : so(o)
85         {
86             value.resize(size);
87             unsigned char* begin = (unsigned char*)val;
88             std::copy(begin, begin + size, value.begin());
89         }
90 
91         struct OfType
92         {
93             SRT_SOCKOPT so;
OfTypeConfigItem::OfType94             OfType(SRT_SOCKOPT soso)
95                 : so(soso)
96             {
97             }
operatorConfigItem::OfType98             bool operator()(ConfigItem& ci) { return ci.so == so; }
99         };
100     };
101 
102     typedef std::list<SocketData> group_t;
103     typedef group_t::iterator     gli_t;
104     typedef std::vector< std::pair<SRTSOCKET, srt::CUDTSocket*> > sendable_t;
105 
    // Plain record kept per member socket.
    // NOTE(review): the field meanings below are inferred from the names
    // only; confirm against the sending functions in group.cpp.
    struct Sendstate
    {
        SRTSOCKET id;   // socket ID
        SocketData* mb; // presumably the group member entry of that socket
        int   stat;     // presumably the result of the send call
        int   code;     // presumably the associated error code
    };
113 
114     CUDTGroup(SRT_GROUP_TYPE);
115     ~CUDTGroup();
116 
117     SocketData* add(SocketData data);
118 
119     struct HaveID
120     {
121         SRTSOCKET id;
HaveIDHaveID122         HaveID(SRTSOCKET sid)
123             : id(sid)
124         {
125         }
operatorHaveID126         bool operator()(const SocketData& s) { return s.id == id; }
127     };
128 
contains(SRTSOCKET id,SocketData * & w_f)129     bool contains(SRTSOCKET id, SocketData*& w_f)
130     {
131         srt::sync::ScopedLock g(m_GroupLock);
132         gli_t f = std::find_if(m_Group.begin(), m_Group.end(), HaveID(id));
133         if (f == m_Group.end())
134         {
135             w_f = NULL;
136             return false;
137         }
138         w_f = &*f;
139         return true;
140     }
141 
142     // NEED LOCKING
begin()143     gli_t begin() { return m_Group.begin(); }
end()144     gli_t end() { return m_Group.end(); }
145 
146     /// Remove the socket from the group container.
    /// REMEMBER: the group spec should be taken from the socket
    /// (set m_GroupOf and m_GroupMemberData to NULL)
    /// PRIOR TO calling this function.
150     /// @param id Socket ID to look for in the container to remove
151     /// @return true if the container still contains any sockets after the operation
remove(SRTSOCKET id)152     bool remove(SRTSOCKET id)
153     {
154         using srt_logging::gmlog;
155         srt::sync::ScopedLock g(m_GroupLock);
156 
157         bool empty = false;
158         HLOGC(gmlog.Debug, log << "group/remove: going to remove @" << id << " from $" << m_GroupID);
159 
160         gli_t f = std::find_if(m_Group.begin(), m_Group.end(), HaveID(id));
161         if (f != m_Group.end())
162         {
163             m_Group.erase(f);
164 
165             // Reset sequence numbers on a dead group so that they are
166             // initialized anew with the new alive connection within
167             // the group.
168             // XXX The problem is that this should be done after the
169             // socket is considered DISCONNECTED, not when it's being
170             // closed. After being disconnected, the sequence numbers
171             // are no longer valid, and will be reinitialized when the
172             // socket is connected again. This may stay as is for now
173             // as in SRT it's not predicted to do anything with the socket
174             // that was disconnected other than immediately closing it.
175             if (m_Group.empty())
176             {
177                 // When the group is empty, there's no danger that this
178                 // number will collide with any ISN provided by a socket.
179                 // Also since now every socket will derive this ISN.
180                 m_iLastSchedSeqNo = generateISN();
181                 resetInitialRxSequence();
182                 empty = true;
183             }
184         }
185         else
186         {
187             HLOGC(gmlog.Debug, log << "group/remove: IPE: id @" << id << " NOT FOUND");
188             empty = true; // not exactly true, but this is to cause error on group in the APP
189         }
190 
191         if (m_Group.empty())
192         {
193             m_bOpened    = false;
194             m_bConnected = false;
195         }
196 
197         // XXX BUGFIX
198         m_Positions.erase(id);
199 
200         return !empty;
201     }
202 
groupEmpty()203     bool groupEmpty()
204     {
205         srt::sync::ScopedLock g(m_GroupLock);
206         return m_Group.empty();
207     }
208 
209     void setGroupConnected();
210 
211     int            send(const char* buf, int len, SRT_MSGCTRL& w_mc);
212     int            sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc);
213     int            sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc);
214     static int32_t generateISN();
215 
216 private:
    // For Backup, sending all previous packets
218     int sendBackupRexmit(srt::CUDT& core, SRT_MSGCTRL& w_mc);
219 
220     // Support functions for sendBackup and sendBroadcast
221     /// Check if group member is idle.
222     /// @param d group member
223     /// @param[in,out] w_wipeme array of sockets to remove from group
224     /// @param[in,out] w_pendingLinks array of sockets pending for connection
225     /// @returns true if d is idle (standby), false otherwise
226     bool send_CheckIdle(const gli_t d, std::vector<SRTSOCKET>& w_wipeme, std::vector<SRTSOCKET>& w_pendingLinks);
227 
228 
    /// This function checks if the member has just become idle (check if sender buffer is empty) to send a KEEPALIVE immediately.
    /// @todo Check whether this is abandoned logic.
231     void sendBackup_CheckIdleTime(gli_t w_d);
232 
233     /// Qualify states of member links.
234     /// [[using locked(this->m_GroupLock, m_pGlobal->m_GlobControlLock)]]
235     /// @param[out] w_sendBackupCtx  the context will be updated with state qualifications
236     /// @param[in] currtime          current timestamp
237     void sendBackup_QualifyMemberStates(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime);
238 
239     void sendBackup_AssignBackupState(srt::CUDT& socket, BackupMemberState state, const steady_clock::time_point& currtime);
240 
241     /// Qualify the state of the active link: fresh, stable, unstable, wary.
242     /// @retval active backup member state: fresh, stable, unstable, wary.
243     BackupMemberState sendBackup_QualifyActiveState(const gli_t d, const time_point currtime);
244 
245     BackupMemberState sendBackup_QualifyIfStandBy(const gli_t d);
246 
247     /// Sends the same payload over all active members.
248     /// @param[in] buf payload
249     /// @param[in] len payload length in bytes
250     /// @param[in,out] w_mc message control
251     /// @param[in] currtime current time
    /// @param[in,out] w_curseq current packet sequence number
    /// @param[out] w_nsuccessful number of members with successful sending.
    /// @param[in,out] w_maxActiveWeight
    /// @param[in,out] w_sendBackupCtx context
    /// @param[in,out] w_cx error
    /// @return group send result: -1 if sending over all members has failed; number of bytes sent otherwise.
258     int sendBackup_SendOverActive(const char* buf, int len, SRT_MSGCTRL& w_mc, const steady_clock::time_point& currtime, int32_t& w_curseq,
259         size_t& w_nsuccessful, uint16_t& w_maxActiveWeight, SendBackupCtx& w_sendBackupCtx, CUDTException& w_cx);
260 
261     /// Check link sending status
262     /// @param[in]  currtime       Current time (logging only)
263     /// @param[in]  send_status    Result of sending over the socket
264     /// @param[in]  lastseq        Last sent sequence number before the current sending operation
265     /// @param[in]  pktseq         Packet sequence number currently tried to be sent
266     /// @param[out] w_u            CUDT unit of the current member (to allow calling overrideSndSeqNo)
267     /// @param[out] w_curseq       Group's current sequence number (either -1 or the value used already for other links)
    /// @param[out] w_final_stat   w_final_stat = send_status if sending succeeded.
    ///
    /// @returns true if the sending operation result (submitted in send_status) is a success, false otherwise.
271     bool sendBackup_CheckSendStatus(const time_point&   currtime,
272                                     const int           send_status,
273                                     const int32_t       lastseq,
274                                     const int32_t       pktseq,
275                                     CUDT&               w_u,
276                                     int32_t&            w_curseq,
277                                     int&                w_final_stat);
278     void sendBackup_Buffering(const char* buf, const int len, int32_t& curseq, SRT_MSGCTRL& w_mc);
279 
280     size_t sendBackup_TryActivateStandbyIfNeeded(
281         const char* buf,
282         const int   len,
283         bool& w_none_succeeded,
284         SRT_MSGCTRL& w_mc,
285         int32_t& w_curseq,
286         int32_t& w_final_stat,
287         SendBackupCtx& w_sendBackupCtx,
288         CUDTException& w_cx,
289         const steady_clock::time_point& currtime);
290 
291     /// Check if pending sockets are to be qualified as broken.
292     /// This qualification later results in removing the socket from a group and closing it.
    /// @param[in,out] w_sendBackupCtx  a context with a list of member sockets; some pending ones might be qualified as broken
294     void sendBackup_CheckPendingSockets(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime);
295 
296     /// Check if unstable sockets are to be qualified as broken.
297     /// The main reason for such qualification is if a socket is unstable for too long.
298     /// This qualification later results in removing the socket from a group and closing it.
    /// @param[in,out] w_sendBackupCtx  a context with a list of member sockets; some unstable ones might be qualified as broken
300     void sendBackup_CheckUnstableSockets(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime);
301 
302     /// @brief Marks broken sockets as closed. Used in broadcast sending.
303     /// @param w_wipeme a list of sockets to close
304     void send_CloseBrokenSockets(std::vector<SRTSOCKET>& w_wipeme);
305 
306     /// @brief Marks broken sockets as closed. Used in backup sending.
307     /// @param w_sendBackupCtx the context with a list of broken sockets
308     void sendBackup_CloseBrokenSockets(SendBackupCtx& w_sendBackupCtx);
309 
310     void sendBackup_RetryWaitBlocked(SendBackupCtx& w_sendBackupCtx,
311                                      int&                      w_final_stat,
312                                      bool&                     w_none_succeeded,
313                                      SRT_MSGCTRL&              w_mc,
314                                      CUDTException&            w_cx);
315     void sendBackup_SilenceRedundantLinks(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime);
316 
317     void send_CheckValidSockets();
318 
319 public:
320     int recv(char* buf, int len, SRT_MSGCTRL& w_mc);
321 
322     void close();
323 
324     void setOpt(SRT_SOCKOPT optname, const void* optval, int optlen);
325     void getOpt(SRT_SOCKOPT optName, void* optval, int& w_optlen);
326     void deriveSettings(srt::CUDT* source);
327     bool applyFlags(uint32_t flags, HandshakeSide);
328 
329     SRT_SOCKSTATUS getStatus();
330 
331     void debugMasterData(SRTSOCKET slave);
332 
isGroupReceiver()333     bool isGroupReceiver()
334     {
335         // XXX add here also other group types, which
336         // predict group receiving.
337         return m_type == SRT_GTYPE_BROADCAST;
338     }
339 
exp_groupLock()340     sync::Mutex* exp_groupLock() { return &m_GroupLock; }
341     void         addEPoll(int eid);
342     void         removeEPollEvents(const int eid);
343     void         removeEPollID(const int eid);
344     void         updateReadState(SRTSOCKET sock, int32_t sequence);
345     void         updateWriteState();
346     void         updateFailedLink();
347     void         activateUpdateEvent(bool still_have_items);
348     int32_t      getRcvBaseSeqNo();
349 
350     /// Update the in-group array of packet providers per sequence number.
351     /// Also basing on the information already provided by possibly other sockets,
352     /// report the real status of packet loss, including packets maybe lost
353     /// by the caller provider, but already received from elsewhere. Note that
354     /// these packets are not ready for extraction until ACK-ed.
355     ///
356     /// @param exp_sequence The previously received sequence at this socket
357     /// @param sequence The sequence of this packet
358     /// @param provider The core of the socket for which the packet was dispatched
359     /// @param time TSBPD time of this packet
360     /// @return The bitmap that marks by 'false' packets lost since next to exp_sequence
361     std::vector<bool> providePacket(int32_t exp_sequence, int32_t sequence, srt::CUDT* provider, uint64_t time);
362 
363     /// This is called from the ACK action by particular socket, which
364     /// actually signs off the packet for extraction.
365     ///
366     /// @param core The socket core for which the ACK was sent
367     /// @param ack The past-the-last-received ACK sequence number
368     void readyPackets(srt::CUDT* core, int32_t ack);
369 
370     void syncWithSocket(const srt::CUDT& core, const HandshakeSide side);
371     int  getGroupData(SRT_SOCKGROUPDATA* pdata, size_t* psize);
372     int  getGroupData_LOCKED(SRT_SOCKGROUPDATA* pdata, size_t* psize);
373     int  configure(const char* str);
374 
375     /// Predicted to be called from the reading function to fill
376     /// the group data array as requested.
377     void fillGroupData(SRT_MSGCTRL&       w_out, //< MSGCTRL to be written
378                        const SRT_MSGCTRL& in     //< MSGCTRL read from the data-providing socket
379     );
380 
381     void copyGroupData(const CUDTGroup::SocketData& source, SRT_SOCKGROUPDATA& w_target);
382 
    // debugGroup() is only declared here (defined elsewhere) when
    // ENABLE_HEAVY_LOGGING is set; otherwise it is an empty inline stub,
    // so callers need no conditional compilation of their own.
#if ENABLE_HEAVY_LOGGING
    void debugGroup();
#else
    void debugGroup() {}
#endif
388 
389     void ackMessage(int32_t msgno);
390     void handleKeepalive(SocketData*);
391     void internalKeepalive(SocketData*);
392 
393 private:
394     // Check if there's at least one connected socket.
395     // If so, grab the status of all member sockets.
396     void getGroupCount(size_t& w_size, bool& w_still_alive);
397 
398     class srt::CUDTUnited* m_pGlobal;
399     srt::sync::Mutex  m_GroupLock;
400 
401     SRTSOCKET m_GroupID;
402     SRTSOCKET m_PeerGroupID;
403     struct GroupContainer
404     {
405         std::list<SocketData>        m_List;
406 
407         /// This field is used only by some types of groups that need
408         /// to keep track as to which link was lately used. Note that
409         /// by removal of a node from the m_List container, this link
410         /// must be appropriately reset.
411         gli_t m_LastActiveLink;
412 
GroupContainerGroupContainer413         GroupContainer()
414             : m_LastActiveLink(m_List.end())
415         {
416         }
417 
418         // Property<gli_t> active = { m_LastActiveLink; }
419         SRTU_PROPERTY_RW(gli_t, active, m_LastActiveLink);
420 
beginGroupContainer421         gli_t        begin() { return m_List.begin(); }
endGroupContainer422         gli_t        end() { return m_List.end(); }
emptyGroupContainer423         bool         empty() { return m_List.empty(); }
push_backGroupContainer424         void         push_back(const SocketData& data) { m_List.push_back(data); }
clearGroupContainer425         void         clear()
426         {
427             m_LastActiveLink = end();
428             m_List.clear();
429         }
sizeGroupContainer430         size_t size() { return m_List.size(); }
431 
432         void erase(gli_t it);
433     };
434     GroupContainer m_Group;
435     bool           m_selfManaged;
436     bool           m_bSyncOnMsgNo;
437     SRT_GROUP_TYPE m_type;
438     CUDTSocket*    m_listener; // A "group" can only have one listener.
439     srt::sync::atomic<int> m_iBusy;
440     CallbackHolder<srt_connect_callback_fn> m_cbConnectHook;
installConnectHook(srt_connect_callback_fn * hook,void * opaq)441     void installConnectHook(srt_connect_callback_fn* hook, void* opaq)
442     {
443         m_cbConnectHook.set(opaq, hook);
444     }
445 
446 public:
apiAcquire()447     void apiAcquire() { ++m_iBusy; }
apiRelease()448     void apiRelease() { --m_iBusy; }
449 
450     // A normal cycle of the send/recv functions is the following:
451     // - [Initial API call for a group]
452     // - GroupKeeper - ctor
453     //    - LOCK: GlobControlLock
454     //       - Find the group ID in the group container (break if not found)
455     //       - LOCK: GroupLock of that group
456     //           - Set BUSY flag
457     //       - UNLOCK GroupLock
458     //    - UNLOCK GlobControlLock
459     // - [Call the sending function (sendBroadcast/sendBackup)]
460     //    - LOCK GroupLock
461     //       - Preparation activities
462     //       - Loop over group members
463     //       - Send over a single socket
464     //       - Check send status and conditions
465     //       - Exit, if nothing else to be done
466     //       - Check links to send extra
467     //           - UNLOCK GroupLock
468     //               - Wait for first ready link
469     //           - LOCK GroupLock
470     //       - Check status and find sendable link
471     //       - Send over a single socket
472     //       - Check status and update data
473     //    - UNLOCK GroupLock, Exit
474     // - GroupKeeper - dtor
475     // - LOCK GroupLock
476     //    - Clear BUSY flag
477     // - UNLOCK GroupLock
478     // END.
479     //
480     // The possibility for isStillBusy to go on is only the following:
481     // 1. Before calling the API function. As GlobControlLock is locked,
482     //    the nearest lock on GlobControlLock by GroupKeeper can happen:
483     //    - before the group is moved to ClosedGroups (this allows it to be found)
484     //    - after the group is moved to ClosedGroups (this makes the group not found)
485     //    - NOT after the group was deleted, as it could not be found and occupied.
486     //
487     // 2. Before release of GlobControlLock (acquired by GC), but before the
488     //    API function locks GroupLock:
489     //    - the GC call to isStillBusy locks GroupLock, but BUSY flag is already set
490     //    - GC then avoids deletion of the group
491     //
492     // 3. In any further place up to the exit of the API implementation function,
493     // the BUSY flag is still set.
494     //
495     // 4. After exit of GroupKeeper destructor and unlock of GroupLock
496     //    - the group is no longer being accessed and can be freely deleted.
497     //    - the group also can no longer be found by ID.
498 
isStillBusy()499     bool isStillBusy()
500     {
501         sync::ScopedLock glk(m_GroupLock);
502         return m_iBusy || !m_Group.empty();
503     }
504 
505     struct BufferedMessageStorage
506     {
507         size_t             blocksize;
508         size_t             maxstorage;
509         std::vector<char*> storage;
510 
511         BufferedMessageStorage(size_t blk, size_t max = 0)
blocksizeBufferedMessageStorage512             : blocksize(blk)
513             , maxstorage(max)
514             , storage()
515         {
516         }
517 
getBufferedMessageStorage518         char* get()
519         {
520             if (storage.empty())
521                 return new char[blocksize];
522 
523             // Get the element from the end
524             char* block = storage.back();
525             storage.pop_back();
526             return block;
527         }
528 
putBufferedMessageStorage529         void put(char* block)
530         {
531             if (storage.size() >= maxstorage)
532             {
533                 // Simply delete
534                 delete[] block;
535                 return;
536             }
537 
538             // Put the block into the spare buffer
539             storage.push_back(block);
540         }
541 
~BufferedMessageStorageBufferedMessageStorage542         ~BufferedMessageStorage()
543         {
544             for (size_t i = 0; i < storage.size(); ++i)
545                 delete[] storage[i];
546         }
547     };
548 
549     struct BufferedMessage
550     {
551         static BufferedMessageStorage storage;
552 
553         SRT_MSGCTRL   mc;
554         mutable char* data;
555         size_t        size;
556 
BufferedMessageBufferedMessage557         BufferedMessage()
558             : data()
559             , size()
560         {
561         }
~BufferedMessageBufferedMessage562         ~BufferedMessage()
563         {
564             if (data)
565                 storage.put(data);
566         }
567 
568         // NOTE: size 's' must be checked against SRT_LIVE_MAX_PLSIZE
569         // before calling
copyBufferedMessage570         void copy(const char* buf, size_t s)
571         {
572             size = s;
573             data = storage.get();
574             memcpy(data, buf, s);
575         }
576 
BufferedMessageBufferedMessage577         BufferedMessage(const BufferedMessage& foreign)
578             : mc(foreign.mc)
579             , data(foreign.data)
580             , size(foreign.size)
581         {
582             foreign.data = 0;
583         }
584 
585         BufferedMessage& operator=(const BufferedMessage& foreign)
586         {
587             data = foreign.data;
588             size = foreign.size;
589             mc = foreign.mc;
590 
591             foreign.data = 0;
592             return *this;
593         }
594 
595     private:
swap_withBufferedMessage596         void swap_with(BufferedMessage& b)
597         {
598             std::swap(this->mc, b.mc);
599             std::swap(this->data, b.data);
600             std::swap(this->size, b.size);
601         }
602     };
603 
604     typedef std::deque<BufferedMessage> senderBuffer_t;
605     // typedef StaticBuffer<BufferedMessage, 1000> senderBuffer_t;
606 
607 private:
608     // Fields required for SRT_GTYPE_BACKUP groups.
609     senderBuffer_t   m_SenderBuffer;
610     int32_t          m_iSndOldestMsgNo; // oldest position in the sender buffer
611     volatile int32_t m_iSndAckedMsgNo;
612     uint32_t         m_uOPT_StabilityTimeout;
613 
614     // THIS function must be called only in a function for a group type
615     // that does use sender buffer.
616     int32_t addMessageToBuffer(const char* buf, size_t len, SRT_MSGCTRL& w_mc);
617 
618     std::set<int>      m_sPollID; // set of epoll ID to trigger
619     int                m_iMaxPayloadSize;
620     int                m_iAvgPayloadSize;
621     bool               m_bSynRecving;
622     bool               m_bSynSending;
623     bool               m_bTsbPd;
624     bool               m_bTLPktDrop;
625     int64_t            m_iTsbPdDelay_us;
626     int                m_RcvEID;
627     class CEPollDesc*  m_RcvEpolld;
628     int                m_SndEID;
629     class CEPollDesc*  m_SndEpolld;
630 
631     int m_iSndTimeOut; // sending timeout in milliseconds
632     int m_iRcvTimeOut; // receiving timeout in milliseconds
633 
634     // Start times for TsbPd. These times shall be synchronized
635     // between all sockets in the group. The first connected one
636     // defines it, others shall derive it. The value 0 decides if
637     // this has been already set.
638     time_point m_tsStartTime;
639     time_point m_tsRcvPeerStartTime;
640 
641     struct ReadPos
642     {
643         std::vector<char> packet;
644         SRT_MSGCTRL       mctrl;
ReadPosReadPos645         ReadPos(int32_t s)
646             : mctrl(srt_msgctrl_default)
647         {
648             mctrl.pktseq = s;
649         }
650     };
651     std::map<SRTSOCKET, ReadPos> m_Positions;
652 
653     ReadPos* checkPacketAhead();
654 
655     void recv_CollectAliveAndBroken(std::vector<srt::CUDTSocket*>& w_alive, std::set<srt::CUDTSocket*>& w_broken);
656 
657     /// The function polls alive member sockets and retrieves a list of read-ready.
658     /// [acquires lock for CUDT::s_UDTUnited.m_GlobControlLock]
659     /// [[using locked(m_GroupLock)]] temporally unlocks-locks internally
660     ///
661     /// @returns list of read-ready sockets
662     /// @throws CUDTException(MJ_CONNECTION, MN_NOCONN, 0)
663     /// @throws CUDTException(MJ_AGAIN, MN_RDAVAIL, 0)
664     std::vector<srt::CUDTSocket*> recv_WaitForReadReady(const std::vector<srt::CUDTSocket*>& aliveMembers, std::set<srt::CUDTSocket*>& w_broken);
665 
666     // This is the sequence number of a packet that has been previously
667     // delivered. Initially it should be set to SRT_SEQNO_NONE so that the sequence read
668     // from the first delivering socket will be taken as a good deal.
669     volatile int32_t m_RcvBaseSeqNo;
670 
671     bool m_bOpened;    // Set to true when at least one link is at least pending
672     bool m_bConnected; // Set to true on first link confirmed connected
673     bool m_bClosing;
674 
675     // There's no simple way of transforming config
676     // items that are predicted to be used on socket.
677     // Use some options for yourself, store the others
678     // for setting later on a socket.
679     std::vector<ConfigItem> m_config;
680 
681     // Signal for the blocking user thread that the packet
682     // is ready to deliver.
683     srt::sync::Condition m_RcvDataCond;
684     srt::sync::Mutex     m_RcvDataLock;
    volatile int32_t     m_iLastSchedSeqNo; // represents the value of CUDT::m_iSndNextSeqNo for each running socket
686     volatile int32_t     m_iLastSchedMsgNo;
687     // Statistics
688 
689     struct Stats
690     {
691         // Stats state
692         time_point tsActivateTime;   // Time when this group sent or received the first data packet
693         time_point tsLastSampleTime; // Time reset when clearing stats
694 
695         MetricUsage<PacketMetric> sent; // number of packets sent from the application
696         MetricUsage<PacketMetric> recv; // number of packets delivered from the group to the application
697         MetricUsage<PacketMetric>
698                                   recvDrop; // number of packets dropped by the group receiver (not received from any member)
699         MetricUsage<PacketMetric> recvDiscard; // number of packets discarded as already delivered
700 
initStats701         void init()
702         {
703             tsActivateTime = srt::sync::steady_clock::time_point();
704             sent.Init();
705             recv.Init();
706             recvDrop.Init();
707             recvDiscard.Init();
708 
709             reset();
710         }
711 
resetStats712         void reset()
713         {
714             sent.Clear();
715             recv.Clear();
716             recvDrop.Clear();
717             recvDiscard.Clear();
718 
719             tsLastSampleTime = srt::sync::steady_clock::now();
720         }
721     } m_stats;
722 
updateAvgPayloadSize(int size)723     void updateAvgPayloadSize(int size)
724     {
725         if (m_iAvgPayloadSize == -1)
726             m_iAvgPayloadSize = size;
727         else
728             m_iAvgPayloadSize = avg_iir<4>(m_iAvgPayloadSize, size);
729     }
730 
avgRcvPacketSize()731     int avgRcvPacketSize()
732     {
733         // In case when no packet has been received yet, but already notified
734         // a dropped packet, its size will be SRT_LIVE_DEF_PLSIZE. It will be
735         // the value most matching in the typical uses, although no matter what
736         // value would be used here, each one would be wrong from some points
737         // of view. This one is simply the best choice for typical uses of groups
738         // provided that they are to be ued only for live mode.
739         return m_iAvgPayloadSize == -1 ? SRT_LIVE_DEF_PLSIZE : m_iAvgPayloadSize;
740     }
741 
742 public:
743     void bstatsSocket(CBytePerfMon* perf, bool clear);
744 
745     // Required after the call on newGroup on the listener side.
746     // On the listener side the group is lazily created just before
747     // accepting a new socket and therefore always open.
setOpen()748     void setOpen() { m_bOpened = true; }
749 
CONID()750     std::string CONID() const
751     {
752 #if ENABLE_LOGGING
753         std::ostringstream os;
754         os << "@" << m_GroupID << ":";
755         return os.str();
756 #else
757         return "";
758 #endif
759     }
760 
resetInitialRxSequence()761     void resetInitialRxSequence()
762     {
763         // The app-reader doesn't care about the real sequence number.
764         // The first provided one will be taken as a good deal; even if
765         // this is going to be past the ISN, at worst it will be caused
766         // by TLPKTDROP.
767         m_RcvBaseSeqNo = SRT_SEQNO_NONE;
768     }
769 
applyGroupTime(time_point & w_start_time,time_point & w_peer_start_time)770     bool applyGroupTime(time_point& w_start_time, time_point& w_peer_start_time)
771     {
772         using srt::sync::is_zero;
773         using srt_logging::gmlog;
774 
775         if (is_zero(m_tsStartTime))
776         {
777             // The first socket, defines the group time for the whole group.
778             m_tsStartTime        = w_start_time;
779             m_tsRcvPeerStartTime = w_peer_start_time;
780             return true;
781         }
782 
783         // Sanity check. This should never happen, fix the bug if found!
784         if (is_zero(m_tsRcvPeerStartTime))
785         {
786             LOGC(gmlog.Error, log << "IPE: only StartTime is set, RcvPeerStartTime still 0!");
787             // Kinda fallback, but that's not too safe.
788             m_tsRcvPeerStartTime = w_peer_start_time;
789         }
790 
791         // The redundant connection, derive the times
792         w_start_time      = m_tsStartTime;
793         w_peer_start_time = m_tsRcvPeerStartTime;
794 
795         return false;
796     }
797 
798     // Live state synchronization
799     bool getBufferTimeBase(srt::CUDT* forthesakeof, time_point& w_tb, bool& w_wp, duration& w_dr);
800     bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);
801 
802     /// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference.
803     /// @param srcMember a reference for synchronization.
804     void synchronizeDrift(const srt::CUDT* srcMember);
805 
806     void updateLatestRcv(srt::CUDTSocket*);
807 
808     // Property accessors
809     SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, id, m_GroupID);
810     SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, peerid, m_PeerGroupID);
811     SRTU_PROPERTY_RW_CHAIN(CUDTGroup, bool, managed, m_selfManaged);
812     SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRT_GROUP_TYPE, type, m_type);
813     SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentSchedSequence, m_iLastSchedSeqNo);
814     SRTU_PROPERTY_RRW(std::set<int>&, epollset, m_sPollID);
815     SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency, m_iTsbPdDelay_us);
816     SRTU_PROPERTY_RO(bool, synconmsgno, m_bSyncOnMsgNo);
817     SRTU_PROPERTY_RO(bool, closing, m_bClosing);
818 };
819 
820 } // namespace srt
821 
822 #endif // INC_SRT_GROUP_H
823