1 /* 2 * SRT - Secure, Reliable, Transport 3 * Copyright (c) 2018 Haivision Systems Inc. 4 * 5 * This Source Code Form is subject to the terms of the Mozilla Public 6 * License, v. 2.0. If a copy of the MPL was not distributed with this 7 * file, You can obtain one at http://mozilla.org/MPL/2.0/. 8 * 9 */ 10 11 /***************************************************************************** 12 Copyright (c) 2001 - 2009, The Board of Trustees of the University of Illinois. 13 All rights reserved. 14 15 Redistribution and use in source and binary forms, with or without 16 modification, are permitted provided that the following conditions are 17 met: 18 19 * Redistributions of source code must retain the above 20 copyright notice, this list of conditions and the 21 following disclaimer. 22 23 * Redistributions in binary form must reproduce the 24 above copyright notice, this list of conditions 25 and the following disclaimer in the documentation 26 and/or other materials provided with the distribution. 27 28 * Neither the name of the University of Illinois 29 nor the names of its contributors may be used to 30 endorse or promote products derived from this 31 software without specific prior written permission. 32 33 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 34 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 35 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 36 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 37 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 38 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 39 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 40 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 41 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 42 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 43 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 44 *****************************************************************************/ 45 46 /***************************************************************************** 47 written by 48 Yunhong Gu, last updated 05/05/2009 49 modified by 50 Haivision Systems Inc. 51 *****************************************************************************/ 52 53 #ifndef INC_SRT_BUFFER_H 54 #define INC_SRT_BUFFER_H 55 56 #include "udt.h" 57 #include "list.h" 58 #include "queue.h" 59 #include "tsbpd_time.h" 60 #include "utilities.h" 61 62 // The notation used for "circular numbers" in comments: 63 // The "cicrular numbers" are numbers that when increased up to the 64 // maximum become zero, and similarly, when the zero value is decreased, 65 // it turns into the maximum value minus one. This wrapping works the 66 // same for adding and subtracting. Circular numbers cannot be multiplied. 67 68 // Operations done on these numbers are marked with additional % character: 69 // a %> b : a is later than b 70 // a ++% (++%a) : shift a by 1 forward 71 // a +% b : shift a by b 72 // a == b : equality is same as for just numbers 73 74 /// The AvgBufSize class is used to calculate moving average of the buffer (RCV or SND) 75 class AvgBufSize 76 { 77 typedef srt::sync::steady_clock::time_point time_point; 78 79 public: AvgBufSize()80 AvgBufSize() 81 : m_dBytesCountMAvg(0.0) 82 , m_dCountMAvg(0.0) 83 , m_dTimespanMAvg(0.0) 84 { 85 } 86 87 public: 88 bool isTimeToUpdate(const time_point& now) const; 89 void update(const time_point& now, int pkts, int bytes, int timespan_ms); 90 91 public: pkts()92 inline double pkts() const { return m_dCountMAvg; } timespan_ms()93 inline double timespan_ms() const { return m_dTimespanMAvg; } bytes()94 inline double bytes() const { return m_dBytesCountMAvg; } 95 96 private: 97 time_point m_tsLastSamplingTime; 98 double m_dBytesCountMAvg; 99 double m_dCountMAvg; 100 double m_dTimespanMAvg; 101 }; 102 103 class CSndBuffer 104 { 105 typedef srt::sync::steady_clock::time_point time_point; 106 typedef srt::sync::steady_clock::duration duration; 107 108 public: 109 // XXX There's currently no way to access the socket ID set for 110 // whatever the buffer is currently working for. Required to find 111 // some way to do this, possibly by having a "reverse pointer". 112 // Currently just "unimplemented". CONID()113 std::string CONID() const { return ""; } 114 115 CSndBuffer(int size = 32, int mss = 1500); 116 ~CSndBuffer(); 117 118 public: 119 /// Insert a user buffer into the sending list. 120 /// For @a w_mctrl the following fields are used: 121 /// INPUT: 122 /// - msgttl: timeout for retransmitting the message, if lost 123 /// - inorder: request to deliver the message in order of sending 124 /// - srctime: local time as a base for packet's timestamp (0 if unused) 125 /// - pktseq: sequence number to be stamped on the packet (-1 if unused) 126 /// - msgno: message number to be stamped on the packet (-1 if unused) 127 /// OUTPUT: 128 /// - srctime: local time stamped on the packet (same as input, if input wasn't 0) 129 /// - pktseq: sequence number to be stamped on the next packet 130 /// - msgno: message number stamped on the packet 131 /// @param [in] data pointer to the user data block. 132 /// @param [in] len size of the block. 133 /// @param [inout] w_mctrl Message control data 134 void addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl); 135 136 /// Read a block of data from file and insert it into the sending list. 137 /// @param [in] ifs input file stream. 138 /// @param [in] len size of the block. 139 /// @return actual size of data added from the file. 140 int addBufferFromFile(std::fstream& ifs, int len); 141 142 /// Find data position to pack a DATA packet from the furthest reading point. 143 /// @param [out] data the pointer to the data position. 144 /// @param [out] msgno message number of the packet. 145 /// @param [out] origintime origin time stamp of the message 146 /// @param [in] kflags Odd|Even crypto key flag 147 /// @return Actual length of data read. 148 int readData(srt::CPacket& w_packet, time_point& w_origintime, int kflgs); 149 150 /// Find data position to pack a DATA packet for a retransmission. 151 /// @param [out] data the pointer to the data position. 152 /// @param [in] offset offset from the last ACK point (backward sequence number difference) 153 /// @param [out] msgno message number of the packet. 154 /// @param [out] origintime origin time stamp of the message 155 /// @param [out] msglen length of the message 156 /// @return Actual length of data read (return 0 if offset too large, -1 if TTL exceeded). 157 int readData(const int offset, srt::CPacket& w_packet, time_point& w_origintime, int& w_msglen); 158 159 /// Get the time of the last retransmission (if any) of the DATA packet. 160 /// @param [in] offset offset from the last ACK point (backward sequence number difference) 161 /// 162 /// @return Last time of the last retransmission event for the corresponding DATA packet. 163 time_point getPacketRexmitTime(const int offset); 164 165 /// Update the ACK point and may release/unmap/return the user data according to the flag. 166 /// @param [in] offset number of packets acknowledged. 167 int32_t getMsgNoAt(const int offset); 168 169 void ackData(int offset); 170 171 /// Read size of data still in the sending list. 172 /// @return Current size of the data in the sending list. 173 int getCurrBufSize() const; 174 175 int dropLateData(int& bytes, int32_t& w_first_msgno, const time_point& too_late_time); 176 177 void updAvgBufSize(const time_point& time); 178 int getAvgBufSize(int& bytes, int& timespan); 179 int getCurrBufSize(int& bytes, int& timespan); 180 getInRatePeriod()181 uint64_t getInRatePeriod() const { return m_InRatePeriod; } 182 183 /// Retrieve input bitrate in bytes per second getInputRate()184 int getInputRate() const { return m_iInRateBps; } 185 186 /// Update input rate calculation. 187 /// @param [in] time current time in microseconds 188 /// @param [in] pkts number of packets newly added to the buffer 189 /// @param [in] bytes number of payload bytes in those newly added packets 190 /// 191 /// @return Current size of the data in the sending list. 192 void updateInputRate(const time_point& time, int pkts = 0, int bytes = 0); 193 194 void resetInputRateSmpPeriod(bool disable = false) { setInputRateSmpPeriod(disable ? 0 : INPUTRATE_FAST_START_US); } 195 196 private: 197 void increase(); 198 void setInputRateSmpPeriod(int period); 199 200 struct Block; // Defined below 201 static time_point getSourceTime(const CSndBuffer::Block& block); 202 203 private: // Constants 204 static const uint64_t INPUTRATE_FAST_START_US = 500000; // 500 ms 205 static const uint64_t INPUTRATE_RUNNING_US = 1000000; // 1000 ms 206 static const int64_t INPUTRATE_MAX_PACKETS = 2000; // ~ 21 Mbps of 1316 bytes payload 207 static const int INPUTRATE_INITIAL_BYTESPS = BW_INFINITE; 208 209 private: 210 srt::sync::Mutex m_BufLock; // used to synchronize buffer operation 211 212 struct Block 213 { 214 char* m_pcData; // pointer to the data block 215 int m_iLength; // length of the block 216 217 int32_t m_iMsgNoBitset; // message number 218 int32_t m_iSeqNo; // sequence number for scheduling 219 time_point m_tsOriginTime; // original request time 220 time_point m_tsRexmitTime; // packet retransmission time 221 uint64_t m_llSourceTime_us; 222 int m_iTTL; // time to live (milliseconds) 223 224 Block* m_pNext; // next block 225 getMsgSeqBlock226 int32_t getMsgSeq() 227 { 228 // NOTE: this extracts message ID with regard to REXMIT flag. 229 // This is valid only for message ID that IS GENERATED in this instance, 230 // not provided by the peer. This can be otherwise sent to the peer - it doesn't matter 231 // for the peer that it uses LESS bits to represent the message. 232 return m_iMsgNoBitset & MSGNO_SEQ::mask; 233 } 234 235 } * m_pBlock, *m_pFirstBlock, *m_pCurrBlock, *m_pLastBlock; 236 237 // m_pBlock: The head pointer 238 // m_pFirstBlock: The first block 239 // m_pCurrBlock: The current block 240 // m_pLastBlock: The last block (if first == last, buffer is empty) 241 242 struct Buffer 243 { 244 char* m_pcData; // buffer 245 int m_iSize; // size 246 Buffer* m_pNext; // next buffer 247 } * m_pBuffer; // physical buffer 248 249 int32_t m_iNextMsgNo; // next message number 250 251 int m_iSize; // buffer size (number of packets) 252 int m_iMSS; // maximum seqment/packet size 253 254 int m_iCount; // number of used blocks 255 256 int m_iBytesCount; // number of payload bytes in queue 257 time_point m_tsLastOriginTime; 258 259 AvgBufSize m_mavg; 260 261 int m_iInRatePktsCount; // number of payload bytes added since InRateStartTime 262 int m_iInRateBytesCount; // number of payload bytes added since InRateStartTime 263 time_point m_tsInRateStartTime; 264 uint64_t m_InRatePeriod; // usec 265 int m_iInRateBps; // Input Rate in Bytes/sec 266 int m_iAvgPayloadSz; // Average packet payload size 267 268 private: 269 CSndBuffer(const CSndBuffer&); 270 CSndBuffer& operator=(const CSndBuffer&); 271 }; 272 273 //////////////////////////////////////////////////////////////////////////////// 274 275 class CRcvBuffer 276 { 277 typedef srt::sync::steady_clock::time_point time_point; 278 typedef srt::sync::steady_clock::duration duration; 279 280 public: 281 // XXX There's currently no way to access the socket ID set for 282 // whatever the queue is currently working for. Required to find 283 // some way to do this, possibly by having a "reverse pointer". 284 // Currently just "unimplemented". CONID()285 std::string CONID() const { return ""; } 286 287 static const int DEFAULT_SIZE = 65536; 288 /// Construct the buffer. 289 /// @param [in] queue CUnitQueue that actually holds the units (packets) 290 /// @param [in] bufsize_pkts in units (packets) 291 CRcvBuffer(srt::CUnitQueue* queue, int bufsize_pkts = DEFAULT_SIZE); 292 ~CRcvBuffer(); 293 294 public: 295 /// Write data into the buffer. 296 /// @param [in] unit pointer to a data unit containing new packet 297 /// @param [in] offset offset from last ACK point. 298 /// @return 0 is success, -1 if data is repeated. 299 int addData(srt::CUnit* unit, int offset); 300 301 /// Read data into a user buffer. 302 /// @param [in] data pointer to user buffer. 303 /// @param [in] len length of user buffer. 304 /// @return size of data read. 305 int readBuffer(char* data, int len); 306 307 /// Read data directly into file. 308 /// @param [in] file C++ file stream. 309 /// @param [in] len expected length of data to write into the file. 310 /// @return size of data read. 311 int readBufferToFile(std::fstream& ofs, int len); 312 313 /// Update the ACK point of the buffer. 314 /// @param [in] len number of units to be acknowledged. 315 /// @return 1 if a user buffer is fulfilled, otherwise 0. 316 int ackData(int len); 317 318 /// Query how many buffer space left for data receiving. 319 /// Actually only acknowledged packets, that are still in the buffer, 320 /// are considered to take buffer space. 321 /// 322 /// @return size of available buffer space (including user buffer) for data receiving. 323 /// Not counting unacknowledged packets. 324 int getAvailBufSize() const; 325 326 /// Query how many data has been continuously received (for reading) and ready to play (tsbpdtime < now). 327 /// @return size of valid (continous) data for reading. 328 int getRcvDataSize() const; 329 330 /// Query how many data was received and acknowledged. 331 /// @param [out] bytes bytes 332 /// @param [out] spantime spantime 333 /// @return size in pkts of acked data. 334 int getRcvDataSize(int& bytes, int& spantime); 335 336 /// Query a 1 sec moving average of how many data was received and acknowledged. 337 /// @param [out] bytes bytes 338 /// @param [out] spantime spantime 339 /// @return size in pkts of acked data. 340 int getRcvAvgDataSize(int& bytes, int& spantime); 341 342 /// Query how many data of the receive buffer is acknowledged. 343 /// @param [in] now current time in us. 344 /// @return none. 345 void updRcvAvgDataSize(const time_point& now); 346 347 /// Query the received average payload size. 348 /// @return size (bytes) of payload size 349 unsigned getRcvAvgPayloadSize() const; 350 351 struct ReadingState 352 { 353 time_point tsStart; 354 time_point tsLastAck; 355 time_point tsEnd; 356 int iNumAcknowledged; 357 int iNumUnacknowledged; 358 }; 359 360 ReadingState debugGetReadingState() const; 361 362 /// Form a string of the current buffer fullness state. 363 /// number of packets acknowledged, TSBPD readiness, etc. 364 std::string strFullnessState(const time_point& tsNow) const; 365 366 /// Mark the message to be dropped from the message list. 367 /// @param [in] msgno message number. 368 /// @param [in] using_rexmit_flag whether the MSGNO field uses rexmit flag (if not, one more bit is part of the 369 /// msgno value) 370 void dropMsg(int32_t msgno, bool using_rexmit_flag); 371 372 /// read a message. 373 /// @param [out] data buffer to write the message into. 374 /// @param [in] len size of the buffer. 375 /// @return actuall size of data read. 376 int readMsg(char* data, int len); 377 378 #if ENABLE_HEAVY_LOGGING 379 void readMsgHeavyLogging(int p); 380 #endif 381 382 /// read a message. 383 /// @param [out] data buffer to write the message into. 384 /// @param [in] len size of the buffer. 385 /// @param [out] tsbpdtime localtime-based (uSec) packet time stamp including buffering delay 386 /// @return actuall size of data read. 387 int readMsg(char* data, int len, SRT_MSGCTRL& w_mctrl, int upto); 388 389 /// Query if data is ready to read (tsbpdtime <= now if TsbPD is active). 390 /// @param [out] tsbpdtime localtime-based (uSec) packet time stamp including buffering delay 391 /// of next packet in recv buffer, ready or not. 392 /// @param [out] curpktseq Sequence number of the packet if there is one ready to play 393 /// @return true if ready to play, false otherwise (tsbpdtime may be !0 in 394 /// both cases). 395 bool isRcvDataReady(time_point& w_tsbpdtime, int32_t& w_curpktseq, int32_t seqdistance); 396 397 #ifdef SRT_DEBUG_TSBPD_OUTJITTER 398 void debugTraceJitter(time_point t); 399 #else debugTraceJitter(time_point)400 void debugTraceJitter(time_point) {} 401 #endif /* SRT_DEBUG_TSBPD_OUTJITTER */ 402 403 bool isRcvDataReady(); isRcvDataAvailable()404 bool isRcvDataAvailable() { return m_iLastAckPos != m_iStartPos; } 405 srt::CPacket* getRcvReadyPacket(int32_t seqdistance); 406 407 /// Set TimeStamp-Based Packet Delivery Rx Mode 408 /// @param [in] timebase localtime base (uSec) of packet time stamps including buffering delay 409 /// @param [in] delay aggreed TsbPD delay 410 void setRcvTsbPdMode(const time_point& timebase, const duration& delay); 411 412 /// Add packet timestamp for drift caclculation and compensation 413 /// @param [in] timestamp packet time stamp 414 /// @param [in] rtt RTT sample 415 bool addRcvTsbPdDriftSample(uint32_t timestamp, int rtt); 416 417 #ifdef SRT_DEBUG_TSBPD_DRIFT 418 void printDriftHistogram(int64_t iDrift); 419 void printDriftOffset(int tsbPdOffset, int tsbPdDriftAvg); 420 #endif 421 422 /// Get information on the 1st message in queue. 423 // Parameters (of the 1st packet queue, ready to play or not): 424 /// @param [out] w_tsbpdtime localtime-based (uSec) packet time stamp including buffering delay of 1st packet or 0 425 /// if none 426 /// @param [out] w_passack true if 1st ready packet is not yet acknowleged (allowed to be delivered to the app) 427 /// @param [out] w_skipseqno SRT_SEQNO_NONE or seq number of 1st unacknowledged pkt ready to play preceeded by 428 /// missing packets. 429 /// @param base_seq SRT_SEQNO_NONE or desired, ignore seq smaller than base if exist packet ready-to-play 430 /// and larger than base 431 /// @retval true 1st packet ready to play (tsbpdtime <= now). Not yet acknowledged if passack == true 432 /// @retval false IF tsbpdtime = 0: rcv buffer empty; ELSE: 433 /// IF skipseqno != SRT_SEQNO_NONE, packet ready to play preceeded by missing packets.; 434 /// IF skipseqno == SRT_SEQNO_NONE, no missing packet but 1st not ready to play. 435 bool getRcvFirstMsg(time_point& w_tsbpdtime, 436 bool& w_passack, 437 int32_t& w_skipseqno, 438 int32_t& w_curpktseq, 439 int32_t base_seq = SRT_SEQNO_NONE); 440 441 /// Update the ACK point of the buffer. 442 /// @param [in] len size of data to be skip & acknowledged. 443 void skipData(int len); 444 445 #if ENABLE_HEAVY_LOGGING 446 void reportBufferStats() const; // Heavy logging Debug only 447 #endif empty()448 bool empty() const 449 { 450 // This will not always return the intended value, 451 // that is, it may return false when the buffer really is 452 // empty - but it will return true then in one of next calls. 453 // This function will be always called again at some point 454 // if it returned false, and on true the connection 455 // is going to be broken - so this behavior is acceptable. 456 return m_iStartPos == m_iLastAckPos; 457 } full()458 bool full() const { return m_iStartPos == (m_iLastAckPos + 1) % m_iSize; } capacity()459 int capacity() const { return m_iSize; } 460 461 private: 462 /// This gives up unit at index p. The unit is given back to the 463 /// free unit storage for further assignment for the new incoming 464 /// data. freeUnitAt(size_t p)465 size_t freeUnitAt(size_t p) 466 { 467 srt::CUnit* u = m_pUnit[p]; 468 m_pUnit[p] = NULL; 469 size_t rmbytes = u->m_Packet.getLength(); 470 m_pUnitQueue->makeUnitFree(u); 471 return rmbytes; 472 } 473 474 /// Adjust receive queue to 1st ready to play message (tsbpdtime < now). 475 /// Parameters (of the 1st packet queue, ready to play or not): 476 /// @param [out] tsbpdtime localtime-based (uSec) packet time stamp including buffering delay of 1st packet or 0 if 477 /// none 478 /// @param base_seq SRT_SEQNO_NONE or desired, ignore seq smaller than base 479 /// @retval true 1st packet ready to play without discontinuity (no hole) 480 /// @retval false tsbpdtime = 0: no packet ready to play 481 bool getRcvReadyMsg(time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto, int base_seq = SRT_SEQNO_NONE); 482 483 public: 484 /// @brief Get clock drift in microseconds. getDrift()485 int64_t getDrift() const { return m_tsbpd.drift(); } 486 487 public: 488 int32_t getTopMsgno() const; 489 490 void getInternalTimeBase(time_point& w_tb, bool& w_wrp, duration& w_udrift); 491 492 void applyGroupTime(const time_point& timebase, bool wrapcheck, uint32_t delay, const duration& udrift); 493 void applyGroupDrift(const time_point& timebase, bool wrapcheck, const duration& udrift); 494 time_point getPktTsbPdTime(uint32_t timestamp); 495 int debugGetSize() const; 496 time_point debugGetDeliveryTime(int offset); 497 498 size_t dropData(int len); 499 500 private: 501 int extractData(char* data, int len, int p, int q, bool passack); 502 bool accessMsg(int& w_p, int& w_q, bool& w_passack, int64_t& w_playtime, int upto); 503 504 /// Describes the state of the first N packets 505 std::string debugTimeState(size_t first_n_pkts) const; 506 507 /// thread safe bytes counter of the Recv & Ack buffer 508 /// @param [in] pkts acked or removed pkts from rcv buffer (used with acked = true) 509 /// @param [in] bytes number of bytes added/delete (if negative) to/from rcv buffer. 510 /// @param [in] acked true when adding new pkt in RcvBuffer; false when acking/removing pkts to/from buffer 511 void countBytes(int pkts, int bytes, bool acked = false); 512 513 private: 514 bool scanMsg(int& w_start, int& w_end, bool& w_passack); 515 shift(int basepos,int shift)516 int shift(int basepos, int shift) const { return (basepos + shift) % m_iSize; } 517 518 /// Simplified versions with ++ and --; avoid using division instruction shiftFwd(int basepos)519 int shiftFwd(int basepos) const 520 { 521 if (++basepos == m_iSize) 522 return 0; 523 return basepos; 524 } 525 shiftBack(int basepos)526 int shiftBack(int basepos) const 527 { 528 if (basepos == 0) 529 return m_iSize - 1; 530 return --basepos; 531 } 532 533 private: 534 srt::CUnit** m_pUnit; // Array of pointed units collected in the buffer 535 const int m_iSize; // Size of the internal array of CUnit* items 536 srt::CUnitQueue* m_pUnitQueue; // the shared unit queue 537 538 int m_iStartPos; // HEAD: first packet available for reading 539 int m_iLastAckPos; // the last ACKed position (exclusive), follows the last readable 540 // EMPTY: m_iStartPos = m_iLastAckPos FULL: m_iStartPos = m_iLastAckPos + 1 541 int m_iMaxPos; // delta between acked-TAIL and reception-TAIL 542 543 int m_iNotch; // the starting read point of the first unit 544 // (this is required for stream reading mode; it's 545 // the position in the first unit in the list 546 // up to which data are already retrieved; 547 // in message reading mode it's unused and always 0) 548 549 srt::sync::Mutex m_BytesCountLock; // used to protect counters operations 550 int m_iBytesCount; // Number of payload bytes in the buffer 551 int m_iAckedPktsCount; // Number of acknowledged pkts in the buffer 552 int m_iAckedBytesCount; // Number of acknowledged payload bytes in the buffer 553 unsigned m_uAvgPayloadSz; // Average payload size for dropped bytes estimation 554 555 srt::CTsbpdTime m_tsbpd; 556 557 AvgBufSize m_mavg; 558 559 private: 560 CRcvBuffer(); 561 CRcvBuffer(const CRcvBuffer&); 562 CRcvBuffer& operator=(const CRcvBuffer&); 563 }; 564 565 #endif 566