1 /*
2 * SRT - Secure, Reliable, Transport
3 * Copyright (c) 2018 Haivision Systems Inc.
4 *
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 *
9 */
10
11 /*****************************************************************************
12 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
13 All rights reserved.
14
15 Redistribution and use in source and binary forms, with or without
16 modification, are permitted provided that the following conditions are
17 met:
18
19 * Redistributions of source code must retain the above
20 copyright notice, this list of conditions and the
21 following disclaimer.
22
23 * Redistributions in binary form must reproduce the
24 above copyright notice, this list of conditions
25 and the following disclaimer in the documentation
26 and/or other materials provided with the distribution.
27
28 * Neither the name of the University of Illinois
29 nor the names of its contributors may be used to
30 endorse or promote products derived from this
31 software without specific prior written permission.
32
33 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
34 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
35 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
36 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
37 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
38 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
39 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
40 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
41 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
42 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
43 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
44 *****************************************************************************/
45
46 /*****************************************************************************
47 written by
48 Yunhong Gu, last updated 03/12/2011
49 modified by
50 Haivision Systems Inc.
51 *****************************************************************************/
52
53 #include "platform_sys.h"
54
55 #include <cstring>
56 #include <cmath>
57 #include "buffer.h"
58 #include "packet.h"
59 #include "core.h" // provides some constants
60 #include "logging.h"
61
62 using namespace std;
63 using namespace srt_logging;
64 using namespace srt;
65 using namespace srt::sync;
66
67 // You can change this value at build config by using "ENFORCE" options.
68 #if !defined(SRT_MAVG_SAMPLING_RATE)
69 #define SRT_MAVG_SAMPLING_RATE 40
70 #endif
71
isTimeToUpdate(const time_point & now) const72 bool AvgBufSize::isTimeToUpdate(const time_point& now) const
73 {
74 const int usMAvgBasePeriod = 1000000; // 1s in microseconds
75 const int us2ms = 1000;
76 const int msMAvgPeriod = (usMAvgBasePeriod / SRT_MAVG_SAMPLING_RATE) / us2ms;
77 const uint64_t elapsed_ms = count_milliseconds(now - m_tsLastSamplingTime); // ms since last sampling
78 return (elapsed_ms >= msMAvgPeriod);
79 }
80
update(const steady_clock::time_point & now,int pkts,int bytes,int timespan_ms)81 void AvgBufSize::update(const steady_clock::time_point& now, int pkts, int bytes, int timespan_ms)
82 {
83 const uint64_t elapsed_ms = count_milliseconds(now - m_tsLastSamplingTime); // ms since last sampling
84 m_tsLastSamplingTime = now;
85 const uint64_t one_second_in_ms = 1000;
86 if (elapsed_ms > one_second_in_ms)
87 {
88 // No sampling in last 1 sec, initialize average
89 m_dCountMAvg = pkts;
90 m_dBytesCountMAvg = bytes;
91 m_dTimespanMAvg = timespan_ms;
92 return;
93 }
94
95 //
96 // weight last average value between -1 sec and last sampling time (LST)
97 // and new value between last sampling time and now
98 // |elapsed_ms|
99 // +----------------------------------+-------+
100 // -1 LST 0(now)
101 //
102 m_dCountMAvg = avg_iir_w<1000, double>(m_dCountMAvg, pkts, elapsed_ms);
103 m_dBytesCountMAvg = avg_iir_w<1000, double>(m_dBytesCountMAvg, bytes, elapsed_ms);
104 m_dTimespanMAvg = avg_iir_w<1000, double>(m_dTimespanMAvg, timespan_ms, elapsed_ms);
105 }
106
round_val(double val)107 int round_val(double val)
108 {
109 return static_cast<int>(round(val));
110 }
111
CSndBuffer(int size,int mss)112 CSndBuffer::CSndBuffer(int size, int mss)
113 : m_BufLock()
114 , m_pBlock(NULL)
115 , m_pFirstBlock(NULL)
116 , m_pCurrBlock(NULL)
117 , m_pLastBlock(NULL)
118 , m_pBuffer(NULL)
119 , m_iNextMsgNo(1)
120 , m_iSize(size)
121 , m_iMSS(mss)
122 , m_iCount(0)
123 , m_iBytesCount(0)
124 , m_iInRatePktsCount(0)
125 , m_iInRateBytesCount(0)
126 , m_InRatePeriod(INPUTRATE_FAST_START_US) // 0.5 sec (fast start)
127 , m_iInRateBps(INPUTRATE_INITIAL_BYTESPS)
128 {
129 // initial physical buffer of "size"
130 m_pBuffer = new Buffer;
131 m_pBuffer->m_pcData = new char[m_iSize * m_iMSS];
132 m_pBuffer->m_iSize = m_iSize;
133 m_pBuffer->m_pNext = NULL;
134
135 // circular linked list for out bound packets
136 m_pBlock = new Block;
137 Block* pb = m_pBlock;
138 for (int i = 1; i < m_iSize; ++i)
139 {
140 pb->m_pNext = new Block;
141 pb->m_iMsgNoBitset = 0;
142 pb = pb->m_pNext;
143 }
144 pb->m_pNext = m_pBlock;
145
146 pb = m_pBlock;
147 char* pc = m_pBuffer->m_pcData;
148 for (int i = 0; i < m_iSize; ++i)
149 {
150 pb->m_pcData = pc;
151 pb = pb->m_pNext;
152 pc += m_iMSS;
153 }
154
155 m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;
156
157 setupMutex(m_BufLock, "Buf");
158 }
159
~CSndBuffer()160 CSndBuffer::~CSndBuffer()
161 {
162 Block* pb = m_pBlock->m_pNext;
163 while (pb != m_pBlock)
164 {
165 Block* temp = pb;
166 pb = pb->m_pNext;
167 delete temp;
168 }
169 delete m_pBlock;
170
171 while (m_pBuffer != NULL)
172 {
173 Buffer* temp = m_pBuffer;
174 m_pBuffer = m_pBuffer->m_pNext;
175 delete[] temp->m_pcData;
176 delete temp;
177 }
178
179 releaseMutex(m_BufLock);
180 }
181
addBuffer(const char * data,int len,SRT_MSGCTRL & w_mctrl)182 void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
183 {
184 int32_t& w_msgno = w_mctrl.msgno;
185 int32_t& w_seqno = w_mctrl.pktseq;
186 int64_t& w_srctime = w_mctrl.srctime;
187 const int& ttl = w_mctrl.msgttl;
188 int size = len / m_iMSS;
189 if ((len % m_iMSS) != 0)
190 size++;
191
192 HLOGC(bslog.Debug,
193 log << "addBuffer: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for "
194 << len << " bytes");
195
196 // dynamically increase sender buffer
197 while (size + m_iCount >= m_iSize)
198 {
199 HLOGC(bslog.Debug, log << "addBuffer: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
200 increase();
201 }
202
203 const steady_clock::time_point time = steady_clock::now();
204 const int32_t inorder = w_mctrl.inorder ? MSGNO_PACKET_INORDER::mask : 0;
205
206 HLOGC(bslog.Debug,
207 log << CONID() << "addBuffer: adding " << size << " packets (" << len << " bytes) to send, msgno="
208 << (w_msgno > 0 ? w_msgno : m_iNextMsgNo) << (inorder ? "" : " NOT") << " in order");
209
210 // The sequence number passed to this function is the sequence number
211 // that the very first packet from the packet series should get here.
212 // If there's more than one packet, this function must increase it by itself
213 // and then return the accordingly modified sequence number in the reference.
214
215 Block* s = m_pLastBlock;
216
217 if (w_msgno == SRT_MSGNO_NONE) // DEFAULT-UNCHANGED msgno supplied
218 {
219 HLOGC(bslog.Debug, log << "addBuffer: using internally managed msgno=" << m_iNextMsgNo);
220 w_msgno = m_iNextMsgNo;
221 }
222 else
223 {
224 HLOGC(bslog.Debug, log << "addBuffer: OVERWRITTEN by msgno supplied by caller: msgno=" << w_msgno);
225 m_iNextMsgNo = w_msgno;
226 }
227
228 for (int i = 0; i < size; ++i)
229 {
230 int pktlen = len - i * m_iMSS;
231 if (pktlen > m_iMSS)
232 pktlen = m_iMSS;
233
234 HLOGC(bslog.Debug,
235 log << "addBuffer: %" << w_seqno << " #" << w_msgno << " spreading from=" << (i * m_iMSS)
236 << " size=" << pktlen << " TO BUFFER:" << (void*)s->m_pcData);
237 memcpy((s->m_pcData), data + i * m_iMSS, pktlen);
238 s->m_iLength = pktlen;
239
240 s->m_iSeqNo = w_seqno;
241 w_seqno = CSeqNo::incseq(w_seqno);
242
243 s->m_iMsgNoBitset = m_iNextMsgNo | inorder;
244 if (i == 0)
245 s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
246 if (i == size - 1)
247 s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
248 // NOTE: if i is neither 0 nor size-1, it resuls with PB_SUBSEQUENT.
249 // if i == 0 == size-1, it results with PB_SOLO.
250 // Packets assigned to one message can be:
251 // [PB_FIRST] [PB_SUBSEQUENT] [PB_SUBSEQUENT] [PB_LAST] - 4 packets per message
252 // [PB_FIRST] [PB_LAST] - 2 packets per message
253 // [PB_SOLO] - 1 packet per message
254
255 s->m_llSourceTime_us = w_srctime;
256 s->m_tsOriginTime = time;
257 s->m_tsRexmitTime = time_point();
258 s->m_iTTL = ttl;
259 // Rewrite the actual sending time back into w_srctime
260 // so that the calling facilities can reuse it
261 if (!w_srctime)
262 w_srctime = count_microseconds(s->m_tsOriginTime.time_since_epoch());
263
264 // XXX unchecked condition: s->m_pNext == NULL.
265 // Should never happen, as the call to increase() should ensure enough buffers.
266 SRT_ASSERT(s->m_pNext);
267 s = s->m_pNext;
268 }
269 m_pLastBlock = s;
270
271 enterCS(m_BufLock);
272 m_iCount += size;
273
274 m_iBytesCount += len;
275 m_tsLastOriginTime = time;
276
277 updateInputRate(time, size, len);
278
279 updAvgBufSize(time);
280
281 leaveCS(m_BufLock);
282
283 // MSGNO_SEQ::mask has a form: 00000011111111...
284 // At least it's known that it's from some index inside til the end (to bit 0).
285 // If this value has been reached in a step of incrementation, it means that the
286 // maximum value has been reached. Casting to int32_t to ensure the same sign
287 // in comparison, although it's far from reaching the sign bit.
288
289 const int nextmsgno = ++MsgNo(m_iNextMsgNo);
290 HLOGC(bslog.Debug, log << "CSndBuffer::addBuffer: updating msgno: #" << m_iNextMsgNo << " -> #" << nextmsgno);
291 m_iNextMsgNo = nextmsgno;
292 }
293
setInputRateSmpPeriod(int period)294 void CSndBuffer::setInputRateSmpPeriod(int period)
295 {
296 m_InRatePeriod = (uint64_t)period; //(usec) 0=no input rate calculation
297 }
298
updateInputRate(const steady_clock::time_point & time,int pkts,int bytes)299 void CSndBuffer::updateInputRate(const steady_clock::time_point& time, int pkts, int bytes)
300 {
301 // no input rate calculation
302 if (m_InRatePeriod == 0)
303 return;
304
305 if (is_zero(m_tsInRateStartTime))
306 {
307 m_tsInRateStartTime = time;
308 return;
309 }
310
311 m_iInRatePktsCount += pkts;
312 m_iInRateBytesCount += bytes;
313
314 // Trigger early update in fast start mode
315 const bool early_update = (m_InRatePeriod < INPUTRATE_RUNNING_US) && (m_iInRatePktsCount > INPUTRATE_MAX_PACKETS);
316
317 const uint64_t period_us = count_microseconds(time - m_tsInRateStartTime);
318 if (early_update || period_us > m_InRatePeriod)
319 {
320 // Required Byte/sec rate (payload + headers)
321 m_iInRateBytesCount += (m_iInRatePktsCount * srt::CPacket::SRT_DATA_HDR_SIZE);
322 m_iInRateBps = (int)(((int64_t)m_iInRateBytesCount * 1000000) / period_us);
323 HLOGC(bslog.Debug,
324 log << "updateInputRate: pkts:" << m_iInRateBytesCount << " bytes:" << m_iInRatePktsCount
325 << " rate=" << (m_iInRateBps * 8) / 1000 << "kbps interval=" << period_us);
326 m_iInRatePktsCount = 0;
327 m_iInRateBytesCount = 0;
328 m_tsInRateStartTime = time;
329
330 setInputRateSmpPeriod(INPUTRATE_RUNNING_US);
331 }
332 }
333
addBufferFromFile(fstream & ifs,int len)334 int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
335 {
336 int size = len / m_iMSS;
337 if ((len % m_iMSS) != 0)
338 size++;
339
340 HLOGC(bslog.Debug,
341 log << "addBufferFromFile: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size
342 << " buffers for " << len << " bytes");
343
344 // dynamically increase sender buffer
345 while (size + m_iCount >= m_iSize)
346 {
347 HLOGC(bslog.Debug,
348 log << "addBufferFromFile: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
349 increase();
350 }
351
352 HLOGC(bslog.Debug,
353 log << CONID() << "addBufferFromFile: adding " << size << " packets (" << len
354 << " bytes) to send, msgno=" << m_iNextMsgNo);
355
356 Block* s = m_pLastBlock;
357 int total = 0;
358 for (int i = 0; i < size; ++i)
359 {
360 if (ifs.bad() || ifs.fail() || ifs.eof())
361 break;
362
363 int pktlen = len - i * m_iMSS;
364 if (pktlen > m_iMSS)
365 pktlen = m_iMSS;
366
367 HLOGC(bslog.Debug,
368 log << "addBufferFromFile: reading from=" << (i * m_iMSS) << " size=" << pktlen
369 << " TO BUFFER:" << (void*)s->m_pcData);
370 ifs.read(s->m_pcData, pktlen);
371 if ((pktlen = int(ifs.gcount())) <= 0)
372 break;
373
374 // currently file transfer is only available in streaming mode, message is always in order, ttl = infinite
375 s->m_iMsgNoBitset = m_iNextMsgNo | MSGNO_PACKET_INORDER::mask;
376 if (i == 0)
377 s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
378 if (i == size - 1)
379 s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
380 // NOTE: PB_FIRST | PB_LAST == PB_SOLO.
381 // none of PB_FIRST & PB_LAST == PB_SUBSEQUENT.
382
383 s->m_iLength = pktlen;
384 s->m_iTTL = SRT_MSGTTL_INF;
385 s = s->m_pNext;
386
387 total += pktlen;
388 }
389 m_pLastBlock = s;
390
391 enterCS(m_BufLock);
392 m_iCount += size;
393 m_iBytesCount += total;
394
395 leaveCS(m_BufLock);
396
397 m_iNextMsgNo++;
398 if (m_iNextMsgNo == int32_t(MSGNO_SEQ::mask))
399 m_iNextMsgNo = 1;
400
401 return total;
402 }
403
getSourceTime(const CSndBuffer::Block & block)404 steady_clock::time_point CSndBuffer::getSourceTime(const CSndBuffer::Block& block)
405 {
406 if (block.m_llSourceTime_us)
407 {
408 return steady_clock::time_point() + microseconds_from(block.m_llSourceTime_us);
409 }
410
411 return block.m_tsOriginTime;
412 }
413
readData(srt::CPacket & w_packet,steady_clock::time_point & w_srctime,int kflgs)414 int CSndBuffer::readData(srt::CPacket& w_packet, steady_clock::time_point& w_srctime, int kflgs)
415 {
416 // No data to read
417 if (m_pCurrBlock == m_pLastBlock)
418 return 0;
419
420 // Make the packet REFLECT the data stored in the buffer.
421 w_packet.m_pcData = m_pCurrBlock->m_pcData;
422 int readlen = m_pCurrBlock->m_iLength;
423 w_packet.setLength(readlen);
424 w_packet.m_iSeqNo = m_pCurrBlock->m_iSeqNo;
425
426 // XXX This is probably done because the encryption should happen
427 // just once, and so this sets the encryption flags to both msgno bitset
428 // IN THE PACKET and IN THE BLOCK. This is probably to make the encryption
429 // happen at the time when scheduling a new packet to send, but the packet
430 // must remain in the send buffer until it's ACKed. For the case of rexmit
431 // the packet will be taken "as is" (that is, already encrypted).
432 //
433 // The problem is in the order of things:
434 // 0. When the application stores the data, some of the flags for PH_MSGNO are set.
435 // 1. The readData() is called to get the original data sent by the application.
436 // 2. The data are original and must be encrypted. They WILL BE encrypted, later.
437 // 3. So far we are in readData() so the encryption flags must be updated NOW because
438 // later we won't have access to the block's data.
439 // 4. After exiting from readData(), the packet is being encrypted. It's immediately
440 // sent, however the data must remain in the sending buffer until they are ACKed.
441 // 5. In case when rexmission is needed, the second overloaded version of readData
442 // is being called, and the buffer + PH_MSGNO value is extracted. All interesting
443 // flags must be present and correct at that time.
444 //
445 // The only sensible way to fix this problem is to encrypt the packet not after
446 // extracting from here, but when the packet is stored into CSndBuffer. The appropriate
447 // flags for PH_MSGNO will be applied directly there. Then here the value for setting
448 // PH_MSGNO will be set as is.
449
450 if (kflgs == -1)
451 {
452 HLOGC(bslog.Debug, log << CONID() << " CSndBuffer: ERROR: encryption required and not possible. NOT SENDING.");
453 readlen = 0;
454 }
455 else
456 {
457 m_pCurrBlock->m_iMsgNoBitset |= MSGNO_ENCKEYSPEC::wrap(kflgs);
458 }
459
460 w_packet.m_iMsgNo = m_pCurrBlock->m_iMsgNoBitset;
461 w_srctime = getSourceTime(*m_pCurrBlock);
462 m_pCurrBlock = m_pCurrBlock->m_pNext;
463
464 HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send");
465
466 return readlen;
467 }
468
getMsgNoAt(const int offset)469 int32_t CSndBuffer::getMsgNoAt(const int offset)
470 {
471 ScopedLock bufferguard(m_BufLock);
472
473 Block* p = m_pFirstBlock;
474
475 if (p)
476 {
477 HLOGC(bslog.Debug,
478 log << "CSndBuffer::getMsgNoAt: FIRST MSG: size=" << p->m_iLength << " %" << p->m_iSeqNo << " #"
479 << p->getMsgSeq() << " !" << BufferStamp(p->m_pcData, p->m_iLength));
480 }
481
482 if (offset >= m_iCount)
483 {
484 // Prevent accessing the last "marker" block
485 LOGC(bslog.Error,
486 log << "CSndBuffer::getMsgNoAt: IPE: offset=" << offset << " not found, max offset=" << m_iCount);
487 return SRT_MSGNO_CONTROL;
488 }
489
490 // XXX Suboptimal procedure to keep the blocks identifiable
491 // by sequence number. Consider using some circular buffer.
492 int i;
493 Block* ee SRT_ATR_UNUSED = 0;
494 for (i = 0; i < offset && p; ++i)
495 {
496 ee = p;
497 p = p->m_pNext;
498 }
499
500 if (!p)
501 {
502 LOGC(bslog.Error,
503 log << "CSndBuffer::getMsgNoAt: IPE: offset=" << offset << " not found, stopped at " << i << " with #"
504 << (ee ? ee->getMsgSeq() : SRT_MSGNO_NONE));
505 return SRT_MSGNO_CONTROL;
506 }
507
508 HLOGC(bslog.Debug,
509 log << "CSndBuffer::getMsgNoAt: offset=" << offset << " found, size=" << p->m_iLength << " %" << p->m_iSeqNo
510 << " #" << p->getMsgSeq() << " !" << BufferStamp(p->m_pcData, p->m_iLength));
511
512 return p->getMsgSeq();
513 }
514
readData(const int offset,srt::CPacket & w_packet,steady_clock::time_point & w_srctime,int & w_msglen)515 int CSndBuffer::readData(const int offset, srt::CPacket& w_packet, steady_clock::time_point& w_srctime, int& w_msglen)
516 {
517 int32_t& msgno_bitset = w_packet.m_iMsgNo;
518
519 ScopedLock bufferguard(m_BufLock);
520
521 Block* p = m_pFirstBlock;
522
523 // XXX Suboptimal procedure to keep the blocks identifiable
524 // by sequence number. Consider using some circular buffer.
525 for (int i = 0; i < offset && p != m_pLastBlock; ++i)
526 {
527 p = p->m_pNext;
528 }
529 if (p == m_pLastBlock)
530 {
531 LOGC(qslog.Error, log << "CSndBuffer::readData: offset " << offset << " too large!");
532 return 0;
533 }
534 #if ENABLE_HEAVY_LOGGING
535 const int32_t first_seq = p->m_iSeqNo;
536 int32_t last_seq = p->m_iSeqNo;
537 #endif
538
539 // Check if the block that is the next candidate to send (m_pCurrBlock pointing) is stale.
540
541 // If so, then inform the caller that it should first take care of the whole
542 // message (all blocks with that message id). Shift the m_pCurrBlock pointer
543 // to the position past the last of them. Then return -1 and set the
544 // msgno_bitset return reference to the message id that should be dropped as
545 // a whole.
546
547 // After taking care of that, the caller should immediately call this function again,
548 // this time possibly in order to find the real data to be sent.
549
550 // if found block is stale
551 // (This is for messages that have declared TTL - messages that fail to be sent
552 // before the TTL defined time comes, will be dropped).
553
554 if ((p->m_iTTL >= 0) && (count_milliseconds(steady_clock::now() - p->m_tsOriginTime) > p->m_iTTL))
555 {
556 int32_t msgno = p->getMsgSeq();
557 w_msglen = 1;
558 p = p->m_pNext;
559 bool move = false;
560 while (p != m_pLastBlock && msgno == p->getMsgSeq())
561 {
562 #if ENABLE_HEAVY_LOGGING
563 last_seq = p->m_iSeqNo;
564 #endif
565 if (p == m_pCurrBlock)
566 move = true;
567 p = p->m_pNext;
568 if (move)
569 m_pCurrBlock = p;
570 w_msglen++;
571 }
572
573 HLOGC(qslog.Debug,
574 log << "CSndBuffer::readData: due to TTL exceeded, SEQ " << first_seq << " - " << last_seq << ", "
575 << w_msglen << " packets to drop, msgno=" << msgno);
576
577 // If readData returns -1, then msgno_bitset is understood as a Message ID to drop.
578 // This means that in this case it should be written by the message sequence value only
579 // (not the whole 4-byte bitset written at PH_MSGNO).
580 msgno_bitset = msgno;
581 return -1;
582 }
583
584 w_packet.m_pcData = p->m_pcData;
585 int readlen = p->m_iLength;
586 w_packet.setLength(readlen);
587
588 // XXX Here the value predicted to be applied to PH_MSGNO field is extracted.
589 // As this function is predicted to extract the data to send as a rexmited packet,
590 // the packet must be in the form ready to send - so, in case of encryption,
591 // encrypted, and with all ENC flags already set. So, the first call to send
592 // the packet originally (the other overload of this function) must set these
593 // flags.
594 w_packet.m_iMsgNo = p->m_iMsgNoBitset;
595 w_srctime = getSourceTime(*p);
596
597 // This function is called when packet retransmission is triggered.
598 // Therefore we are setting the rexmit time.
599 p->m_tsRexmitTime = steady_clock::now();
600
601 HLOGC(qslog.Debug,
602 log << CONID() << "CSndBuffer: getting packet %" << p->m_iSeqNo << " as per %" << w_packet.m_iSeqNo
603 << " size=" << readlen << " to send [REXMIT]");
604
605 return readlen;
606 }
607
getPacketRexmitTime(const int offset)608 srt::sync::steady_clock::time_point CSndBuffer::getPacketRexmitTime(const int offset)
609 {
610 ScopedLock bufferguard(m_BufLock);
611 const Block* p = m_pFirstBlock;
612
613 // XXX Suboptimal procedure to keep the blocks identifiable
614 // by sequence number. Consider using some circular buffer.
615 for (int i = 0; i < offset; ++i)
616 {
617 SRT_ASSERT(p);
618 p = p->m_pNext;
619 }
620
621 SRT_ASSERT(p);
622 return p->m_tsRexmitTime;
623 }
624
ackData(int offset)625 void CSndBuffer::ackData(int offset)
626 {
627 ScopedLock bufferguard(m_BufLock);
628
629 bool move = false;
630 for (int i = 0; i < offset; ++i)
631 {
632 m_iBytesCount -= m_pFirstBlock->m_iLength;
633 if (m_pFirstBlock == m_pCurrBlock)
634 move = true;
635 m_pFirstBlock = m_pFirstBlock->m_pNext;
636 }
637 if (move)
638 m_pCurrBlock = m_pFirstBlock;
639
640 m_iCount -= offset;
641
642 updAvgBufSize(steady_clock::now());
643 }
644
getCurrBufSize() const645 int CSndBuffer::getCurrBufSize() const
646 {
647 return m_iCount;
648 }
649
getAvgBufSize(int & w_bytes,int & w_tsp)650 int CSndBuffer::getAvgBufSize(int& w_bytes, int& w_tsp)
651 {
652 ScopedLock bufferguard(m_BufLock); /* Consistency of pkts vs. bytes vs. spantime */
653
654 /* update stats in case there was no add/ack activity lately */
655 updAvgBufSize(steady_clock::now());
656
657 // Average number of packets and timespan could be small,
658 // so rounding is beneficial, while for the number of
659 // bytes in the buffer is a higher value, so rounding can be omitted,
660 // but probably better to round all three values.
661 w_bytes = round_val(m_mavg.bytes());
662 w_tsp = round_val(m_mavg.timespan_ms());
663 return round_val(m_mavg.pkts());
664 }
665
updAvgBufSize(const steady_clock::time_point & now)666 void CSndBuffer::updAvgBufSize(const steady_clock::time_point& now)
667 {
668 if (!m_mavg.isTimeToUpdate(now))
669 return;
670
671 int bytes = 0;
672 int timespan_ms = 0;
673 const int pkts = getCurrBufSize((bytes), (timespan_ms));
674 m_mavg.update(now, pkts, bytes, timespan_ms);
675 }
676
getCurrBufSize(int & w_bytes,int & w_timespan)677 int CSndBuffer::getCurrBufSize(int& w_bytes, int& w_timespan)
678 {
679 w_bytes = m_iBytesCount;
680 /*
681 * Timespan can be less then 1000 us (1 ms) if few packets.
682 * Also, if there is only one pkt in buffer, the time difference will be 0.
683 * Therefore, always add 1 ms if not empty.
684 */
685 w_timespan = 0 < m_iCount ? count_milliseconds(m_tsLastOriginTime - m_pFirstBlock->m_tsOriginTime) + 1 : 0;
686
687 return m_iCount;
688 }
689
dropLateData(int & w_bytes,int32_t & w_first_msgno,const steady_clock::time_point & too_late_time)690 int CSndBuffer::dropLateData(int& w_bytes, int32_t& w_first_msgno, const steady_clock::time_point& too_late_time)
691 {
692 int dpkts = 0;
693 int dbytes = 0;
694 bool move = false;
695 int32_t msgno = 0;
696
697 ScopedLock bufferguard(m_BufLock);
698 for (int i = 0; i < m_iCount && m_pFirstBlock->m_tsOriginTime < too_late_time; ++i)
699 {
700 dpkts++;
701 dbytes += m_pFirstBlock->m_iLength;
702 msgno = m_pFirstBlock->getMsgSeq();
703
704 if (m_pFirstBlock == m_pCurrBlock)
705 move = true;
706 m_pFirstBlock = m_pFirstBlock->m_pNext;
707 }
708
709 if (move)
710 {
711 m_pCurrBlock = m_pFirstBlock;
712 }
713 m_iCount -= dpkts;
714
715 m_iBytesCount -= dbytes;
716 w_bytes = dbytes;
717
718 // We report the increased number towards the last ever seen
719 // by the loop, as this last one is the last received. So remained
720 // (even if "should remain") is the first after the last removed one.
721 w_first_msgno = ++MsgNo(msgno);
722
723 updAvgBufSize(steady_clock::now());
724
725 return (dpkts);
726 }
727
increase()728 void CSndBuffer::increase()
729 {
730 int unitsize = m_pBuffer->m_iSize;
731
732 // new physical buffer
733 Buffer* nbuf = NULL;
734 try
735 {
736 nbuf = new Buffer;
737 nbuf->m_pcData = new char[unitsize * m_iMSS];
738 }
739 catch (...)
740 {
741 delete nbuf;
742 throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
743 }
744 nbuf->m_iSize = unitsize;
745 nbuf->m_pNext = NULL;
746
747 // insert the buffer at the end of the buffer list
748 Buffer* p = m_pBuffer;
749 while (p->m_pNext != NULL)
750 p = p->m_pNext;
751 p->m_pNext = nbuf;
752
753 // new packet blocks
754 Block* nblk = NULL;
755 try
756 {
757 nblk = new Block;
758 }
759 catch (...)
760 {
761 delete nblk;
762 throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
763 }
764 Block* pb = nblk;
765 for (int i = 1; i < unitsize; ++i)
766 {
767 pb->m_pNext = new Block;
768 pb = pb->m_pNext;
769 }
770
771 // insert the new blocks onto the existing one
772 pb->m_pNext = m_pLastBlock->m_pNext;
773 m_pLastBlock->m_pNext = nblk;
774
775 pb = nblk;
776 char* pc = nbuf->m_pcData;
777 for (int i = 0; i < unitsize; ++i)
778 {
779 pb->m_pcData = pc;
780 pb = pb->m_pNext;
781 pc += m_iMSS;
782 }
783
784 m_iSize += unitsize;
785
786 HLOGC(bslog.Debug,
787 log << "CSndBuffer: BUFFER FULL - adding " << (unitsize * m_iMSS) << " bytes spread to " << unitsize
788 << " blocks"
789 << " (total size: " << m_iSize << " bytes)");
790 }
791
792 ////////////////////////////////////////////////////////////////////////////////
793
794 /*
795 * RcvBuffer (circular buffer):
796 *
797 * |<------------------- m_iSize ----------------------------->|
798 * | |<--- acked pkts -->|<--- m_iMaxPos --->| |
799 * | | | | |
800 * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+
801 * | 0 | 0 | 1 | 1 | 1 | 0 | 1 | 1 | 1 | 1 | 0 | 1 | 0 |...| 0 | m_pUnit[]
802 * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+
803 * | | | |
804 * | | \__last pkt received
805 * | \___ m_iLastAckPos: last ack sent
806 * \___ m_iStartPos: first message to read
807 *
808 * m_pUnit[i]->m_iFlag: 0:free, 1:good, 2:passack, 3:dropped
809 *
810 * thread safety:
811 * m_iStartPos: CUDT::m_RecvLock
812 * m_iLastAckPos: CUDT::m_AckLock
813 * m_iMaxPos: none? (modified on add and ack
814 */
815
816 // XXX Init values moved to in-class.
817 // const uint32_t CRcvBuffer::TSBPD_WRAP_PERIOD = (30*1000000); //30 seconds (in usec)
818 // const int CRcvBuffer::TSBPD_DRIFT_MAX_VALUE = 5000; // usec
819 // const int CRcvBuffer::TSBPD_DRIFT_MAX_SAMPLES = 1000; // ACK-ACK packets
820 #ifdef SRT_DEBUG_TSBPD_DRIFT
821 // const int CRcvBuffer::TSBPD_DRIFT_PRT_SAMPLES = 200; // ACK-ACK packets
822 #endif
823
CRcvBuffer(CUnitQueue * queue,int bufsize_pkts)824 CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize_pkts)
825 : m_pUnit(NULL)
826 , m_iSize(bufsize_pkts)
827 , m_pUnitQueue(queue)
828 , m_iStartPos(0)
829 , m_iLastAckPos(0)
830 , m_iMaxPos(0)
831 , m_iNotch(0)
832 , m_BytesCountLock()
833 , m_iBytesCount(0)
834 , m_iAckedPktsCount(0)
835 , m_iAckedBytesCount(0)
836 , m_uAvgPayloadSz(7 * 188)
837 {
838 m_pUnit = new CUnit*[m_iSize];
839 for (int i = 0; i < m_iSize; ++i)
840 m_pUnit[i] = NULL;
841
842 #ifdef SRT_DEBUG_TSBPD_DRIFT
843 memset(m_TsbPdDriftHisto100us, 0, sizeof(m_TsbPdDriftHisto100us));
844 memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms));
845 #endif
846
847 setupMutex(m_BytesCountLock, "BytesCount");
848 }
849
~CRcvBuffer()850 CRcvBuffer::~CRcvBuffer()
851 {
852 for (int i = 0; i < m_iSize; ++i)
853 {
854 if (m_pUnit[i] != NULL)
855 {
856 m_pUnitQueue->makeUnitFree(m_pUnit[i]);
857 }
858 }
859
860 delete[] m_pUnit;
861
862 releaseMutex(m_BytesCountLock);
863 }
864
countBytes(int pkts,int bytes,bool acked)865 void CRcvBuffer::countBytes(int pkts, int bytes, bool acked)
866 {
867 /*
868 * Byte counter changes from both sides (Recv & Ack) of the buffer
869 * so the higher level lock is not enough for thread safe op.
870 *
871 * pkts are...
872 * added (bytes>0, acked=false),
873 * acked (bytes>0, acked=true),
874 * removed (bytes<0, acked=n/a)
875 */
876 ScopedLock cg(m_BytesCountLock);
877
878 if (!acked) // adding new pkt in RcvBuffer
879 {
880 m_iBytesCount += bytes; /* added or removed bytes from rcv buffer */
881 if (bytes > 0) /* Assuming one pkt when adding bytes */
882 m_uAvgPayloadSz = ((m_uAvgPayloadSz * (100 - 1)) + bytes) / 100;
883 }
884 else // acking/removing pkts to/from buffer
885 {
886 m_iAckedPktsCount += pkts; /* acked or removed pkts from rcv buffer */
887 m_iAckedBytesCount += bytes; /* acked or removed bytes from rcv buffer */
888
889 if (bytes < 0)
890 m_iBytesCount += bytes; /* removed bytes from rcv buffer */
891 }
892 }
893
addData(CUnit * unit,int offset)894 int CRcvBuffer::addData(CUnit* unit, int offset)
895 {
896 SRT_ASSERT(unit != NULL);
897 if (offset >= getAvailBufSize())
898 return -1;
899
900 const int pos = (m_iLastAckPos + offset) % m_iSize;
901 if (offset >= m_iMaxPos)
902 m_iMaxPos = offset + 1;
903
904 if (m_pUnit[pos] != NULL)
905 {
906 HLOGC(qrlog.Debug, log << "addData: unit %" << unit->m_Packet.m_iSeqNo << " rejected, already exists");
907 return -1;
908 }
909 m_pUnit[pos] = unit;
910 countBytes(1, (int)unit->m_Packet.getLength());
911
912 m_pUnitQueue->makeUnitGood(unit);
913
914 HLOGC(qrlog.Debug,
915 log << "addData: unit %" << unit->m_Packet.m_iSeqNo << " accepted, off=" << offset << " POS=" << pos);
916 return 0;
917 }
918
readBuffer(char * data,int len)919 int CRcvBuffer::readBuffer(char* data, int len)
920 {
921 int p = m_iStartPos;
922 int lastack = m_iLastAckPos;
923 int rs = len;
924 IF_HEAVY_LOGGING(char* begin = data);
925
926 const bool bTsbPdEnabled = m_tsbpd.isEnabled();
927 const steady_clock::time_point now = (bTsbPdEnabled ? steady_clock::now() : steady_clock::time_point());
928
929 HLOGC(brlog.Debug, log << CONID() << "readBuffer: start=" << p << " lastack=" << lastack);
930 while ((p != lastack) && (rs > 0))
931 {
932 if (m_pUnit[p] == NULL)
933 {
934 LOGC(brlog.Error, log << CONID() << " IPE readBuffer on null packet pointer");
935 return -1;
936 }
937
938 const srt::CPacket& pkt = m_pUnit[p]->m_Packet;
939
940 if (bTsbPdEnabled)
941 {
942 HLOGC(brlog.Debug,
943 log << CONID() << "readBuffer: chk if time2play:"
944 << " NOW=" << FormatTime(now)
945 << " PKT TS=" << FormatTime(getPktTsbPdTime(pkt.getMsgTimeStamp())));
946
947 if ((getPktTsbPdTime(pkt.getMsgTimeStamp()) > now))
948 break; /* too early for this unit, return whatever was copied */
949 }
950
951 const int pktlen = (int) pkt.getLength();
952 const int remain_pktlen = pktlen - m_iNotch;
953
954 const int unitsize = std::min(remain_pktlen, rs);
955
956 HLOGC(brlog.Debug,
957 log << CONID() << "readBuffer: copying buffer #" << p << " targetpos=" << int(data - begin)
958 << " sourcepos=" << m_iNotch << " size=" << unitsize << " left=" << (unitsize - rs));
959 memcpy((data), pkt.m_pcData + m_iNotch, unitsize);
960
961 data += unitsize;
962
963 if (rs >= remain_pktlen)
964 {
965 freeUnitAt(p);
966 p = shiftFwd(p);
967
968 m_iNotch = 0;
969 }
970 else
971 m_iNotch += rs;
972
973 rs -= unitsize;
974 }
975
976 /* we removed acked bytes form receive buffer */
977 countBytes(-1, -(len - rs), true);
978 m_iStartPos = p;
979
980 return len - rs;
981 }
982
readBufferToFile(fstream & ofs,int len)983 int CRcvBuffer::readBufferToFile(fstream& ofs, int len)
984 {
985 int p = m_iStartPos;
986 int lastack = m_iLastAckPos;
987 int rs = len;
988
989 int32_t trace_seq SRT_ATR_UNUSED = SRT_SEQNO_NONE;
990 int trace_shift SRT_ATR_UNUSED = -1;
991
992 while ((p != lastack) && (rs > 0))
993 {
994 #if ENABLE_LOGGING
995 ++trace_shift;
996 #endif
997 // Skip empty units. Note that this shouldn't happen
998 // in case of a file transfer.
999 if (!m_pUnit[p])
1000 {
1001 p = shiftFwd(p);
1002 LOGC(brlog.Error, log << "readBufferToFile: IPE: NULL unit found in file transmission, last good %"
1003 << trace_seq << " + " << trace_shift);
1004 continue;
1005 }
1006
1007 const srt::CPacket& pkt = m_pUnit[p]->m_Packet;
1008
1009 #if ENABLE_LOGGING
1010 trace_seq = pkt.getSeqNo();
1011 #endif
1012 const int pktlen = (int) pkt.getLength();
1013 const int remain_pktlen = pktlen - m_iNotch;
1014
1015 const int unitsize = std::min(remain_pktlen, rs);
1016
1017 ofs.write(pkt.m_pcData + m_iNotch, unitsize);
1018 if (ofs.fail())
1019 break;
1020
1021 if (rs >= remain_pktlen)
1022 {
1023 freeUnitAt(p);
1024 p = shiftFwd(p);
1025
1026 m_iNotch = 0;
1027 }
1028 else
1029 m_iNotch += rs;
1030
1031 rs -= unitsize;
1032 }
1033
1034 /* we removed acked bytes form receive buffer */
1035 countBytes(-1, -(len - rs), true);
1036 m_iStartPos = p;
1037
1038 return len - rs;
1039 }
1040
ackData(int len)1041 int CRcvBuffer::ackData(int len)
1042 {
1043 SRT_ASSERT(len < m_iSize);
1044 SRT_ASSERT(len > 0);
1045 int end = shift(m_iLastAckPos, len);
1046
1047 {
1048 int pkts = 0;
1049 int bytes = 0;
1050 for (int i = m_iLastAckPos; i != end; i = shiftFwd(i))
1051 {
1052 if (m_pUnit[i] == NULL)
1053 continue;
1054
1055 pkts++;
1056 bytes += (int)m_pUnit[i]->m_Packet.getLength();
1057 }
1058 if (pkts > 0)
1059 countBytes(pkts, bytes, true);
1060 }
1061
1062 HLOGC(brlog.Debug,
1063 log << "ackData: shift by " << len << ", start=" << m_iStartPos << " end=" << m_iLastAckPos << " -> " << end);
1064
1065 m_iLastAckPos = end;
1066 m_iMaxPos -= len;
1067 if (m_iMaxPos < 0)
1068 m_iMaxPos = 0;
1069
1070 // Returned value is the distance towards the starting
1071 // position from m_iLastAckPos, which is in sync with CUDT::m_iRcvLastSkipAck.
1072 // This should help determine the sequence number at first read-ready position.
1073
1074 const int dist = m_iLastAckPos - m_iStartPos;
1075 if (dist < 0)
1076 return dist + m_iSize;
1077 return dist;
1078 }
1079
skipData(int len)1080 void CRcvBuffer::skipData(int len)
1081 {
1082 /*
1083 * Caller need protect both AckLock and RecvLock
1084 * to move both m_iStartPos and m_iLastAckPost
1085 */
1086 if (m_iStartPos == m_iLastAckPos)
1087 m_iStartPos = (m_iStartPos + len) % m_iSize;
1088 m_iLastAckPos = (m_iLastAckPos + len) % m_iSize;
1089 m_iMaxPos -= len;
1090 if (m_iMaxPos < 0)
1091 m_iMaxPos = 0;
1092 }
1093
dropData(int len)1094 size_t CRcvBuffer::dropData(int len)
1095 {
1096 // This function does the same as skipData, although skipData
1097 // should work in the condition of absence of data, so no need
1098 // to force the units in the range to be freed. This function
1099 // works in more general condition where we don't know if there
1100 // are any data in the given range, but want to remove these
1101 // "sequence positions" from the buffer, whether there are data
1102 // at them or not.
1103
1104 size_t stats_bytes = 0;
1105
1106 int p = m_iStartPos;
1107 int past_q = shift(p, len);
1108 while (p != past_q)
1109 {
1110 if (m_pUnit[p] && m_pUnit[p]->m_iFlag == CUnit::GOOD)
1111 {
1112 stats_bytes += m_pUnit[p]->m_Packet.getLength();
1113 freeUnitAt(p);
1114 }
1115
1116 p = shiftFwd(p);
1117 }
1118
1119 m_iStartPos = past_q;
1120 return stats_bytes;
1121 }
1122
getRcvFirstMsg(steady_clock::time_point & w_tsbpdtime,bool & w_passack,int32_t & w_skipseqno,int32_t & w_curpktseq,int32_t base_seq)1123 bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
1124 bool& w_passack,
1125 int32_t& w_skipseqno,
1126 int32_t& w_curpktseq,
1127 int32_t base_seq)
1128 {
1129 HLOGC(brlog.Debug, log << "getRcvFirstMsg: base_seq=" << base_seq);
1130 w_skipseqno = SRT_SEQNO_NONE;
1131 w_passack = false;
1132 // tsbpdtime will be retrieved by the below call
1133 // Returned values:
1134 // - tsbpdtime: real time when the packet is ready to play (whether ready to play or not)
1135 // - w_passack: false (the report concerns a packet with an exactly next sequence)
1136 // - w_skipseqno == SRT_SEQNO_NONE: no packets to skip towards the first RTP
1137 // - w_curpktseq: that exactly packet that is reported (for debugging purposes)
1138 // - @return: whether the reported packet is ready to play
1139
1140 /* Check the acknowledged packets */
1141 // getRcvReadyMsg returns true if the time to play for the first message
1142 // that larger than base_seq is in the past.
1143 if (getRcvReadyMsg((w_tsbpdtime), (w_curpktseq), -1, base_seq))
1144 {
1145 HLOGC(brlog.Debug, log << "getRcvFirstMsg: ready CONTIG packet: %" << w_curpktseq);
1146 return true;
1147 }
1148 else if (!is_zero(w_tsbpdtime))
1149 {
1150 HLOGC(brlog.Debug, log << "getRcvFirstMsg: packets found, but in future");
1151 // This means that a message next to be played, has been found,
1152 // but the time to play is in future.
1153 return false;
1154 }
1155
1156 // Falling here means that there are NO PACKETS in the ACK-ed region
1157 // (m_iStartPos - m_iLastAckPos), but we may have something in the
1158 // region (m_iLastAckPos - (m_iLastAckPos+m_iMaxPos)), that is, packets
1159 // that may be separated from the last ACK-ed by lost ones.
1160
1161 // Below this line we have only two options:
1162 // - m_iMaxPos == 0, which means that no more packets are in the buffer
1163 // - returned: tsbpdtime=0, w_passack=true, w_skipseqno=SRT_SEQNO_NONE, w_curpktseq=<unchanged>, @return false
1164 // - m_iMaxPos > 0, which means that there are packets arrived after a lost packet:
1165 // - returned: tsbpdtime=PKT.TS, w_passack=true, w_skipseqno=PKT.SEQ, w_curpktseq=PKT, @return LOCAL(PKT.TS) <=
1166 // NOW
1167
1168 /*
1169 * No acked packets ready but caller want to know next packet to wait for
1170 * Check the not yet acked packets that may be stuck by missing packet(s).
1171 */
1172 bool haslost = false;
1173 int last_ready_pos = -1;
1174 steady_clock::time_point tsbpdtime = steady_clock::time_point();
1175 w_tsbpdtime = steady_clock::time_point();
1176 w_passack = true;
1177
1178 // XXX SUSPECTED ISSUE with this algorithm:
1179 // The above call to getRcvReadyMsg() should report as to whether:
1180 // - there is an EXACTLY NEXT SEQUENCE packet
1181 // - this packet is ready to play.
1182 //
1183 // Situations handled after the call are when:
1184 // - there's the next sequence packet available and it is ready to play
1185 // - there are no packets at all, ready to play or not
1186 //
1187 // So, the remaining situation is that THERE ARE PACKETS that follow
1188 // the current sequence, but they are not ready to play. This includes
1189 // packets that have the exactly next sequence and packets that jump
1190 // over a lost packet.
1191 //
1192 // As the getRcvReadyMsg() function walks through the incoming units
1193 // to see if there's anything that satisfies these conditions, it *SHOULD*
1194 // be also capable of checking if the next available packet, if it is
1195 // there, is the next sequence packet or not. Retrieving this exactly
1196 // packet would be most useful, as the test for play-readiness and
1197 // sequentiality can be done on it directly.
1198 //
1199 // When done so, the below loop would be completely unnecessary.
1200
1201 // Logical description of the below algorithm:
1202 // 1. update w_tsbpdtime and w_curpktseq if found one packet ready to play
1203 // - keep check the next packet if still smaller than base_seq
1204 // 2. set w_skipseqno if found packets before w_curpktseq lost
1205 // if no packets larger than base_seq ready to play, return the largest RTP
1206 // else return the first one that larger than base_seq and rady to play
1207
1208 for (int i = m_iLastAckPos, n = shift(m_iLastAckPos, m_iMaxPos); i != n; i = shiftFwd(i))
1209 {
1210 if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD)
1211 {
1212 /* There are packets in the sequence not received yet */
1213 haslost = true;
1214 HLOGC(brlog.Debug, log << "getRcvFirstMsg: empty hole at *" << i);
1215 }
1216 else
1217 {
1218 tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
1219 /* Packet ready to play */
1220 if (tsbpdtime <= steady_clock::now())
1221 {
1222 // If the last ready-to-play packet exists, free it.
1223 if (!is_zero(w_tsbpdtime)) {
1224 HLOGC(brlog.Debug,
1225 log << "getRcvFirstMsg: found next ready packet, free last %"
1226 << w_curpktseq << " POS=" << last_ready_pos);
1227 SRT_ASSERT(w_curpktseq != SRT_SEQNO_NONE);
1228 freeUnitAt(last_ready_pos);
1229 }
1230 w_tsbpdtime = tsbpdtime;
1231 w_curpktseq = m_pUnit[i]->m_Packet.m_iSeqNo;
1232 last_ready_pos = i;
1233 if (haslost)
1234 w_skipseqno = w_curpktseq;
1235
1236 if (base_seq != SRT_SEQNO_NONE && CSeqNo::seqcmp(w_curpktseq, base_seq) <= 0)
1237 {
1238 HLOGC(brlog.Debug,
1239 log << "getRcvFirstMsg: found ready packet %" << w_curpktseq
1240 << " but not larger than base_seq, try next");
1241 continue;
1242 }
1243
1244 HLOGC(brlog.Debug,
1245 log << "getRcvFirstMsg: found ready packet, nSKIPPED: "
1246 << ((i - m_iLastAckPos + m_iSize) % m_iSize));
1247
1248 // NOTE: if haslost is not set, it means that this is the VERY FIRST
1249 // packet, that is, packet currently at pos = m_iLastAckPos. There's no
1250 // possibility that it is so otherwise because:
1251 // - if this first good packet is ready to play, THIS HERE RETURNS NOW.
1252 // ...
1253 return true;
1254 }
1255
1256 if (!is_zero(w_tsbpdtime)) {
1257 return true;
1258 }
1259 HLOGC(brlog.Debug,
1260 log << "getRcvFirstMsg: found NOT READY packet, nSKIPPED: "
1261 << ((i - m_iLastAckPos + m_iSize) % m_iSize));
1262 // ... and if this first good packet WASN'T ready to play, THIS HERE RETURNS NOW, TOO,
1263 // just states that there's no ready packet to play.
1264 // ...
1265 return false;
1266 }
1267 // ... and if this first packet WASN'T GOOD, the loop continues, however since now
1268 // the 'haslost' is set, which means that it continues only to find the first valid
1269 // packet after stating that the very first packet isn't valid.
1270 }
1271 if (!is_zero(w_tsbpdtime)) {
1272 return true;
1273 }
1274 HLOGC(brlog.Debug, log << "getRcvFirstMsg: found NO PACKETS");
1275 return false;
1276 }
1277
debugGetDeliveryTime(int offset)1278 steady_clock::time_point CRcvBuffer::debugGetDeliveryTime(int offset)
1279 {
1280 int i;
1281 if (offset > 0)
1282 i = shift(m_iStartPos, offset);
1283 else
1284 i = m_iStartPos;
1285
1286 CUnit* u = m_pUnit[i];
1287 if (!u || u->m_iFlag != CUnit::GOOD)
1288 return steady_clock::time_point();
1289
1290 return getPktTsbPdTime(u->m_Packet.getMsgTimeStamp());
1291 }
1292
getTopMsgno() const1293 int32_t CRcvBuffer::getTopMsgno() const
1294 {
1295 if (m_iStartPos == m_iLastAckPos)
1296 return SRT_MSGNO_NONE; // No message is waiting
1297
1298 if (!m_pUnit[m_iStartPos])
1299 return SRT_MSGNO_NONE; // pity
1300
1301 return m_pUnit[m_iStartPos]->m_Packet.getMsgSeq();
1302 }
1303
getRcvReadyMsg(steady_clock::time_point & w_tsbpdtime,int32_t & w_curpktseq,int upto,int base_seq)1304 bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto, int base_seq)
1305 {
1306 const bool havelimit = upto != -1;
1307 int end = -1, past_end = -1;
1308 if (havelimit)
1309 {
1310 int stretch = (m_iSize + m_iStartPos - m_iLastAckPos) % m_iSize;
1311 if (upto > stretch)
1312 {
1313 HLOGC(brlog.Debug, log << "position back " << upto << " exceeds stretch " << stretch);
1314 // Do nothing. This position is already gone.
1315 return false;
1316 }
1317
1318 end = m_iLastAckPos - upto;
1319 if (end < 0)
1320 end += m_iSize;
1321 past_end = shiftFwd(end); // For in-loop comparison
1322 HLOGC(brlog.Debug, log << "getRcvReadyMsg: will read from position " << end);
1323 }
1324
1325 // NOTE: position m_iLastAckPos in the buffer represents the sequence number of
1326 // CUDT::m_iRcvLastSkipAck. Therefore 'upto' contains a positive value that should
1327 // be decreased from m_iLastAckPos to get the position in the buffer that represents
1328 // the sequence number up to which we'd like to read.
1329 IF_HEAVY_LOGGING(const char* reason = "NOT RECEIVED");
1330
1331 for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = shiftFwd(i))
1332 {
1333 // In case when we want to read only up to given sequence number, stop
1334 // the loop if this number was reached. This number must be extracted from
1335 // the buffer and any following must wait here for "better times". Note
1336 // that the unit that points to the requested sequence must remain in
1337 // the buffer, unless there is no valid packet at that position, in which
1338 // case it is allowed to point to the NEXT sequence towards it, however
1339 // if it does, this cell must remain in the buffer for prospective recovery.
1340 if (havelimit && i == past_end)
1341 break;
1342
1343 bool freeunit = false;
1344
1345 /* Skip any invalid skipped/dropped packets */
1346 if (m_pUnit[i] == NULL)
1347 {
1348 HLOGC(brlog.Debug,
1349 log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
1350 << " SKIPPED - no unit there");
1351 m_iStartPos = shiftFwd(m_iStartPos);
1352 continue;
1353 }
1354
1355 w_curpktseq = m_pUnit[i]->m_Packet.getSeqNo();
1356
1357 if (m_pUnit[i]->m_iFlag != CUnit::GOOD)
1358 {
1359 HLOGC(brlog.Debug,
1360 log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
1361 << " SKIPPED - unit not good");
1362 freeunit = true;
1363 }
1364 else
1365 {
1366 // This does:
1367 // 1. Get the TSBPD time of the unit. Stop and return false if this unit
1368 // is not yet ready to play.
1369 // 2. If it's ready to play, check also if it's decrypted. If not, skip it.
1370 // 3. Check also if it's larger than base_seq, if not, skip it.
1371 // 4. If it's ready to play, decrypted and larger than base, stop and return it.
1372 if (!havelimit)
1373 {
1374 w_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
1375 const steady_clock::duration towait = (w_tsbpdtime - steady_clock::now());
1376 if (towait.count() > 0)
1377 {
1378 HLOGC(brlog.Debug,
1379 log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
1380 << " pkt %" << w_curpktseq << " NOT ready to play (only in " << count_milliseconds(towait)
1381 << "ms)");
1382 return false;
1383 }
1384
1385 if (m_pUnit[i]->m_Packet.getMsgCryptoFlags() != EK_NOENC)
1386 {
1387 IF_HEAVY_LOGGING(reason = "DECRYPTION FAILED");
1388 freeunit = true; /* packet not decrypted */
1389 }
1390 else if (base_seq != SRT_SEQNO_NONE && CSeqNo::seqcmp(w_curpktseq, base_seq) <= 0)
1391 {
1392 IF_HEAVY_LOGGING(reason = "smaller than base_seq");
1393 w_tsbpdtime = steady_clock::time_point();
1394 freeunit = true;
1395 }
1396 else
1397 {
1398 HLOGC(brlog.Debug,
1399 log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
1400 << " pkt %" << w_curpktseq << " ready to play (delayed " << count_milliseconds(towait)
1401 << "ms)");
1402 return true;
1403 }
1404 }
1405 // In this case:
1406 // 1. We don't even look into the packet if this is not the requested sequence.
1407 // All packets that are earlier than the required sequence will be dropped.
1408 // 2. When found the packet with expected sequence number, and the condition for
1409 // good unit is passed, we get the timestamp.
1410 // 3. If the packet is not decrypted, we allow it to be removed
1411 // 4. If we reached the required sequence, and the packet is good, KEEP IT in the buffer,
1412 // and return with the pointer pointing to this very buffer. Only then return true.
1413 else
1414 {
1415 // We have a limit up to which the reading will be done,
1416 // no matter if the time has come or not - although retrieve it.
1417 if (i == end)
1418 {
1419 HLOGC(brlog.Debug, log << "CAUGHT required seq position " << i);
1420 // We have the packet we need. Extract its data.
1421 w_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
1422
1423 // If we have a decryption failure, allow the unit to be released.
1424 if (m_pUnit[i]->m_Packet.getMsgCryptoFlags() != EK_NOENC)
1425 {
1426 IF_HEAVY_LOGGING(reason = "DECRYPTION FAILED");
1427 freeunit = true; /* packet not decrypted */
1428 }
1429 else
1430 {
1431 // Stop here and keep the packet in the buffer, so it will be
1432 // next extracted.
1433 HLOGC(brlog.Debug,
1434 log << "getRcvReadyMsg: packet seq=" << w_curpktseq << " ready for extraction");
1435 return true;
1436 }
1437 }
1438 else
1439 {
1440 HLOGC(brlog.Debug, log << "SKIPPING position " << i);
1441 // Continue the loop and remove the current packet because
1442 // its sequence number is too old.
1443 freeunit = true;
1444 }
1445 }
1446 }
1447
1448 if (freeunit)
1449 {
1450 HLOGC(brlog.Debug, log << "getRcvReadyMsg: POS=" << i << " FREED: " << reason);
1451 /* removed skipped, dropped, undecryptable bytes from rcv buffer */
1452 const int rmbytes = (int)m_pUnit[i]->m_Packet.getLength();
1453 countBytes(-1, -rmbytes, true);
1454
1455 freeUnitAt(i);
1456 m_iStartPos = shiftFwd(m_iStartPos);
1457 }
1458 }
1459
1460 HLOGC(brlog.Debug, log << "getRcvReadyMsg: nothing to deliver: " << reason);
1461 return false;
1462 }
1463
1464 /*
1465 * Return receivable data status (packet timestamp_us ready to play if TsbPd mode)
1466 * Return playtime (tsbpdtime) of 1st packet in queue, ready to play or not
1467 *
1468 * Return data ready to be received (packet timestamp_us ready to play if TsbPd mode)
1469 * Using getRcvDataSize() to know if there is something to read as it was widely
1470 * used in the code (core.cpp) is expensive in TsbPD mode, hence this simpler function
1471 * that only check if first packet in queue is ready.
1472 */
isRcvDataReady(steady_clock::time_point & w_tsbpdtime,int32_t & w_curpktseq,int32_t seqdistance)1473 bool CRcvBuffer::isRcvDataReady(steady_clock::time_point& w_tsbpdtime, int32_t& w_curpktseq, int32_t seqdistance)
1474 {
1475 w_tsbpdtime = steady_clock::time_point();
1476
1477 if (m_tsbpd.isEnabled())
1478 {
1479 const srt::CPacket* pkt = getRcvReadyPacket(seqdistance);
1480 if (!pkt)
1481 {
1482 HLOGC(brlog.Debug, log << "isRcvDataReady: packet NOT extracted.");
1483 return false;
1484 }
1485
1486 /*
1487 * Acknowledged data is available,
1488 * Only say ready if time to deliver.
1489 * Report the timestamp, ready or not.
1490 */
1491 w_curpktseq = pkt->getSeqNo();
1492 w_tsbpdtime = getPktTsbPdTime(pkt->getMsgTimeStamp());
1493
1494 // If seqdistance was passed, then return true no matter what the
1495 // TSBPD time states.
1496 if (seqdistance != -1 || w_tsbpdtime <= steady_clock::now())
1497 {
1498 HLOGC(brlog.Debug,
1499 log << "isRcvDataReady: packet extracted seqdistance=" << seqdistance
1500 << " TsbPdTime=" << FormatTime(w_tsbpdtime));
1501 return true;
1502 }
1503
1504 HLOGC(brlog.Debug, log << "isRcvDataReady: packet extracted, but NOT READY");
1505 return false;
1506 }
1507
1508 return isRcvDataAvailable();
1509 }
1510
1511 // XXX This function may be called only after checking
1512 // if m_bTsbPdMode.
getRcvReadyPacket(int32_t seqdistance)1513 CPacket* CRcvBuffer::getRcvReadyPacket(int32_t seqdistance)
1514 {
1515 // If asked for readiness of a packet at given sequence distance
1516 // (that is, we need to extract the packet with given sequence number),
1517 // only check if this cell is occupied in the buffer, and if so,
1518 // if it's occupied with a "good" unit. That's all. It doesn't
1519 // matter whether it's ready to play.
1520 if (seqdistance != -1)
1521 {
1522 // Note: seqdistance is the value to to go BACKWARDS from m_iLastAckPos,
1523 // which is the position that is in sync with CUDT::m_iRcvLastSkipAck. This
1524 // position is the sequence number of a packet that is NOT received, but it's
1525 // expected to be received as next. So the minimum value of seqdistance is 1.
1526
1527 // SANITY CHECK
1528 if (seqdistance == 0)
1529 {
1530 LOGC(brlog.Fatal, log << "IPE: trying to extract packet past the last ACK-ed!");
1531 return 0;
1532 }
1533
1534 if (seqdistance > getRcvDataSize())
1535 {
1536 HLOGC(brlog.Debug,
1537 log << "getRcvReadyPacket: Sequence offset=" << seqdistance
1538 << " is in the past (start=" << m_iStartPos << " end=" << m_iLastAckPos << ")");
1539 return 0;
1540 }
1541
1542 int i = shift(m_iLastAckPos, -seqdistance);
1543 if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD)
1544 {
1545 HLOGC(brlog.Debug, log << "getRcvReadyPacket: FOUND PACKET %" << m_pUnit[i]->m_Packet.getSeqNo());
1546 return &m_pUnit[i]->m_Packet;
1547 }
1548
1549 HLOGC(brlog.Debug, log << "getRcvReadyPacket: Sequence offset=" << seqdistance << " IS NOT RECEIVED.");
1550 return 0;
1551 }
1552
1553 IF_HEAVY_LOGGING(int nskipped = 0);
1554 for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = shiftFwd(i))
1555 {
1556 /*
1557 * Skip missing packets that did not arrive in time.
1558 */
1559 if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD)
1560 {
1561 HLOGC(brlog.Debug,
1562 log << "getRcvReadyPacket: Found next packet seq=%" << m_pUnit[i]->m_Packet.getSeqNo() << " ("
1563 << nskipped << " empty cells skipped)");
1564 return &m_pUnit[i]->m_Packet;
1565 }
1566 IF_HEAVY_LOGGING(++nskipped);
1567 }
1568
1569 return 0;
1570 }
1571
1572 #if ENABLE_HEAVY_LOGGING
1573 // This function is for debug purposes only and it's called only
1574 // from within HLOG* macros.
reportBufferStats() const1575 void CRcvBuffer::reportBufferStats() const
1576 {
1577 int nmissing = 0;
1578 int32_t low_seq = SRT_SEQNO_NONE, high_seq = SRT_SEQNO_NONE;
1579 int32_t low_ts = 0, high_ts = 0;
1580
1581 for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = (i + 1) % m_iSize)
1582 {
1583 if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD)
1584 {
1585 low_seq = m_pUnit[i]->m_Packet.m_iSeqNo;
1586 low_ts = m_pUnit[i]->m_Packet.m_iTimeStamp;
1587 break;
1588 }
1589 ++nmissing;
1590 }
1591
1592 // Not sure if a packet MUST BE at the last ack pos position, so check, just in case.
1593 int n = m_iLastAckPos;
1594 if (m_pUnit[n] && m_pUnit[n]->m_iFlag == CUnit::GOOD)
1595 {
1596 high_ts = m_pUnit[n]->m_Packet.m_iTimeStamp;
1597 high_seq = m_pUnit[n]->m_Packet.m_iSeqNo;
1598 }
1599 else
1600 {
1601 // Possibilities are:
1602 // m_iStartPos == m_iLastAckPos, high_ts == low_ts, defined.
1603 // No packet: low_ts == 0, so high_ts == 0, too.
1604 high_ts = low_ts;
1605 }
1606 // The 32-bit timestamps are relative and roll over oftten; what
1607 // we really need is the timestamp difference. The only place where
1608 // we can ask for the time base is the upper time because when trying
1609 // to receive the time base for the lower time we'd break the requirement
1610 // for monotonic clock.
1611
1612 uint64_t upper_time = high_ts;
1613 uint64_t lower_time = low_ts;
1614
1615 if (lower_time > upper_time)
1616 upper_time += uint64_t(srt::CPacket::MAX_TIMESTAMP) + 1;
1617
1618 int32_t timespan = upper_time - lower_time;
1619 int seqspan = 0;
1620 if (low_seq != SRT_SEQNO_NONE && high_seq != SRT_SEQNO_NONE)
1621 {
1622 seqspan = CSeqNo::seqoff(low_seq, high_seq);
1623 }
1624
1625 LOGC(brlog.Debug,
1626 log << "RCV BUF STATS: seqspan=%(" << low_seq << "-" << high_seq << ":" << seqspan << ") missing=" << nmissing
1627 << "pkts");
1628 LOGC(brlog.Debug,
1629 log << "RCV BUF STATS: timespan=" << timespan << "us (lo=" << lower_time << " hi=" << upper_time << ")");
1630 }
1631
1632 #endif // ENABLE_HEAVY_LOGGING
1633
isRcvDataReady()1634 bool CRcvBuffer::isRcvDataReady()
1635 {
1636 steady_clock::time_point tsbpdtime;
1637 int32_t seq;
1638
1639 return isRcvDataReady((tsbpdtime), (seq), -1);
1640 }
1641
getAvailBufSize() const1642 int CRcvBuffer::getAvailBufSize() const
1643 {
1644 // One slot must be empty in order to tell the difference between "empty buffer" and "full buffer"
1645 return m_iSize - getRcvDataSize() - 1;
1646 }
1647
getRcvDataSize() const1648 int CRcvBuffer::getRcvDataSize() const
1649 {
1650 if (m_iLastAckPos >= m_iStartPos)
1651 return m_iLastAckPos - m_iStartPos;
1652
1653 return m_iSize + m_iLastAckPos - m_iStartPos;
1654 }
1655
debugGetSize() const1656 int CRcvBuffer::debugGetSize() const
1657 {
1658 // Does exactly the same as getRcvDataSize, but
1659 // it should be used FOR INFORMATIONAL PURPOSES ONLY.
1660 // The source values might be changed in another thread
1661 // during the calculation, although worst case the
1662 // resulting value may differ to the real buffer size by 1.
1663 int from = m_iStartPos, to = m_iLastAckPos;
1664 int size = to - from;
1665 if (size < 0)
1666 size += m_iSize;
1667
1668 return size;
1669 }
1670
1671 /* Return moving average of acked data pkts, bytes, and timespan (ms) of the receive buffer */
getRcvAvgDataSize(int & bytes,int & timespan)1672 int CRcvBuffer::getRcvAvgDataSize(int& bytes, int& timespan)
1673 {
1674 // Average number of packets and timespan could be small,
1675 // so rounding is beneficial, while for the number of
1676 // bytes in the buffer is a higher value, so rounding can be omitted,
1677 // but probably better to round all three values.
1678 timespan = round_val(m_mavg.timespan_ms());
1679 bytes = round_val(m_mavg.bytes());
1680 return round_val(m_mavg.pkts());
1681 }
1682
1683 /* Update moving average of acked data pkts, bytes, and timespan (ms) of the receive buffer */
updRcvAvgDataSize(const steady_clock::time_point & now)1684 void CRcvBuffer::updRcvAvgDataSize(const steady_clock::time_point& now)
1685 {
1686 if (!m_mavg.isTimeToUpdate(now))
1687 return;
1688
1689 int bytes = 0;
1690 int timespan_ms = 0;
1691 const int pkts = getRcvDataSize(bytes, timespan_ms);
1692 m_mavg.update(now, pkts, bytes, timespan_ms);
1693 }
1694
1695 /* Return acked data pkts, bytes, and timespan (ms) of the receive buffer */
getRcvDataSize(int & bytes,int & timespan)1696 int CRcvBuffer::getRcvDataSize(int& bytes, int& timespan)
1697 {
1698 timespan = 0;
1699 if (m_tsbpd.isEnabled())
1700 {
1701 // Get a valid startpos.
1702 // Skip invalid entries in the beginning, if any.
1703 int startpos = m_iStartPos;
1704 for (; startpos != m_iLastAckPos; startpos = shiftFwd(startpos))
1705 {
1706 if ((NULL != m_pUnit[startpos]) && (CUnit::GOOD == m_pUnit[startpos]->m_iFlag))
1707 break;
1708 }
1709
1710 int endpos = m_iLastAckPos;
1711
1712 if (m_iLastAckPos != startpos)
1713 {
1714 /*
1715 * |<--- DataSpan ---->|<- m_iMaxPos ->|
1716 * +---+---+---+---+---+---+---+---+---+---+---+---
1717 * | | 1 | 1 | 1 | 0 | 0 | 1 | 1 | 0 | 1 | | m_pUnits[]
1718 * +---+---+---+---+---+---+---+---+---+---+---+---
1719 * | |
1720 * \_ m_iStartPos \_ m_iLastAckPos
1721 *
1722 * m_pUnits[startpos] shall be valid (->m_iFlag==CUnit::GOOD).
1723 * If m_pUnits[m_iLastAckPos-1] is not valid (NULL or ->m_iFlag!=CUnit::GOOD),
1724 * it means m_pUnits[m_iLastAckPos] is valid since a valid unit is needed to skip.
1725 * Favor m_pUnits[m_iLastAckPos] if valid over [m_iLastAckPos-1] to include the whole acked interval.
1726 */
1727 if ((m_iMaxPos <= 0) || (!m_pUnit[m_iLastAckPos]) || (m_pUnit[m_iLastAckPos]->m_iFlag != CUnit::GOOD))
1728 {
1729 endpos = (m_iLastAckPos == 0 ? m_iSize - 1 : m_iLastAckPos - 1);
1730 }
1731
1732 if ((NULL != m_pUnit[endpos]) && (NULL != m_pUnit[startpos]))
1733 {
1734 const steady_clock::time_point startstamp =
1735 getPktTsbPdTime(m_pUnit[startpos]->m_Packet.getMsgTimeStamp());
1736 const steady_clock::time_point endstamp = getPktTsbPdTime(m_pUnit[endpos]->m_Packet.getMsgTimeStamp());
1737 /*
1738 * There are sampling conditions where spantime is < 0 (big unsigned value).
1739 * It has been observed after changing the SRT latency from 450 to 200 on the sender.
1740 *
1741 * Possible packet order corruption when dropping packet,
1742 * cause by bad thread protection when adding packet in queue
1743 * was later discovered and fixed. Security below kept.
1744 *
1745 * DateTime RecvRate LostRate DropRate AvailBw RTT RecvBufs PdDelay
1746 * 2014-12-08T15:04:25-0500 4712 110 0 96509 33.710 393 450
1747 * 2014-12-08T15:04:35-0500 4512 95 0 107771 33.493 1496542976 200
1748 * 2014-12-08T15:04:40-0500 4213 106 3 107352 53.657 9499425 200
1749 * 2014-12-08T15:04:45-0500 4575 104 0 102194 53.614 59666 200
1750 * 2014-12-08T15:04:50-0500 4475 124 0 100543 53.526 505 200
1751 */
1752 if (endstamp > startstamp)
1753 timespan = count_milliseconds(endstamp - startstamp);
1754 }
1755 /*
1756 * Timespan can be less then 1000 us (1 ms) if few packets.
1757 * Also, if there is only one pkt in buffer, the time difference will be 0.
1758 * Therefore, always add 1 ms if not empty.
1759 */
1760 if (0 < m_iAckedPktsCount)
1761 timespan += 1;
1762 }
1763 }
1764 HLOGF(brlog.Debug, "getRcvDataSize: %6d %6d %6d ms\n", m_iAckedPktsCount, m_iAckedBytesCount, timespan);
1765 bytes = m_iAckedBytesCount;
1766 return m_iAckedPktsCount;
1767 }
1768
getRcvAvgPayloadSize() const1769 unsigned CRcvBuffer::getRcvAvgPayloadSize() const
1770 {
1771 return m_uAvgPayloadSz;
1772 }
1773
debugGetReadingState() const1774 CRcvBuffer::ReadingState CRcvBuffer::debugGetReadingState() const
1775 {
1776 ReadingState readstate;
1777
1778 readstate.iNumAcknowledged = 0;
1779 readstate.iNumUnacknowledged = m_iMaxPos;
1780
1781 if ((NULL != m_pUnit[m_iStartPos]) && (m_pUnit[m_iStartPos]->m_iFlag == CUnit::GOOD))
1782 {
1783 if (m_tsbpd.isEnabled())
1784 readstate.tsStart = m_tsbpd.getPktTsbPdTime(m_pUnit[m_iStartPos]->m_Packet.getMsgTimeStamp());
1785
1786 readstate.iNumAcknowledged = m_iLastAckPos > m_iStartPos
1787 ? m_iLastAckPos - m_iStartPos
1788 : m_iLastAckPos + (m_iSize - m_iStartPos);
1789 }
1790
1791 // All further stats are valid if TSBPD is enabled.
1792 if (!m_tsbpd.isEnabled())
1793 return readstate;
1794
1795 // m_iLastAckPos points to the first unacknowledged packet
1796 const int iLastAckPos = (m_iLastAckPos - 1) % m_iSize;
1797 if (m_iLastAckPos != m_iStartPos && (NULL != m_pUnit[iLastAckPos]) && (m_pUnit[iLastAckPos]->m_iFlag == CUnit::GOOD))
1798 {
1799 readstate.tsLastAck = m_tsbpd.getPktTsbPdTime(m_pUnit[iLastAckPos]->m_Packet.getMsgTimeStamp());
1800 }
1801
1802 const int iEndPos = (m_iLastAckPos + m_iMaxPos - 1) % m_iSize;
1803 if (m_iMaxPos == 0)
1804 {
1805 readstate.tsEnd = readstate.tsLastAck;
1806 }
1807 else if ((NULL != m_pUnit[iEndPos]) && (m_pUnit[iEndPos]->m_iFlag == CUnit::GOOD))
1808 {
1809 readstate.tsEnd = m_tsbpd.getPktTsbPdTime(m_pUnit[iEndPos]->m_Packet.getMsgTimeStamp());
1810 }
1811
1812 return readstate;
1813 }
1814
strFullnessState(const time_point & tsNow) const1815 string CRcvBuffer::strFullnessState(const time_point& tsNow) const
1816 {
1817 const ReadingState bufstate = debugGetReadingState();
1818 stringstream ss;
1819
1820 ss << "Space avail " << getAvailBufSize() << "/" << m_iSize;
1821 ss << " pkts. Packets ACKed: " << bufstate.iNumAcknowledged;
1822 if (!is_zero(bufstate.tsStart) && !is_zero(bufstate.tsLastAck))
1823 {
1824 ss << " (TSBPD ready in ";
1825 ss << count_milliseconds(bufstate.tsStart - tsNow);
1826 ss << " : ";
1827 ss << count_milliseconds(bufstate.tsLastAck - tsNow);
1828 ss << " ms)";
1829 }
1830
1831 ss << ", not ACKed: " << bufstate.iNumUnacknowledged;
1832 if (!is_zero(bufstate.tsStart) && !is_zero(bufstate.tsEnd))
1833 {
1834 ss << ", timespan ";
1835 ss << count_milliseconds(bufstate.tsEnd - bufstate.tsStart);
1836 ss << " ms";
1837 }
1838
1839 ss << ". " SRT_SYNC_CLOCK_STR " drift " << getDrift() / 1000 << " ms.";
1840 return ss.str();
1841 }
1842
dropMsg(int32_t msgno,bool using_rexmit_flag)1843 void CRcvBuffer::dropMsg(int32_t msgno, bool using_rexmit_flag)
1844 {
1845 for (int i = m_iStartPos, n = shift(m_iLastAckPos, m_iMaxPos); i != n; i = shiftFwd(i))
1846 if ((m_pUnit[i] != NULL) && (m_pUnit[i]->m_Packet.getMsgSeq(using_rexmit_flag) == msgno))
1847 m_pUnit[i]->m_iFlag = CUnit::DROPPED;
1848 }
1849
applyGroupTime(const steady_clock::time_point & timebase,bool wrp,uint32_t delay,const steady_clock::duration & udrift)1850 void CRcvBuffer::applyGroupTime(const steady_clock::time_point& timebase,
1851 bool wrp,
1852 uint32_t delay,
1853 const steady_clock::duration& udrift)
1854 {
1855 m_tsbpd.applyGroupTime(timebase, wrp, delay, udrift);
1856 }
1857
applyGroupDrift(const steady_clock::time_point & timebase,bool wrp,const steady_clock::duration & udrift)1858 void CRcvBuffer::applyGroupDrift(const steady_clock::time_point& timebase,
1859 bool wrp,
1860 const steady_clock::duration& udrift)
1861 {
1862 m_tsbpd.applyGroupDrift(timebase, wrp, udrift);
1863 }
1864
getInternalTimeBase(steady_clock::time_point & w_timebase,bool & w_wrp,steady_clock::duration & w_udrift)1865 void CRcvBuffer::getInternalTimeBase(steady_clock::time_point& w_timebase, bool& w_wrp, steady_clock::duration& w_udrift)
1866 {
1867 return m_tsbpd.getInternalTimeBase(w_timebase, w_wrp, w_udrift);
1868 }
1869
getPktTsbPdTime(uint32_t usPktTimestamp)1870 steady_clock::time_point CRcvBuffer::getPktTsbPdTime(uint32_t usPktTimestamp)
1871 {
1872 // Updating TSBPD time here is not very accurate and prevents from making the function constant.
1873 // For now preserving the existing behavior.
1874 m_tsbpd.updateTsbPdTimeBase(usPktTimestamp);
1875 return m_tsbpd.getPktTsbPdTime(usPktTimestamp);
1876 }
1877
setRcvTsbPdMode(const steady_clock::time_point & timebase,const steady_clock::duration & delay)1878 void CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const steady_clock::duration& delay)
1879 {
1880 const bool no_wrap_check = false;
1881 m_tsbpd.setTsbPdMode(timebase, no_wrap_check, delay);
1882 }
1883
addRcvTsbPdDriftSample(uint32_t timestamp_us,int rtt)1884 bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, int rtt)
1885 {
1886 return m_tsbpd.addDriftSample(timestamp_us, rtt);
1887 }
1888
readMsg(char * data,int len)1889 int CRcvBuffer::readMsg(char* data, int len)
1890 {
1891 SRT_MSGCTRL dummy = srt_msgctrl_default;
1892 return readMsg(data, len, (dummy), -1);
1893 }
1894
1895 // NOTE: The order of ref-arguments is odd because:
1896 // - data and len shall be close to one another
1897 // - upto is last because it's a kind of unusual argument that has a default value
readMsg(char * data,int len,SRT_MSGCTRL & w_msgctl,int upto)1898 int CRcvBuffer::readMsg(char* data, int len, SRT_MSGCTRL& w_msgctl, int upto)
1899 {
1900 int p = -1, q = -1;
1901 bool passack;
1902
1903 bool empty = accessMsg((p), (q), (passack), (w_msgctl.srctime), upto);
1904 if (empty)
1905 return 0;
1906
1907 // This should happen just once. By 'empty' condition
1908 // we have a guarantee that m_pUnit[p] exists and is valid.
1909 CPacket& pkt1 = m_pUnit[p]->m_Packet;
1910
1911 // This returns the sequence number and message number to
1912 // the API caller.
1913 w_msgctl.pktseq = pkt1.getSeqNo();
1914 w_msgctl.msgno = pkt1.getMsgSeq();
1915
1916 return extractData((data), len, p, q, passack);
1917 }
1918
1919 #ifdef SRT_DEBUG_TSBPD_OUTJITTER
debugTraceJitter(time_point playtime)1920 void CRcvBuffer::debugTraceJitter(time_point playtime)
1921 {
1922 uint64_t ms = count_microseconds(steady_clock::now() - playtime);
1923 if (ms / 10 < 10)
1924 m_ulPdHisto[0][ms / 10]++;
1925 else if (ms / 100 < 10)
1926 m_ulPdHisto[1][ms / 100]++;
1927 else if (ms / 1000 < 10)
1928 m_ulPdHisto[2][ms / 1000]++;
1929 else
1930 m_ulPdHisto[3][1]++;
1931 }
1932 #endif /* SRT_DEBUG_TSBPD_OUTJITTER */
1933
accessMsg(int & w_p,int & w_q,bool & w_passack,int64_t & w_playtime,int upto)1934 bool CRcvBuffer::accessMsg(int& w_p, int& w_q, bool& w_passack, int64_t& w_playtime, int upto)
1935 {
1936 // This function should do the following:
1937 // 1. Find the first packet starting the next message (or just next packet)
1938 // 2. When found something ready for extraction, return true.
1939 // 3. w_p and w_q point the index range for extraction
1940 // 4. passack decides if this range shall be removed after extraction
1941
1942 bool empty = true;
1943
1944 if (m_tsbpd.isEnabled())
1945 {
1946 w_passack = false;
1947 int seq = 0;
1948
1949 steady_clock::time_point play_time;
1950 const bool isReady = getRcvReadyMsg(play_time, (seq), upto);
1951 w_playtime = count_microseconds(play_time.time_since_epoch());
1952
1953 if (isReady)
1954 {
1955 empty = false;
1956 // In TSBPD mode you always read one message
1957 // at a time and a message always fits in one UDP packet,
1958 // so in one "unit".
1959 w_p = w_q = m_iStartPos;
1960
1961 debugTraceJitter(play_time);
1962 }
1963 }
1964 else
1965 {
1966 w_playtime = 0;
1967 if (scanMsg((w_p), (w_q), (w_passack)))
1968 empty = false;
1969 }
1970
1971 return empty;
1972 }
1973
extractData(char * data,int len,int p,int q,bool passack)1974 int CRcvBuffer::extractData(char* data, int len, int p, int q, bool passack)
1975 {
1976 SRT_ASSERT(len > 0);
1977 int rs = len > 0 ? len : 0;
1978 const int past_q = shiftFwd(q);
1979 while (p != past_q)
1980 {
1981 const int pktlen = (int)m_pUnit[p]->m_Packet.getLength();
1982 // When unitsize is less than pktlen, only a fragment is copied to the output 'data',
1983 // but still the whole packet is removed from the receiver buffer.
1984 if (pktlen > 0)
1985 countBytes(-1, -pktlen, true);
1986
1987 const int unitsize = ((rs >= 0) && (pktlen > rs)) ? rs : pktlen;
1988
1989 HLOGC(brlog.Debug, log << "readMsg: checking unit POS=" << p);
1990
1991 if (unitsize > 0)
1992 {
1993 memcpy((data), m_pUnit[p]->m_Packet.m_pcData, unitsize);
1994 data += unitsize;
1995 rs -= unitsize;
1996 IF_HEAVY_LOGGING(readMsgHeavyLogging(p));
1997 }
1998 else
1999 {
2000 HLOGC(brlog.Debug, log << CONID() << "readMsg: SKIPPED POS=" << p << " - ZERO SIZE UNIT");
2001 }
2002
2003 // Note special case for live mode (one packet per message and TSBPD=on):
2004 // - p == q (that is, this loop passes only once)
2005 // - no passack (the unit is always removed from the buffer)
2006 if (!passack)
2007 {
2008 HLOGC(brlog.Debug, log << CONID() << "readMsg: FREEING UNIT POS=" << p);
2009 freeUnitAt(p);
2010 }
2011 else
2012 {
2013 HLOGC(brlog.Debug, log << CONID() << "readMsg: PASSACK UNIT POS=" << p);
2014 m_pUnit[p]->m_iFlag = CUnit::PASSACK;
2015 }
2016
2017 p = shiftFwd(p);
2018 }
2019
2020 if (!passack)
2021 m_iStartPos = past_q;
2022
2023 HLOGC(brlog.Debug,
2024 log << "rcvBuf/extractData: begin=" << m_iStartPos << " reporting extraction size=" << (len - rs));
2025
2026 return len - rs;
2027 }
2028
debugTimeState(size_t first_n_pkts) const2029 string CRcvBuffer::debugTimeState(size_t first_n_pkts) const
2030 {
2031 stringstream ss;
2032 int ipos = m_iStartPos;
2033 for (size_t i = 0; i < first_n_pkts; ++i, ipos = CSeqNo::incseq(ipos))
2034 {
2035 const CUnit* unit = m_pUnit[ipos];
2036 if (!unit)
2037 {
2038 ss << "pkt[" << i << "] missing, ";
2039 continue;
2040 }
2041
2042 const CPacket& pkt = unit->m_Packet;
2043 ss << "pkt[" << i << "] ts=" << pkt.getMsgTimeStamp() << ", ";
2044 }
2045 return ss.str();
2046 }
2047
2048 #if ENABLE_HEAVY_LOGGING
readMsgHeavyLogging(int p)2049 void CRcvBuffer::readMsgHeavyLogging(int p)
2050 {
2051 static steady_clock::time_point prev_now;
2052 static steady_clock::time_point prev_srctime;
2053 const CPacket& pkt = m_pUnit[p]->m_Packet;
2054
2055 const int32_t seq = pkt.m_iSeqNo;
2056
2057 steady_clock::time_point nowtime = steady_clock::now();
2058 steady_clock::time_point srctime = getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp());
2059
2060 const int64_t timediff_ms = count_milliseconds(nowtime - srctime);
2061 const int64_t nowdiff_ms = is_zero(prev_now) ? count_milliseconds(nowtime - prev_now) : 0;
2062 const int64_t srctimediff_ms = is_zero(prev_srctime) ? count_milliseconds(srctime - prev_srctime) : 0;
2063
2064 const int next_p = shiftFwd(p);
2065 CUnit* u = m_pUnit[next_p];
2066 string next_playtime;
2067 if (u && u->m_iFlag == CUnit::GOOD)
2068 {
2069 next_playtime = FormatTime(getPktTsbPdTime(u->m_Packet.getMsgTimeStamp()));
2070 }
2071 else
2072 {
2073 next_playtime = "NONE";
2074 }
2075
2076 LOGC(brlog.Debug,
2077 log << CONID() << "readMsg: DELIVERED seq=" << seq << " T=" << FormatTime(srctime) << " in " << timediff_ms
2078 << "ms - TIME-PREVIOUS: PKT: " << srctimediff_ms << " LOCAL: " << nowdiff_ms << " !"
2079 << BufferStamp(pkt.data(), pkt.size()) << " NEXT pkt T=" << next_playtime);
2080
2081 prev_now = nowtime;
2082 prev_srctime = srctime;
2083 }
2084 #endif
2085
scanMsg(int & w_p,int & w_q,bool & w_passack)2086 bool CRcvBuffer::scanMsg(int& w_p, int& w_q, bool& w_passack)
2087 {
2088 // empty buffer
2089 if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0))
2090 {
2091 HLOGC(brlog.Debug, log << "scanMsg: empty buffer");
2092 return false;
2093 }
2094
2095 int rmpkts = 0;
2096 int rmbytes = 0;
2097 // skip all bad msgs at the beginning
2098 // This loop rolls until the "buffer is empty" (head == tail),
2099 // in particular, there's no unit accessible for the reader.
2100 while (m_iStartPos != m_iLastAckPos)
2101 {
2102 // Roll up to the first valid unit
2103 if (!m_pUnit[m_iStartPos])
2104 {
2105 if (++m_iStartPos == m_iSize)
2106 m_iStartPos = 0;
2107 continue;
2108 }
2109
2110 // Note: PB_FIRST | PB_LAST == PB_SOLO.
2111 // testing if boundary() & PB_FIRST tests if the msg is first OR solo.
2112 if (m_pUnit[m_iStartPos]->m_iFlag == CUnit::GOOD && m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() & PB_FIRST)
2113 {
2114 bool good = true;
2115
2116 // look ahead for the whole message
2117
2118 // We expect to see either of:
2119 // [PB_FIRST] [PB_SUBSEQUENT] [PB_SUBSEQUENT] [PB_LAST]
2120 // [PB_SOLO]
2121 // but not:
2122 // [PB_FIRST] NULL ...
2123 // [PB_FIRST] FREE/PASSACK/DROPPED...
2124 // If the message didn't look as expected, interrupt this.
2125
2126 // This begins with a message starting at m_iStartPos
2127 // up to m_iLastAckPos OR until the PB_LAST message is found.
2128 // If any of the units on this way isn't good, this OUTER loop
2129 // will be interrupted.
2130 for (int i = m_iStartPos; i != m_iLastAckPos;)
2131 {
2132 if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD)
2133 {
2134 good = false;
2135 break;
2136 }
2137
2138 // Likewise, boundary() & PB_LAST will be satisfied for last OR solo.
2139 if (m_pUnit[i]->m_Packet.getMsgBoundary() & PB_LAST)
2140 break;
2141
2142 if (++i == m_iSize)
2143 i = 0;
2144 }
2145
2146 if (good)
2147 break;
2148 }
2149
2150 rmpkts++;
2151 rmbytes += (int) freeUnitAt((size_t) m_iStartPos);
2152
2153 m_iStartPos = shiftFwd(m_iStartPos);
2154 }
2155 /* we removed bytes form receive buffer */
2156 countBytes(-rmpkts, -rmbytes, true);
2157
2158 // Not sure if this is correct, but this above 'while' loop exits
2159 // under the following conditions only:
2160 // - m_iStartPos == m_iLastAckPos (that makes passack = true)
2161 // - found at least GOOD unit with PB_FIRST and not all messages up to PB_LAST are good,
2162 // in which case it returns with m_iStartPos <% m_iLastAckPos (earlier)
2163 // Also all units that lied before m_iStartPos are removed.
2164
2165 w_p = -1; // message head
2166 w_q = m_iStartPos; // message tail
2167 w_passack = m_iStartPos == m_iLastAckPos;
2168 bool found = false;
2169
2170 // looking for the first message
2171 //>>m_pUnit[size + m_iMaxPos] is not valid
2172
2173 // XXX Would be nice to make some very thorough refactoring here.
2174
2175 // This rolls by q variable from m_iStartPos up to m_iLastAckPos,
2176 // actually from the first message up to the one with PB_LAST
2177 // or PB_SOLO boundary.
2178
2179 // The 'i' variable used in this loop is just a stub and it's
2180 // even hard to define the unit here. It is "shift towards
2181 // m_iStartPos", so the upper value is m_iMaxPos + size.
2182 // m_iMaxPos is itself relative to m_iLastAckPos, so
2183 // the upper value is m_iMaxPos + difference between
2184 // m_iLastAckPos and m_iStartPos, so that this value is relative
2185 // to m_iStartPos.
2186 //
2187 // The 'i' value isn't used anywhere, although the 'q' value rolls
2188 // in this loop in sync with 'i', with the difference that 'q' is
2189 // wrapped around, and 'i' is just incremented normally.
2190 //
2191 // This makes that this loop rolls in the range by 'q' from
2192 // m_iStartPos to m_iStartPos + UPPER,
2193 // where UPPER = m_iLastAckPos -% m_iStartPos + m_iMaxPos
2194 // This embraces the range from the current reading head up to
2195 // the last packet ever received.
2196 //
2197 // 'passack' is set to true when the 'q' has passed through
2198 // the border of m_iLastAckPos and fallen into the range
2199 // of unacknowledged packets.
2200
2201 for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i < n; ++i)
2202 {
2203 if (m_pUnit[w_q] && m_pUnit[w_q]->m_iFlag == CUnit::GOOD)
2204 {
2205 // Equivalent pseudocode:
2206 // PacketBoundary bound = m_pUnit[w_q]->m_Packet.getMsgBoundary();
2207 // if ( IsSet(bound, PB_FIRST) )
2208 // w_p = w_q;
2209 // if ( IsSet(bound, PB_LAST) && w_p != -1 )
2210 // found = true;
2211 //
2212 // Not implemented this way because it uselessly check w_p for -1
2213 // also after setting it explicitly.
2214
2215 switch (m_pUnit[w_q]->m_Packet.getMsgBoundary())
2216 {
2217 case PB_SOLO: // 11
2218 w_p = w_q;
2219 found = true;
2220 break;
2221
2222 case PB_FIRST: // 10
2223 w_p = w_q;
2224 break;
2225
2226 case PB_LAST: // 01
2227 if (w_p != -1)
2228 found = true;
2229 break;
2230
2231 case PB_SUBSEQUENT:; // do nothing (caught first, rolling for last)
2232 }
2233 }
2234 else
2235 {
2236 // a hole in this message, not valid, restart search
2237 w_p = -1;
2238 }
2239
2240 // 'found' is set when the current iteration hit a message with PB_LAST
2241 // (including PB_SOLO since the very first message).
2242 if (found)
2243 {
2244 // the msg has to be ack'ed or it is allowed to read out of order, and was not read before
2245 if (!w_passack || !m_pUnit[w_q]->m_Packet.getMsgOrderFlag())
2246 {
2247 HLOGC(brlog.Debug, log << "scanMsg: found next-to-broken message, delivering OUT OF ORDER.");
2248 break;
2249 }
2250
2251 found = false;
2252 }
2253
2254 if (++w_q == m_iSize)
2255 w_q = 0;
2256
2257 if (w_q == m_iLastAckPos)
2258 w_passack = true;
2259 }
2260
2261 // no msg found
2262 if (!found)
2263 {
2264 // NOTE:
2265 // This situation may only happen if:
2266 // - Found a packet with PB_FIRST, so w_p = w_q at the moment when it was found
2267 // - Possibly found following components of that message up to shifted w_q
2268 // - Found no terminal packet (PB_LAST) for that message.
2269
2270 // if the message is larger than the receiver buffer, return part of the message
2271 if ((w_p != -1) && (shiftFwd(w_q) == w_p))
2272 {
2273 HLOGC(brlog.Debug, log << "scanMsg: BUFFER FULL and message is INCOMPLETE. Returning PARTIAL MESSAGE.");
2274 found = true;
2275 }
2276 else
2277 {
2278 HLOGC(brlog.Debug, log << "scanMsg: PARTIAL or NO MESSAGE found: p=" << w_p << " q=" << w_q);
2279 }
2280 }
2281 else
2282 {
2283 HLOGC(brlog.Debug,
2284 log << "scanMsg: extracted message p=" << w_p << " q=" << w_q << " ("
2285 << ((w_q - w_p + m_iSize + 1) % m_iSize) << " packets)");
2286 }
2287
2288 return found;
2289 }
2290