1 /* 2 * SRT - Secure, Reliable, Transport 3 * Copyright (c) 2020 Haivision Systems Inc. 4 * 5 * This Source Code Form is subject to the terms of the Mozilla Public 6 * License, v. 2.0. If a copy of the MPL was not distributed with this 7 * file, You can obtain one at http://mozilla.org/MPL/2.0/. 8 * 9 */ 10 11 /***************************************************************************** 12 Written by 13 Haivision Systems Inc. 14 *****************************************************************************/ 15 16 #ifndef INC_SRT_GROUP_H 17 #define INC_SRT_GROUP_H 18 19 #include "srt.h" 20 #include "common.h" 21 #include "packet.h" 22 #include "group_common.h" 23 #include "group_backup.h" 24 25 namespace srt 26 { 27 28 #if ENABLE_HEAVY_LOGGING 29 const char* const srt_log_grp_state[] = {"PENDING", "IDLE", "RUNNING", "BROKEN"}; 30 #endif 31 32 33 class CUDTGroup 34 { 35 friend class CUDTUnited; 36 37 typedef sync::steady_clock::time_point time_point; 38 typedef sync::steady_clock::duration duration; 39 typedef sync::steady_clock steady_clock; 40 typedef groups::SocketData SocketData; 41 typedef groups::SendBackupCtx SendBackupCtx; 42 typedef groups::BackupMemberState BackupMemberState; 43 44 public: 45 typedef SRT_MEMBERSTATUS GroupState; 46 47 // Note that the use of states may differ in particular group types: 48 // 49 // Broadcast: links that are freshly connected become PENDING and then IDLE only 50 // for a short moment to be activated immediately at the nearest sending operation. 51 // 52 // Balancing: like with broadcast, just that the link activation gets its shared percentage 53 // of traffic balancing 54 // 55 // Multicast: The link is never idle. The data are always sent over the UDP multicast link 56 // and the receiver simply gets subscribed and reads packets once it's ready. 57 // 58 // Backup: The link stays idle until it's activated, and the activation can only happen 59 // at the moment when the currently active link is "suspected of being likely broken" 60 // (the current active link fails to receive ACK in a time when two ACKs should already 61 // be received). After a while when the current active link is confirmed broken, it turns 62 // into broken state. 63 64 static const char* StateStr(GroupState); 65 66 static int32_t s_tokenGen; genToken()67 static int32_t genToken() { ++s_tokenGen; if (s_tokenGen < 0) s_tokenGen = 0; return s_tokenGen;} 68 69 struct ConfigItem 70 { 71 SRT_SOCKOPT so; 72 std::vector<unsigned char> value; 73 74 template <class T> getConfigItem75 bool get(T& refr) 76 { 77 if (sizeof(T) > value.size()) 78 return false; 79 refr = *(T*)&value[0]; 80 return true; 81 } 82 ConfigItemConfigItem83 ConfigItem(SRT_SOCKOPT o, const void* val, int size) 84 : so(o) 85 { 86 value.resize(size); 87 unsigned char* begin = (unsigned char*)val; 88 std::copy(begin, begin + size, value.begin()); 89 } 90 91 struct OfType 92 { 93 SRT_SOCKOPT so; OfTypeConfigItem::OfType94 OfType(SRT_SOCKOPT soso) 95 : so(soso) 96 { 97 } operatorConfigItem::OfType98 bool operator()(ConfigItem& ci) { return ci.so == so; } 99 }; 100 }; 101 102 typedef std::list<SocketData> group_t; 103 typedef group_t::iterator gli_t; 104 typedef std::vector< std::pair<SRTSOCKET, srt::CUDTSocket*> > sendable_t; 105 106 struct Sendstate 107 { 108 SRTSOCKET id; 109 SocketData* mb; 110 int stat; 111 int code; 112 }; 113 114 CUDTGroup(SRT_GROUP_TYPE); 115 ~CUDTGroup(); 116 117 SocketData* add(SocketData data); 118 119 struct HaveID 120 { 121 SRTSOCKET id; HaveIDHaveID122 HaveID(SRTSOCKET sid) 123 : id(sid) 124 { 125 } operatorHaveID126 bool operator()(const SocketData& s) { return s.id == id; } 127 }; 128 contains(SRTSOCKET id,SocketData * & w_f)129 bool contains(SRTSOCKET id, SocketData*& w_f) 130 { 131 srt::sync::ScopedLock g(m_GroupLock); 132 gli_t f = std::find_if(m_Group.begin(), m_Group.end(), HaveID(id)); 133 if (f == m_Group.end()) 134 { 135 w_f = NULL; 136 return false; 137 } 138 w_f = &*f; 139 return true; 140 } 141 142 // NEED LOCKING begin()143 gli_t begin() { return m_Group.begin(); } end()144 gli_t end() { return m_Group.end(); } 145 146 /// Remove the socket from the group container. 147 /// REMEMBER: the group spec should be taken from the socket 148 /// (set m_GroupOf and m_GroupMemberData to NULL 149 /// PRIOR TO calling this function. 150 /// @param id Socket ID to look for in the container to remove 151 /// @return true if the container still contains any sockets after the operation remove(SRTSOCKET id)152 bool remove(SRTSOCKET id) 153 { 154 using srt_logging::gmlog; 155 srt::sync::ScopedLock g(m_GroupLock); 156 157 bool empty = false; 158 HLOGC(gmlog.Debug, log << "group/remove: going to remove @" << id << " from $" << m_GroupID); 159 160 gli_t f = std::find_if(m_Group.begin(), m_Group.end(), HaveID(id)); 161 if (f != m_Group.end()) 162 { 163 m_Group.erase(f); 164 165 // Reset sequence numbers on a dead group so that they are 166 // initialized anew with the new alive connection within 167 // the group. 168 // XXX The problem is that this should be done after the 169 // socket is considered DISCONNECTED, not when it's being 170 // closed. After being disconnected, the sequence numbers 171 // are no longer valid, and will be reinitialized when the 172 // socket is connected again. This may stay as is for now 173 // as in SRT it's not predicted to do anything with the socket 174 // that was disconnected other than immediately closing it. 175 if (m_Group.empty()) 176 { 177 // When the group is empty, there's no danger that this 178 // number will collide with any ISN provided by a socket. 179 // Also since now every socket will derive this ISN. 180 m_iLastSchedSeqNo = generateISN(); 181 resetInitialRxSequence(); 182 empty = true; 183 } 184 } 185 else 186 { 187 HLOGC(gmlog.Debug, log << "group/remove: IPE: id @" << id << " NOT FOUND"); 188 empty = true; // not exactly true, but this is to cause error on group in the APP 189 } 190 191 if (m_Group.empty()) 192 { 193 m_bOpened = false; 194 m_bConnected = false; 195 } 196 197 // XXX BUGFIX 198 m_Positions.erase(id); 199 200 return !empty; 201 } 202 groupEmpty()203 bool groupEmpty() 204 { 205 srt::sync::ScopedLock g(m_GroupLock); 206 return m_Group.empty(); 207 } 208 209 void setGroupConnected(); 210 211 int send(const char* buf, int len, SRT_MSGCTRL& w_mc); 212 int sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc); 213 int sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc); 214 static int32_t generateISN(); 215 216 private: 217 // For Backup, sending all previous packet 218 int sendBackupRexmit(srt::CUDT& core, SRT_MSGCTRL& w_mc); 219 220 // Support functions for sendBackup and sendBroadcast 221 /// Check if group member is idle. 222 /// @param d group member 223 /// @param[in,out] w_wipeme array of sockets to remove from group 224 /// @param[in,out] w_pendingLinks array of sockets pending for connection 225 /// @returns true if d is idle (standby), false otherwise 226 bool send_CheckIdle(const gli_t d, std::vector<SRTSOCKET>& w_wipeme, std::vector<SRTSOCKET>& w_pendingLinks); 227 228 229 /// This function checks if the member has just become idle (check if sender buffer is empty) to send a KEEPALIVE immidiatelly. 230 /// @todo Check it is some abandoned logic. 231 void sendBackup_CheckIdleTime(gli_t w_d); 232 233 /// Qualify states of member links. 234 /// [[using locked(this->m_GroupLock, m_pGlobal->m_GlobControlLock)]] 235 /// @param[out] w_sendBackupCtx the context will be updated with state qualifications 236 /// @param[in] currtime current timestamp 237 void sendBackup_QualifyMemberStates(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime); 238 239 void sendBackup_AssignBackupState(srt::CUDT& socket, BackupMemberState state, const steady_clock::time_point& currtime); 240 241 /// Qualify the state of the active link: fresh, stable, unstable, wary. 242 /// @retval active backup member state: fresh, stable, unstable, wary. 243 BackupMemberState sendBackup_QualifyActiveState(const gli_t d, const time_point currtime); 244 245 BackupMemberState sendBackup_QualifyIfStandBy(const gli_t d); 246 247 /// Sends the same payload over all active members. 248 /// @param[in] buf payload 249 /// @param[in] len payload length in bytes 250 /// @param[in,out] w_mc message control 251 /// @param[in] currtime current time 252 /// @param[in] currseq current packet sequence number 253 /// @param[out] w_nsuccessful number of members with successfull sending. 254 /// @param[in,out] maxActiveWeight 255 /// @param[in,out] sendBackupCtx context 256 /// @param[in,out] w_cx error 257 /// @return group send result: -1 if sending over all members has failed; number of bytes sent overwise. 258 int sendBackup_SendOverActive(const char* buf, int len, SRT_MSGCTRL& w_mc, const steady_clock::time_point& currtime, int32_t& w_curseq, 259 size_t& w_nsuccessful, uint16_t& w_maxActiveWeight, SendBackupCtx& w_sendBackupCtx, CUDTException& w_cx); 260 261 /// Check link sending status 262 /// @param[in] currtime Current time (logging only) 263 /// @param[in] send_status Result of sending over the socket 264 /// @param[in] lastseq Last sent sequence number before the current sending operation 265 /// @param[in] pktseq Packet sequence number currently tried to be sent 266 /// @param[out] w_u CUDT unit of the current member (to allow calling overrideSndSeqNo) 267 /// @param[out] w_curseq Group's current sequence number (either -1 or the value used already for other links) 268 /// @param[out] w_final_stat w_final_stat = send_status if sending succeded. 269 /// 270 /// @returns true if the sending operation result (submitted in stat) is a success, false otherwise. 271 bool sendBackup_CheckSendStatus(const time_point& currtime, 272 const int send_status, 273 const int32_t lastseq, 274 const int32_t pktseq, 275 CUDT& w_u, 276 int32_t& w_curseq, 277 int& w_final_stat); 278 void sendBackup_Buffering(const char* buf, const int len, int32_t& curseq, SRT_MSGCTRL& w_mc); 279 280 size_t sendBackup_TryActivateStandbyIfNeeded( 281 const char* buf, 282 const int len, 283 bool& w_none_succeeded, 284 SRT_MSGCTRL& w_mc, 285 int32_t& w_curseq, 286 int32_t& w_final_stat, 287 SendBackupCtx& w_sendBackupCtx, 288 CUDTException& w_cx, 289 const steady_clock::time_point& currtime); 290 291 /// Check if pending sockets are to be qualified as broken. 292 /// This qualification later results in removing the socket from a group and closing it. 293 /// @param[in,out] a context with a list of member sockets, some pending might qualified broken 294 void sendBackup_CheckPendingSockets(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime); 295 296 /// Check if unstable sockets are to be qualified as broken. 297 /// The main reason for such qualification is if a socket is unstable for too long. 298 /// This qualification later results in removing the socket from a group and closing it. 299 /// @param[in,out] a context with a list of member sockets, some pending might qualified broken 300 void sendBackup_CheckUnstableSockets(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime); 301 302 /// @brief Marks broken sockets as closed. Used in broadcast sending. 303 /// @param w_wipeme a list of sockets to close 304 void send_CloseBrokenSockets(std::vector<SRTSOCKET>& w_wipeme); 305 306 /// @brief Marks broken sockets as closed. Used in backup sending. 307 /// @param w_sendBackupCtx the context with a list of broken sockets 308 void sendBackup_CloseBrokenSockets(SendBackupCtx& w_sendBackupCtx); 309 310 void sendBackup_RetryWaitBlocked(SendBackupCtx& w_sendBackupCtx, 311 int& w_final_stat, 312 bool& w_none_succeeded, 313 SRT_MSGCTRL& w_mc, 314 CUDTException& w_cx); 315 void sendBackup_SilenceRedundantLinks(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime); 316 317 void send_CheckValidSockets(); 318 319 public: 320 int recv(char* buf, int len, SRT_MSGCTRL& w_mc); 321 322 void close(); 323 324 void setOpt(SRT_SOCKOPT optname, const void* optval, int optlen); 325 void getOpt(SRT_SOCKOPT optName, void* optval, int& w_optlen); 326 void deriveSettings(srt::CUDT* source); 327 bool applyFlags(uint32_t flags, HandshakeSide); 328 329 SRT_SOCKSTATUS getStatus(); 330 331 void debugMasterData(SRTSOCKET slave); 332 isGroupReceiver()333 bool isGroupReceiver() 334 { 335 // XXX add here also other group types, which 336 // predict group receiving. 337 return m_type == SRT_GTYPE_BROADCAST; 338 } 339 exp_groupLock()340 sync::Mutex* exp_groupLock() { return &m_GroupLock; } 341 void addEPoll(int eid); 342 void removeEPollEvents(const int eid); 343 void removeEPollID(const int eid); 344 void updateReadState(SRTSOCKET sock, int32_t sequence); 345 void updateWriteState(); 346 void updateFailedLink(); 347 void activateUpdateEvent(bool still_have_items); 348 int32_t getRcvBaseSeqNo(); 349 350 /// Update the in-group array of packet providers per sequence number. 351 /// Also basing on the information already provided by possibly other sockets, 352 /// report the real status of packet loss, including packets maybe lost 353 /// by the caller provider, but already received from elsewhere. Note that 354 /// these packets are not ready for extraction until ACK-ed. 355 /// 356 /// @param exp_sequence The previously received sequence at this socket 357 /// @param sequence The sequence of this packet 358 /// @param provider The core of the socket for which the packet was dispatched 359 /// @param time TSBPD time of this packet 360 /// @return The bitmap that marks by 'false' packets lost since next to exp_sequence 361 std::vector<bool> providePacket(int32_t exp_sequence, int32_t sequence, srt::CUDT* provider, uint64_t time); 362 363 /// This is called from the ACK action by particular socket, which 364 /// actually signs off the packet for extraction. 365 /// 366 /// @param core The socket core for which the ACK was sent 367 /// @param ack The past-the-last-received ACK sequence number 368 void readyPackets(srt::CUDT* core, int32_t ack); 369 370 void syncWithSocket(const srt::CUDT& core, const HandshakeSide side); 371 int getGroupData(SRT_SOCKGROUPDATA* pdata, size_t* psize); 372 int getGroupData_LOCKED(SRT_SOCKGROUPDATA* pdata, size_t* psize); 373 int configure(const char* str); 374 375 /// Predicted to be called from the reading function to fill 376 /// the group data array as requested. 377 void fillGroupData(SRT_MSGCTRL& w_out, //< MSGCTRL to be written 378 const SRT_MSGCTRL& in //< MSGCTRL read from the data-providing socket 379 ); 380 381 void copyGroupData(const CUDTGroup::SocketData& source, SRT_SOCKGROUPDATA& w_target); 382 383 #if ENABLE_HEAVY_LOGGING 384 void debugGroup(); 385 #else debugGroup()386 void debugGroup() {} 387 #endif 388 389 void ackMessage(int32_t msgno); 390 void handleKeepalive(SocketData*); 391 void internalKeepalive(SocketData*); 392 393 private: 394 // Check if there's at least one connected socket. 395 // If so, grab the status of all member sockets. 396 void getGroupCount(size_t& w_size, bool& w_still_alive); 397 398 class srt::CUDTUnited* m_pGlobal; 399 srt::sync::Mutex m_GroupLock; 400 401 SRTSOCKET m_GroupID; 402 SRTSOCKET m_PeerGroupID; 403 struct GroupContainer 404 { 405 std::list<SocketData> m_List; 406 407 /// This field is used only by some types of groups that need 408 /// to keep track as to which link was lately used. Note that 409 /// by removal of a node from the m_List container, this link 410 /// must be appropriately reset. 411 gli_t m_LastActiveLink; 412 GroupContainerGroupContainer413 GroupContainer() 414 : m_LastActiveLink(m_List.end()) 415 { 416 } 417 418 // Property<gli_t> active = { m_LastActiveLink; } 419 SRTU_PROPERTY_RW(gli_t, active, m_LastActiveLink); 420 beginGroupContainer421 gli_t begin() { return m_List.begin(); } endGroupContainer422 gli_t end() { return m_List.end(); } emptyGroupContainer423 bool empty() { return m_List.empty(); } push_backGroupContainer424 void push_back(const SocketData& data) { m_List.push_back(data); } clearGroupContainer425 void clear() 426 { 427 m_LastActiveLink = end(); 428 m_List.clear(); 429 } sizeGroupContainer430 size_t size() { return m_List.size(); } 431 432 void erase(gli_t it); 433 }; 434 GroupContainer m_Group; 435 bool m_selfManaged; 436 bool m_bSyncOnMsgNo; 437 SRT_GROUP_TYPE m_type; 438 CUDTSocket* m_listener; // A "group" can only have one listener. 439 srt::sync::atomic<int> m_iBusy; 440 CallbackHolder<srt_connect_callback_fn> m_cbConnectHook; installConnectHook(srt_connect_callback_fn * hook,void * opaq)441 void installConnectHook(srt_connect_callback_fn* hook, void* opaq) 442 { 443 m_cbConnectHook.set(opaq, hook); 444 } 445 446 public: apiAcquire()447 void apiAcquire() { ++m_iBusy; } apiRelease()448 void apiRelease() { --m_iBusy; } 449 450 // A normal cycle of the send/recv functions is the following: 451 // - [Initial API call for a group] 452 // - GroupKeeper - ctor 453 // - LOCK: GlobControlLock 454 // - Find the group ID in the group container (break if not found) 455 // - LOCK: GroupLock of that group 456 // - Set BUSY flag 457 // - UNLOCK GroupLock 458 // - UNLOCK GlobControlLock 459 // - [Call the sending function (sendBroadcast/sendBackup)] 460 // - LOCK GroupLock 461 // - Preparation activities 462 // - Loop over group members 463 // - Send over a single socket 464 // - Check send status and conditions 465 // - Exit, if nothing else to be done 466 // - Check links to send extra 467 // - UNLOCK GroupLock 468 // - Wait for first ready link 469 // - LOCK GroupLock 470 // - Check status and find sendable link 471 // - Send over a single socket 472 // - Check status and update data 473 // - UNLOCK GroupLock, Exit 474 // - GroupKeeper - dtor 475 // - LOCK GroupLock 476 // - Clear BUSY flag 477 // - UNLOCK GroupLock 478 // END. 479 // 480 // The possibility for isStillBusy to go on is only the following: 481 // 1. Before calling the API function. As GlobControlLock is locked, 482 // the nearest lock on GlobControlLock by GroupKeeper can happen: 483 // - before the group is moved to ClosedGroups (this allows it to be found) 484 // - after the group is moved to ClosedGroups (this makes the group not found) 485 // - NOT after the group was deleted, as it could not be found and occupied. 486 // 487 // 2. Before release of GlobControlLock (acquired by GC), but before the 488 // API function locks GroupLock: 489 // - the GC call to isStillBusy locks GroupLock, but BUSY flag is already set 490 // - GC then avoids deletion of the group 491 // 492 // 3. In any further place up to the exit of the API implementation function, 493 // the BUSY flag is still set. 494 // 495 // 4. After exit of GroupKeeper destructor and unlock of GroupLock 496 // - the group is no longer being accessed and can be freely deleted. 497 // - the group also can no longer be found by ID. 498 isStillBusy()499 bool isStillBusy() 500 { 501 sync::ScopedLock glk(m_GroupLock); 502 return m_iBusy || !m_Group.empty(); 503 } 504 505 struct BufferedMessageStorage 506 { 507 size_t blocksize; 508 size_t maxstorage; 509 std::vector<char*> storage; 510 511 BufferedMessageStorage(size_t blk, size_t max = 0) blocksizeBufferedMessageStorage512 : blocksize(blk) 513 , maxstorage(max) 514 , storage() 515 { 516 } 517 getBufferedMessageStorage518 char* get() 519 { 520 if (storage.empty()) 521 return new char[blocksize]; 522 523 // Get the element from the end 524 char* block = storage.back(); 525 storage.pop_back(); 526 return block; 527 } 528 putBufferedMessageStorage529 void put(char* block) 530 { 531 if (storage.size() >= maxstorage) 532 { 533 // Simply delete 534 delete[] block; 535 return; 536 } 537 538 // Put the block into the spare buffer 539 storage.push_back(block); 540 } 541 ~BufferedMessageStorageBufferedMessageStorage542 ~BufferedMessageStorage() 543 { 544 for (size_t i = 0; i < storage.size(); ++i) 545 delete[] storage[i]; 546 } 547 }; 548 549 struct BufferedMessage 550 { 551 static BufferedMessageStorage storage; 552 553 SRT_MSGCTRL mc; 554 mutable char* data; 555 size_t size; 556 BufferedMessageBufferedMessage557 BufferedMessage() 558 : data() 559 , size() 560 { 561 } ~BufferedMessageBufferedMessage562 ~BufferedMessage() 563 { 564 if (data) 565 storage.put(data); 566 } 567 568 // NOTE: size 's' must be checked against SRT_LIVE_MAX_PLSIZE 569 // before calling copyBufferedMessage570 void copy(const char* buf, size_t s) 571 { 572 size = s; 573 data = storage.get(); 574 memcpy(data, buf, s); 575 } 576 BufferedMessageBufferedMessage577 BufferedMessage(const BufferedMessage& foreign) 578 : mc(foreign.mc) 579 , data(foreign.data) 580 , size(foreign.size) 581 { 582 foreign.data = 0; 583 } 584 585 BufferedMessage& operator=(const BufferedMessage& foreign) 586 { 587 data = foreign.data; 588 size = foreign.size; 589 mc = foreign.mc; 590 591 foreign.data = 0; 592 return *this; 593 } 594 595 private: swap_withBufferedMessage596 void swap_with(BufferedMessage& b) 597 { 598 std::swap(this->mc, b.mc); 599 std::swap(this->data, b.data); 600 std::swap(this->size, b.size); 601 } 602 }; 603 604 typedef std::deque<BufferedMessage> senderBuffer_t; 605 // typedef StaticBuffer<BufferedMessage, 1000> senderBuffer_t; 606 607 private: 608 // Fields required for SRT_GTYPE_BACKUP groups. 609 senderBuffer_t m_SenderBuffer; 610 int32_t m_iSndOldestMsgNo; // oldest position in the sender buffer 611 volatile int32_t m_iSndAckedMsgNo; 612 uint32_t m_uOPT_StabilityTimeout; 613 614 // THIS function must be called only in a function for a group type 615 // that does use sender buffer. 616 int32_t addMessageToBuffer(const char* buf, size_t len, SRT_MSGCTRL& w_mc); 617 618 std::set<int> m_sPollID; // set of epoll ID to trigger 619 int m_iMaxPayloadSize; 620 int m_iAvgPayloadSize; 621 bool m_bSynRecving; 622 bool m_bSynSending; 623 bool m_bTsbPd; 624 bool m_bTLPktDrop; 625 int64_t m_iTsbPdDelay_us; 626 int m_RcvEID; 627 class CEPollDesc* m_RcvEpolld; 628 int m_SndEID; 629 class CEPollDesc* m_SndEpolld; 630 631 int m_iSndTimeOut; // sending timeout in milliseconds 632 int m_iRcvTimeOut; // receiving timeout in milliseconds 633 634 // Start times for TsbPd. These times shall be synchronized 635 // between all sockets in the group. The first connected one 636 // defines it, others shall derive it. The value 0 decides if 637 // this has been already set. 638 time_point m_tsStartTime; 639 time_point m_tsRcvPeerStartTime; 640 641 struct ReadPos 642 { 643 std::vector<char> packet; 644 SRT_MSGCTRL mctrl; ReadPosReadPos645 ReadPos(int32_t s) 646 : mctrl(srt_msgctrl_default) 647 { 648 mctrl.pktseq = s; 649 } 650 }; 651 std::map<SRTSOCKET, ReadPos> m_Positions; 652 653 ReadPos* checkPacketAhead(); 654 655 void recv_CollectAliveAndBroken(std::vector<srt::CUDTSocket*>& w_alive, std::set<srt::CUDTSocket*>& w_broken); 656 657 /// The function polls alive member sockets and retrieves a list of read-ready. 658 /// [acquires lock for CUDT::s_UDTUnited.m_GlobControlLock] 659 /// [[using locked(m_GroupLock)]] temporally unlocks-locks internally 660 /// 661 /// @returns list of read-ready sockets 662 /// @throws CUDTException(MJ_CONNECTION, MN_NOCONN, 0) 663 /// @throws CUDTException(MJ_AGAIN, MN_RDAVAIL, 0) 664 std::vector<srt::CUDTSocket*> recv_WaitForReadReady(const std::vector<srt::CUDTSocket*>& aliveMembers, std::set<srt::CUDTSocket*>& w_broken); 665 666 // This is the sequence number of a packet that has been previously 667 // delivered. Initially it should be set to SRT_SEQNO_NONE so that the sequence read 668 // from the first delivering socket will be taken as a good deal. 669 volatile int32_t m_RcvBaseSeqNo; 670 671 bool m_bOpened; // Set to true when at least one link is at least pending 672 bool m_bConnected; // Set to true on first link confirmed connected 673 bool m_bClosing; 674 675 // There's no simple way of transforming config 676 // items that are predicted to be used on socket. 677 // Use some options for yourself, store the others 678 // for setting later on a socket. 679 std::vector<ConfigItem> m_config; 680 681 // Signal for the blocking user thread that the packet 682 // is ready to deliver. 683 srt::sync::Condition m_RcvDataCond; 684 srt::sync::Mutex m_RcvDataLock; 685 volatile int32_t m_iLastSchedSeqNo; // represetnts the value of CUDT::m_iSndNextSeqNo for each running socket 686 volatile int32_t m_iLastSchedMsgNo; 687 // Statistics 688 689 struct Stats 690 { 691 // Stats state 692 time_point tsActivateTime; // Time when this group sent or received the first data packet 693 time_point tsLastSampleTime; // Time reset when clearing stats 694 695 MetricUsage<PacketMetric> sent; // number of packets sent from the application 696 MetricUsage<PacketMetric> recv; // number of packets delivered from the group to the application 697 MetricUsage<PacketMetric> 698 recvDrop; // number of packets dropped by the group receiver (not received from any member) 699 MetricUsage<PacketMetric> recvDiscard; // number of packets discarded as already delivered 700 initStats701 void init() 702 { 703 tsActivateTime = srt::sync::steady_clock::time_point(); 704 sent.Init(); 705 recv.Init(); 706 recvDrop.Init(); 707 recvDiscard.Init(); 708 709 reset(); 710 } 711 resetStats712 void reset() 713 { 714 sent.Clear(); 715 recv.Clear(); 716 recvDrop.Clear(); 717 recvDiscard.Clear(); 718 719 tsLastSampleTime = srt::sync::steady_clock::now(); 720 } 721 } m_stats; 722 updateAvgPayloadSize(int size)723 void updateAvgPayloadSize(int size) 724 { 725 if (m_iAvgPayloadSize == -1) 726 m_iAvgPayloadSize = size; 727 else 728 m_iAvgPayloadSize = avg_iir<4>(m_iAvgPayloadSize, size); 729 } 730 avgRcvPacketSize()731 int avgRcvPacketSize() 732 { 733 // In case when no packet has been received yet, but already notified 734 // a dropped packet, its size will be SRT_LIVE_DEF_PLSIZE. It will be 735 // the value most matching in the typical uses, although no matter what 736 // value would be used here, each one would be wrong from some points 737 // of view. This one is simply the best choice for typical uses of groups 738 // provided that they are to be ued only for live mode. 739 return m_iAvgPayloadSize == -1 ? SRT_LIVE_DEF_PLSIZE : m_iAvgPayloadSize; 740 } 741 742 public: 743 void bstatsSocket(CBytePerfMon* perf, bool clear); 744 745 // Required after the call on newGroup on the listener side. 746 // On the listener side the group is lazily created just before 747 // accepting a new socket and therefore always open. setOpen()748 void setOpen() { m_bOpened = true; } 749 CONID()750 std::string CONID() const 751 { 752 #if ENABLE_LOGGING 753 std::ostringstream os; 754 os << "@" << m_GroupID << ":"; 755 return os.str(); 756 #else 757 return ""; 758 #endif 759 } 760 resetInitialRxSequence()761 void resetInitialRxSequence() 762 { 763 // The app-reader doesn't care about the real sequence number. 764 // The first provided one will be taken as a good deal; even if 765 // this is going to be past the ISN, at worst it will be caused 766 // by TLPKTDROP. 767 m_RcvBaseSeqNo = SRT_SEQNO_NONE; 768 } 769 applyGroupTime(time_point & w_start_time,time_point & w_peer_start_time)770 bool applyGroupTime(time_point& w_start_time, time_point& w_peer_start_time) 771 { 772 using srt::sync::is_zero; 773 using srt_logging::gmlog; 774 775 if (is_zero(m_tsStartTime)) 776 { 777 // The first socket, defines the group time for the whole group. 778 m_tsStartTime = w_start_time; 779 m_tsRcvPeerStartTime = w_peer_start_time; 780 return true; 781 } 782 783 // Sanity check. This should never happen, fix the bug if found! 784 if (is_zero(m_tsRcvPeerStartTime)) 785 { 786 LOGC(gmlog.Error, log << "IPE: only StartTime is set, RcvPeerStartTime still 0!"); 787 // Kinda fallback, but that's not too safe. 788 m_tsRcvPeerStartTime = w_peer_start_time; 789 } 790 791 // The redundant connection, derive the times 792 w_start_time = m_tsStartTime; 793 w_peer_start_time = m_tsRcvPeerStartTime; 794 795 return false; 796 } 797 798 // Live state synchronization 799 bool getBufferTimeBase(srt::CUDT* forthesakeof, time_point& w_tb, bool& w_wp, duration& w_dr); 800 bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn); 801 802 /// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference. 803 /// @param srcMember a reference for synchronization. 804 void synchronizeDrift(const srt::CUDT* srcMember); 805 806 void updateLatestRcv(srt::CUDTSocket*); 807 808 // Property accessors 809 SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, id, m_GroupID); 810 SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, peerid, m_PeerGroupID); 811 SRTU_PROPERTY_RW_CHAIN(CUDTGroup, bool, managed, m_selfManaged); 812 SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRT_GROUP_TYPE, type, m_type); 813 SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentSchedSequence, m_iLastSchedSeqNo); 814 SRTU_PROPERTY_RRW(std::set<int>&, epollset, m_sPollID); 815 SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency, m_iTsbPdDelay_us); 816 SRTU_PROPERTY_RO(bool, synconmsgno, m_bSyncOnMsgNo); 817 SRTU_PROPERTY_RO(bool, closing, m_bClosing); 818 }; 819 820 } // namespace srt 821 822 #endif // INC_SRT_GROUP_H 823