1 /*
2  * SRT - Secure, Reliable, Transport
3  * Copyright (c) 2018 Haivision Systems Inc.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  *
9  */
10 
11 /*****************************************************************************
12 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
13 All rights reserved.
14 
15 Redistribution and use in source and binary forms, with or without
16 modification, are permitted provided that the following conditions are
17 met:
18 
19 * Redistributions of source code must retain the above
20   copyright notice, this list of conditions and the
21   following disclaimer.
22 
23 * Redistributions in binary form must reproduce the
24   above copyright notice, this list of conditions
25   and the following disclaimer in the documentation
26   and/or other materials provided with the distribution.
27 
28 * Neither the name of the University of Illinois
29   nor the names of its contributors may be used to
30   endorse or promote products derived from this
31   software without specific prior written permission.
32 
33 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
34 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
35 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
36 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
37 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
38 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
39 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
40 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
41 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
42 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
43 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
44 *****************************************************************************/
45 
46 /*****************************************************************************
47 written by
48    Yunhong Gu, last updated 03/12/2011
49 modified by
50    Haivision Systems Inc.
51 *****************************************************************************/
52 
53 #include "platform_sys.h"
54 
55 #include <cstring>
56 #include <cmath>
57 #include "buffer.h"
58 #include "packet.h"
59 #include "core.h" // provides some constants
60 #include "logging.h"
61 
62 using namespace std;
63 using namespace srt_logging;
64 using namespace srt;
65 using namespace srt::sync;
66 
67 // You can change this value at build config by using "ENFORCE" options.
68 #if !defined(SRT_MAVG_SAMPLING_RATE)
69 #define SRT_MAVG_SAMPLING_RATE 40
70 #endif
71 
isTimeToUpdate(const time_point & now) const72 bool AvgBufSize::isTimeToUpdate(const time_point& now) const
73 {
74     const int      usMAvgBasePeriod = 1000000; // 1s in microseconds
75     const int      us2ms            = 1000;
76     const int      msMAvgPeriod     = (usMAvgBasePeriod / SRT_MAVG_SAMPLING_RATE) / us2ms;
77     const uint64_t elapsed_ms       = count_milliseconds(now - m_tsLastSamplingTime); // ms since last sampling
78     return (elapsed_ms >= msMAvgPeriod);
79 }
80 
update(const steady_clock::time_point & now,int pkts,int bytes,int timespan_ms)81 void AvgBufSize::update(const steady_clock::time_point& now, int pkts, int bytes, int timespan_ms)
82 {
83     const uint64_t elapsed_ms       = count_milliseconds(now - m_tsLastSamplingTime); // ms since last sampling
84     m_tsLastSamplingTime            = now;
85     const uint64_t one_second_in_ms = 1000;
86     if (elapsed_ms > one_second_in_ms)
87     {
88         // No sampling in last 1 sec, initialize average
89         m_dCountMAvg      = pkts;
90         m_dBytesCountMAvg = bytes;
91         m_dTimespanMAvg   = timespan_ms;
92         return;
93     }
94 
95     //
96     // weight last average value between -1 sec and last sampling time (LST)
97     // and new value between last sampling time and now
98     //                                      |elapsed_ms|
99     //   +----------------------------------+-------+
100     //  -1                                 LST      0(now)
101     //
102     m_dCountMAvg      = avg_iir_w<1000, double>(m_dCountMAvg, pkts, elapsed_ms);
103     m_dBytesCountMAvg = avg_iir_w<1000, double>(m_dBytesCountMAvg, bytes, elapsed_ms);
104     m_dTimespanMAvg   = avg_iir_w<1000, double>(m_dTimespanMAvg, timespan_ms, elapsed_ms);
105 }
106 
// Rounds `val` to the nearest integer (halfway cases away from zero)
// and returns the result converted to int.
int round_val(double val)
{
    const double nearest = std::round(val);
    return static_cast<int>(nearest);
}
111 
CSndBuffer(int size,int mss)112 CSndBuffer::CSndBuffer(int size, int mss)
113     : m_BufLock()
114     , m_pBlock(NULL)
115     , m_pFirstBlock(NULL)
116     , m_pCurrBlock(NULL)
117     , m_pLastBlock(NULL)
118     , m_pBuffer(NULL)
119     , m_iNextMsgNo(1)
120     , m_iSize(size)
121     , m_iMSS(mss)
122     , m_iCount(0)
123     , m_iBytesCount(0)
124     , m_iInRatePktsCount(0)
125     , m_iInRateBytesCount(0)
126     , m_InRatePeriod(INPUTRATE_FAST_START_US) // 0.5 sec (fast start)
127     , m_iInRateBps(INPUTRATE_INITIAL_BYTESPS)
128 {
129     // initial physical buffer of "size"
130     m_pBuffer           = new Buffer;
131     m_pBuffer->m_pcData = new char[m_iSize * m_iMSS];
132     m_pBuffer->m_iSize  = m_iSize;
133     m_pBuffer->m_pNext  = NULL;
134 
135     // circular linked list for out bound packets
136     m_pBlock  = new Block;
137     Block* pb = m_pBlock;
138     for (int i = 1; i < m_iSize; ++i)
139     {
140         pb->m_pNext        = new Block;
141         pb->m_iMsgNoBitset = 0;
142         pb                 = pb->m_pNext;
143     }
144     pb->m_pNext = m_pBlock;
145 
146     pb       = m_pBlock;
147     char* pc = m_pBuffer->m_pcData;
148     for (int i = 0; i < m_iSize; ++i)
149     {
150         pb->m_pcData = pc;
151         pb           = pb->m_pNext;
152         pc += m_iMSS;
153     }
154 
155     m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;
156 
157     setupMutex(m_BufLock, "Buf");
158 }
159 
~CSndBuffer()160 CSndBuffer::~CSndBuffer()
161 {
162     Block* pb = m_pBlock->m_pNext;
163     while (pb != m_pBlock)
164     {
165         Block* temp = pb;
166         pb          = pb->m_pNext;
167         delete temp;
168     }
169     delete m_pBlock;
170 
171     while (m_pBuffer != NULL)
172     {
173         Buffer* temp = m_pBuffer;
174         m_pBuffer    = m_pBuffer->m_pNext;
175         delete[] temp->m_pcData;
176         delete temp;
177     }
178 
179     releaseMutex(m_BufLock);
180 }
181 
addBuffer(const char * data,int len,SRT_MSGCTRL & w_mctrl)182 void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
183 {
184     int32_t&  w_msgno   = w_mctrl.msgno;
185     int32_t&  w_seqno   = w_mctrl.pktseq;
186     int64_t& w_srctime  = w_mctrl.srctime;
187     const int& ttl      = w_mctrl.msgttl;
188     int       size      = len / m_iMSS;
189     if ((len % m_iMSS) != 0)
190         size++;
191 
192     HLOGC(bslog.Debug,
193           log << "addBuffer: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for "
194               << len << " bytes");
195 
196     // dynamically increase sender buffer
197     while (size + m_iCount >= m_iSize)
198     {
199         HLOGC(bslog.Debug, log << "addBuffer: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
200         increase();
201     }
202 
203     const steady_clock::time_point time = steady_clock::now();
204     const int32_t inorder = w_mctrl.inorder ? MSGNO_PACKET_INORDER::mask : 0;
205 
206     HLOGC(bslog.Debug,
207           log << CONID() << "addBuffer: adding " << size << " packets (" << len << " bytes) to send, msgno="
208               << (w_msgno > 0 ? w_msgno : m_iNextMsgNo) << (inorder ? "" : " NOT") << " in order");
209 
210     // The sequence number passed to this function is the sequence number
211     // that the very first packet from the packet series should get here.
212     // If there's more than one packet, this function must increase it by itself
213     // and then return the accordingly modified sequence number in the reference.
214 
215     Block* s = m_pLastBlock;
216 
217     if (w_msgno == SRT_MSGNO_NONE) // DEFAULT-UNCHANGED msgno supplied
218     {
219         HLOGC(bslog.Debug, log << "addBuffer: using internally managed msgno=" << m_iNextMsgNo);
220         w_msgno = m_iNextMsgNo;
221     }
222     else
223     {
224         HLOGC(bslog.Debug, log << "addBuffer: OVERWRITTEN by msgno supplied by caller: msgno=" << w_msgno);
225         m_iNextMsgNo = w_msgno;
226     }
227 
228     for (int i = 0; i < size; ++i)
229     {
230         int pktlen = len - i * m_iMSS;
231         if (pktlen > m_iMSS)
232             pktlen = m_iMSS;
233 
234         HLOGC(bslog.Debug,
235               log << "addBuffer: %" << w_seqno << " #" << w_msgno << " spreading from=" << (i * m_iMSS)
236                   << " size=" << pktlen << " TO BUFFER:" << (void*)s->m_pcData);
237         memcpy((s->m_pcData), data + i * m_iMSS, pktlen);
238         s->m_iLength = pktlen;
239 
240         s->m_iSeqNo = w_seqno;
241         w_seqno     = CSeqNo::incseq(w_seqno);
242 
243         s->m_iMsgNoBitset = m_iNextMsgNo | inorder;
244         if (i == 0)
245             s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
246         if (i == size - 1)
247             s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
248         // NOTE: if i is neither 0 nor size-1, it resuls with PB_SUBSEQUENT.
249         //       if i == 0 == size-1, it results with PB_SOLO.
250         // Packets assigned to one message can be:
251         // [PB_FIRST] [PB_SUBSEQUENT] [PB_SUBSEQUENT] [PB_LAST] - 4 packets per message
252         // [PB_FIRST] [PB_LAST] - 2 packets per message
253         // [PB_SOLO] - 1 packet per message
254 
255         s->m_llSourceTime_us = w_srctime;
256         s->m_tsOriginTime = time;
257         s->m_tsRexmitTime = time_point();
258         s->m_iTTL = ttl;
259         // Rewrite the actual sending time back into w_srctime
260         // so that the calling facilities can reuse it
261         if (!w_srctime)
262             w_srctime = count_microseconds(s->m_tsOriginTime.time_since_epoch());
263 
264         // XXX unchecked condition: s->m_pNext == NULL.
265         // Should never happen, as the call to increase() should ensure enough buffers.
266         SRT_ASSERT(s->m_pNext);
267         s = s->m_pNext;
268     }
269     m_pLastBlock = s;
270 
271     enterCS(m_BufLock);
272     m_iCount += size;
273 
274     m_iBytesCount += len;
275     m_tsLastOriginTime = time;
276 
277     updateInputRate(time, size, len);
278 
279     updAvgBufSize(time);
280 
281     leaveCS(m_BufLock);
282 
283     // MSGNO_SEQ::mask has a form: 00000011111111...
284     // At least it's known that it's from some index inside til the end (to bit 0).
285     // If this value has been reached in a step of incrementation, it means that the
286     // maximum value has been reached. Casting to int32_t to ensure the same sign
287     // in comparison, although it's far from reaching the sign bit.
288 
289     const int nextmsgno = ++MsgNo(m_iNextMsgNo);
290     HLOGC(bslog.Debug, log << "CSndBuffer::addBuffer: updating msgno: #" << m_iNextMsgNo << " -> #" << nextmsgno);
291     m_iNextMsgNo = nextmsgno;
292 }
293 
setInputRateSmpPeriod(int period)294 void CSndBuffer::setInputRateSmpPeriod(int period)
295 {
296     m_InRatePeriod = (uint64_t)period; //(usec) 0=no input rate calculation
297 }
298 
updateInputRate(const steady_clock::time_point & time,int pkts,int bytes)299 void CSndBuffer::updateInputRate(const steady_clock::time_point& time, int pkts, int bytes)
300 {
301     // no input rate calculation
302     if (m_InRatePeriod == 0)
303         return;
304 
305     if (is_zero(m_tsInRateStartTime))
306     {
307         m_tsInRateStartTime = time;
308         return;
309     }
310 
311     m_iInRatePktsCount += pkts;
312     m_iInRateBytesCount += bytes;
313 
314     // Trigger early update in fast start mode
315     const bool early_update = (m_InRatePeriod < INPUTRATE_RUNNING_US) && (m_iInRatePktsCount > INPUTRATE_MAX_PACKETS);
316 
317     const uint64_t period_us = count_microseconds(time - m_tsInRateStartTime);
318     if (early_update || period_us > m_InRatePeriod)
319     {
320         // Required Byte/sec rate (payload + headers)
321         m_iInRateBytesCount += (m_iInRatePktsCount * srt::CPacket::SRT_DATA_HDR_SIZE);
322         m_iInRateBps = (int)(((int64_t)m_iInRateBytesCount * 1000000) / period_us);
323         HLOGC(bslog.Debug,
324               log << "updateInputRate: pkts:" << m_iInRateBytesCount << " bytes:" << m_iInRatePktsCount
325                   << " rate=" << (m_iInRateBps * 8) / 1000 << "kbps interval=" << period_us);
326         m_iInRatePktsCount  = 0;
327         m_iInRateBytesCount = 0;
328         m_tsInRateStartTime = time;
329 
330         setInputRateSmpPeriod(INPUTRATE_RUNNING_US);
331     }
332 }
333 
addBufferFromFile(fstream & ifs,int len)334 int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
335 {
336     int size = len / m_iMSS;
337     if ((len % m_iMSS) != 0)
338         size++;
339 
340     HLOGC(bslog.Debug,
341           log << "addBufferFromFile: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size
342               << " buffers for " << len << " bytes");
343 
344     // dynamically increase sender buffer
345     while (size + m_iCount >= m_iSize)
346     {
347         HLOGC(bslog.Debug,
348               log << "addBufferFromFile: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
349         increase();
350     }
351 
352     HLOGC(bslog.Debug,
353           log << CONID() << "addBufferFromFile: adding " << size << " packets (" << len
354               << " bytes) to send, msgno=" << m_iNextMsgNo);
355 
356     Block* s     = m_pLastBlock;
357     int    total = 0;
358     for (int i = 0; i < size; ++i)
359     {
360         if (ifs.bad() || ifs.fail() || ifs.eof())
361             break;
362 
363         int pktlen = len - i * m_iMSS;
364         if (pktlen > m_iMSS)
365             pktlen = m_iMSS;
366 
367         HLOGC(bslog.Debug,
368               log << "addBufferFromFile: reading from=" << (i * m_iMSS) << " size=" << pktlen
369                   << " TO BUFFER:" << (void*)s->m_pcData);
370         ifs.read(s->m_pcData, pktlen);
371         if ((pktlen = int(ifs.gcount())) <= 0)
372             break;
373 
374         // currently file transfer is only available in streaming mode, message is always in order, ttl = infinite
375         s->m_iMsgNoBitset = m_iNextMsgNo | MSGNO_PACKET_INORDER::mask;
376         if (i == 0)
377             s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
378         if (i == size - 1)
379             s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
380         // NOTE: PB_FIRST | PB_LAST == PB_SOLO.
381         // none of PB_FIRST & PB_LAST == PB_SUBSEQUENT.
382 
383         s->m_iLength = pktlen;
384         s->m_iTTL    = SRT_MSGTTL_INF;
385         s            = s->m_pNext;
386 
387         total += pktlen;
388     }
389     m_pLastBlock = s;
390 
391     enterCS(m_BufLock);
392     m_iCount += size;
393     m_iBytesCount += total;
394 
395     leaveCS(m_BufLock);
396 
397     m_iNextMsgNo++;
398     if (m_iNextMsgNo == int32_t(MSGNO_SEQ::mask))
399         m_iNextMsgNo = 1;
400 
401     return total;
402 }
403 
getSourceTime(const CSndBuffer::Block & block)404 steady_clock::time_point CSndBuffer::getSourceTime(const CSndBuffer::Block& block)
405 {
406     if (block.m_llSourceTime_us)
407     {
408         return steady_clock::time_point() + microseconds_from(block.m_llSourceTime_us);
409     }
410 
411     return block.m_tsOriginTime;
412 }
413 
readData(srt::CPacket & w_packet,steady_clock::time_point & w_srctime,int kflgs)414 int CSndBuffer::readData(srt::CPacket& w_packet, steady_clock::time_point& w_srctime, int kflgs)
415 {
416     // No data to read
417     if (m_pCurrBlock == m_pLastBlock)
418         return 0;
419 
420     // Make the packet REFLECT the data stored in the buffer.
421     w_packet.m_pcData = m_pCurrBlock->m_pcData;
422     int readlen       = m_pCurrBlock->m_iLength;
423     w_packet.setLength(readlen);
424     w_packet.m_iSeqNo = m_pCurrBlock->m_iSeqNo;
425 
426     // XXX This is probably done because the encryption should happen
427     // just once, and so this sets the encryption flags to both msgno bitset
428     // IN THE PACKET and IN THE BLOCK. This is probably to make the encryption
429     // happen at the time when scheduling a new packet to send, but the packet
430     // must remain in the send buffer until it's ACKed. For the case of rexmit
431     // the packet will be taken "as is" (that is, already encrypted).
432     //
433     // The problem is in the order of things:
434     // 0. When the application stores the data, some of the flags for PH_MSGNO are set.
435     // 1. The readData() is called to get the original data sent by the application.
436     // 2. The data are original and must be encrypted. They WILL BE encrypted, later.
437     // 3. So far we are in readData() so the encryption flags must be updated NOW because
438     //    later we won't have access to the block's data.
439     // 4. After exiting from readData(), the packet is being encrypted. It's immediately
440     //    sent, however the data must remain in the sending buffer until they are ACKed.
441     // 5. In case when rexmission is needed, the second overloaded version of readData
442     //    is being called, and the buffer + PH_MSGNO value is extracted. All interesting
443     //    flags must be present and correct at that time.
444     //
445     // The only sensible way to fix this problem is to encrypt the packet not after
446     // extracting from here, but when the packet is stored into CSndBuffer. The appropriate
447     // flags for PH_MSGNO will be applied directly there. Then here the value for setting
448     // PH_MSGNO will be set as is.
449 
450     if (kflgs == -1)
451     {
452         HLOGC(bslog.Debug, log << CONID() << " CSndBuffer: ERROR: encryption required and not possible. NOT SENDING.");
453         readlen = 0;
454     }
455     else
456     {
457         m_pCurrBlock->m_iMsgNoBitset |= MSGNO_ENCKEYSPEC::wrap(kflgs);
458     }
459 
460     w_packet.m_iMsgNo = m_pCurrBlock->m_iMsgNoBitset;
461     w_srctime         = getSourceTime(*m_pCurrBlock);
462     m_pCurrBlock      = m_pCurrBlock->m_pNext;
463 
464     HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send");
465 
466     return readlen;
467 }
468 
getMsgNoAt(const int offset)469 int32_t CSndBuffer::getMsgNoAt(const int offset)
470 {
471     ScopedLock bufferguard(m_BufLock);
472 
473     Block* p = m_pFirstBlock;
474 
475     if (p)
476     {
477         HLOGC(bslog.Debug,
478               log << "CSndBuffer::getMsgNoAt: FIRST MSG: size=" << p->m_iLength << " %" << p->m_iSeqNo << " #"
479                   << p->getMsgSeq() << " !" << BufferStamp(p->m_pcData, p->m_iLength));
480     }
481 
482     if (offset >= m_iCount)
483     {
484         // Prevent accessing the last "marker" block
485         LOGC(bslog.Error,
486              log << "CSndBuffer::getMsgNoAt: IPE: offset=" << offset << " not found, max offset=" << m_iCount);
487         return SRT_MSGNO_CONTROL;
488     }
489 
490     // XXX Suboptimal procedure to keep the blocks identifiable
491     // by sequence number. Consider using some circular buffer.
492     int       i;
493     Block* ee SRT_ATR_UNUSED = 0;
494     for (i = 0; i < offset && p; ++i)
495     {
496         ee = p;
497         p  = p->m_pNext;
498     }
499 
500     if (!p)
501     {
502         LOGC(bslog.Error,
503              log << "CSndBuffer::getMsgNoAt: IPE: offset=" << offset << " not found, stopped at " << i << " with #"
504                  << (ee ? ee->getMsgSeq() : SRT_MSGNO_NONE));
505         return SRT_MSGNO_CONTROL;
506     }
507 
508     HLOGC(bslog.Debug,
509           log << "CSndBuffer::getMsgNoAt: offset=" << offset << " found, size=" << p->m_iLength << " %" << p->m_iSeqNo
510               << " #" << p->getMsgSeq() << " !" << BufferStamp(p->m_pcData, p->m_iLength));
511 
512     return p->getMsgSeq();
513 }
514 
readData(const int offset,srt::CPacket & w_packet,steady_clock::time_point & w_srctime,int & w_msglen)515 int CSndBuffer::readData(const int offset, srt::CPacket& w_packet, steady_clock::time_point& w_srctime, int& w_msglen)
516 {
517     int32_t& msgno_bitset = w_packet.m_iMsgNo;
518 
519     ScopedLock bufferguard(m_BufLock);
520 
521     Block* p = m_pFirstBlock;
522 
523     // XXX Suboptimal procedure to keep the blocks identifiable
524     // by sequence number. Consider using some circular buffer.
525     for (int i = 0; i < offset && p != m_pLastBlock; ++i)
526     {
527         p = p->m_pNext;
528     }
529     if (p == m_pLastBlock)
530     {
531         LOGC(qslog.Error, log << "CSndBuffer::readData: offset " << offset << " too large!");
532         return 0;
533     }
534 #if ENABLE_HEAVY_LOGGING
535     const int32_t first_seq = p->m_iSeqNo;
536     int32_t last_seq = p->m_iSeqNo;
537 #endif
538 
539     // Check if the block that is the next candidate to send (m_pCurrBlock pointing) is stale.
540 
541     // If so, then inform the caller that it should first take care of the whole
542     // message (all blocks with that message id). Shift the m_pCurrBlock pointer
543     // to the position past the last of them. Then return -1 and set the
544     // msgno_bitset return reference to the message id that should be dropped as
545     // a whole.
546 
547     // After taking care of that, the caller should immediately call this function again,
548     // this time possibly in order to find the real data to be sent.
549 
550     // if found block is stale
551     // (This is for messages that have declared TTL - messages that fail to be sent
552     // before the TTL defined time comes, will be dropped).
553 
554     if ((p->m_iTTL >= 0) && (count_milliseconds(steady_clock::now() - p->m_tsOriginTime) > p->m_iTTL))
555     {
556         int32_t msgno = p->getMsgSeq();
557         w_msglen      = 1;
558         p             = p->m_pNext;
559         bool move     = false;
560         while (p != m_pLastBlock && msgno == p->getMsgSeq())
561         {
562 #if ENABLE_HEAVY_LOGGING
563             last_seq = p->m_iSeqNo;
564 #endif
565             if (p == m_pCurrBlock)
566                 move = true;
567             p = p->m_pNext;
568             if (move)
569                 m_pCurrBlock = p;
570             w_msglen++;
571         }
572 
573         HLOGC(qslog.Debug,
574               log << "CSndBuffer::readData: due to TTL exceeded, SEQ " << first_seq << " - " << last_seq << ", "
575                   << w_msglen << " packets to drop, msgno=" << msgno);
576 
577         // If readData returns -1, then msgno_bitset is understood as a Message ID to drop.
578         // This means that in this case it should be written by the message sequence value only
579         // (not the whole 4-byte bitset written at PH_MSGNO).
580         msgno_bitset = msgno;
581         return -1;
582     }
583 
584     w_packet.m_pcData = p->m_pcData;
585     int readlen       = p->m_iLength;
586     w_packet.setLength(readlen);
587 
588     // XXX Here the value predicted to be applied to PH_MSGNO field is extracted.
589     // As this function is predicted to extract the data to send as a rexmited packet,
590     // the packet must be in the form ready to send - so, in case of encryption,
591     // encrypted, and with all ENC flags already set. So, the first call to send
592     // the packet originally (the other overload of this function) must set these
593     // flags.
594     w_packet.m_iMsgNo = p->m_iMsgNoBitset;
595     w_srctime = getSourceTime(*p);
596 
597     // This function is called when packet retransmission is triggered.
598     // Therefore we are setting the rexmit time.
599     p->m_tsRexmitTime = steady_clock::now();
600 
601     HLOGC(qslog.Debug,
602           log << CONID() << "CSndBuffer: getting packet %" << p->m_iSeqNo << " as per %" << w_packet.m_iSeqNo
603               << " size=" << readlen << " to send [REXMIT]");
604 
605     return readlen;
606 }
607 
getPacketRexmitTime(const int offset)608 srt::sync::steady_clock::time_point CSndBuffer::getPacketRexmitTime(const int offset)
609 {
610     ScopedLock bufferguard(m_BufLock);
611     const Block* p = m_pFirstBlock;
612 
613     // XXX Suboptimal procedure to keep the blocks identifiable
614     // by sequence number. Consider using some circular buffer.
615     for (int i = 0; i < offset; ++i)
616     {
617         SRT_ASSERT(p);
618         p = p->m_pNext;
619     }
620 
621     SRT_ASSERT(p);
622     return p->m_tsRexmitTime;
623 }
624 
ackData(int offset)625 void CSndBuffer::ackData(int offset)
626 {
627     ScopedLock bufferguard(m_BufLock);
628 
629     bool move = false;
630     for (int i = 0; i < offset; ++i)
631     {
632         m_iBytesCount -= m_pFirstBlock->m_iLength;
633         if (m_pFirstBlock == m_pCurrBlock)
634             move = true;
635         m_pFirstBlock = m_pFirstBlock->m_pNext;
636     }
637     if (move)
638         m_pCurrBlock = m_pFirstBlock;
639 
640     m_iCount -= offset;
641 
642     updAvgBufSize(steady_clock::now());
643 }
644 
getCurrBufSize() const645 int CSndBuffer::getCurrBufSize() const
646 {
647     return m_iCount;
648 }
649 
getAvgBufSize(int & w_bytes,int & w_tsp)650 int CSndBuffer::getAvgBufSize(int& w_bytes, int& w_tsp)
651 {
652     ScopedLock bufferguard(m_BufLock); /* Consistency of pkts vs. bytes vs. spantime */
653 
654     /* update stats in case there was no add/ack activity lately */
655     updAvgBufSize(steady_clock::now());
656 
657     // Average number of packets and timespan could be small,
658     // so rounding is beneficial, while for the number of
659     // bytes in the buffer is a higher value, so rounding can be omitted,
660     // but probably better to round all three values.
661     w_bytes = round_val(m_mavg.bytes());
662     w_tsp   = round_val(m_mavg.timespan_ms());
663     return round_val(m_mavg.pkts());
664 }
665 
updAvgBufSize(const steady_clock::time_point & now)666 void CSndBuffer::updAvgBufSize(const steady_clock::time_point& now)
667 {
668     if (!m_mavg.isTimeToUpdate(now))
669         return;
670 
671     int       bytes       = 0;
672     int       timespan_ms = 0;
673     const int pkts        = getCurrBufSize((bytes), (timespan_ms));
674     m_mavg.update(now, pkts, bytes, timespan_ms);
675 }
676 
getCurrBufSize(int & w_bytes,int & w_timespan)677 int CSndBuffer::getCurrBufSize(int& w_bytes, int& w_timespan)
678 {
679     w_bytes = m_iBytesCount;
680     /*
681      * Timespan can be less then 1000 us (1 ms) if few packets.
682      * Also, if there is only one pkt in buffer, the time difference will be 0.
683      * Therefore, always add 1 ms if not empty.
684      */
685     w_timespan = 0 < m_iCount ? count_milliseconds(m_tsLastOriginTime - m_pFirstBlock->m_tsOriginTime) + 1 : 0;
686 
687     return m_iCount;
688 }
689 
dropLateData(int & w_bytes,int32_t & w_first_msgno,const steady_clock::time_point & too_late_time)690 int CSndBuffer::dropLateData(int& w_bytes, int32_t& w_first_msgno, const steady_clock::time_point& too_late_time)
691 {
692     int     dpkts  = 0;
693     int     dbytes = 0;
694     bool    move   = false;
695     int32_t msgno  = 0;
696 
697     ScopedLock bufferguard(m_BufLock);
698     for (int i = 0; i < m_iCount && m_pFirstBlock->m_tsOriginTime < too_late_time; ++i)
699     {
700         dpkts++;
701         dbytes += m_pFirstBlock->m_iLength;
702         msgno = m_pFirstBlock->getMsgSeq();
703 
704         if (m_pFirstBlock == m_pCurrBlock)
705             move = true;
706         m_pFirstBlock = m_pFirstBlock->m_pNext;
707     }
708 
709     if (move)
710     {
711         m_pCurrBlock = m_pFirstBlock;
712     }
713     m_iCount -= dpkts;
714 
715     m_iBytesCount -= dbytes;
716     w_bytes = dbytes;
717 
718     // We report the increased number towards the last ever seen
719     // by the loop, as this last one is the last received. So remained
720     // (even if "should remain") is the first after the last removed one.
721     w_first_msgno = ++MsgNo(msgno);
722 
723     updAvgBufSize(steady_clock::now());
724 
725     return (dpkts);
726 }
727 
increase()728 void CSndBuffer::increase()
729 {
730     int unitsize = m_pBuffer->m_iSize;
731 
732     // new physical buffer
733     Buffer* nbuf = NULL;
734     try
735     {
736         nbuf           = new Buffer;
737         nbuf->m_pcData = new char[unitsize * m_iMSS];
738     }
739     catch (...)
740     {
741         delete nbuf;
742         throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
743     }
744     nbuf->m_iSize = unitsize;
745     nbuf->m_pNext = NULL;
746 
747     // insert the buffer at the end of the buffer list
748     Buffer* p = m_pBuffer;
749     while (p->m_pNext != NULL)
750         p = p->m_pNext;
751     p->m_pNext = nbuf;
752 
753     // new packet blocks
754     Block* nblk = NULL;
755     try
756     {
757         nblk = new Block;
758     }
759     catch (...)
760     {
761         delete nblk;
762         throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
763     }
764     Block* pb = nblk;
765     for (int i = 1; i < unitsize; ++i)
766     {
767         pb->m_pNext = new Block;
768         pb          = pb->m_pNext;
769     }
770 
771     // insert the new blocks onto the existing one
772     pb->m_pNext           = m_pLastBlock->m_pNext;
773     m_pLastBlock->m_pNext = nblk;
774 
775     pb       = nblk;
776     char* pc = nbuf->m_pcData;
777     for (int i = 0; i < unitsize; ++i)
778     {
779         pb->m_pcData = pc;
780         pb           = pb->m_pNext;
781         pc += m_iMSS;
782     }
783 
784     m_iSize += unitsize;
785 
786     HLOGC(bslog.Debug,
787           log << "CSndBuffer: BUFFER FULL - adding " << (unitsize * m_iMSS) << " bytes spread to " << unitsize
788               << " blocks"
789               << " (total size: " << m_iSize << " bytes)");
790 }
791 
792 ////////////////////////////////////////////////////////////////////////////////
793 
794 /*
795  *   RcvBuffer (circular buffer):
796  *
797  *   |<------------------- m_iSize ----------------------------->|
798  *   |       |<--- acked pkts -->|<--- m_iMaxPos --->|           |
799  *   |       |                   |                   |           |
800  *   +---+---+---+---+---+---+---+---+---+---+---+---+---+   +---+
801  *   | 0 | 0 | 1 | 1 | 1 | 0 | 1 | 1 | 1 | 1 | 0 | 1 | 0 |...| 0 | m_pUnit[]
802  *   +---+---+---+---+---+---+---+---+---+---+---+---+---+   +---+
803  *             |                 | |               |
804  *             |                   |               \__last pkt received
805  *             |                   \___ m_iLastAckPos: last ack sent
806  *             \___ m_iStartPos: first message to read
807  *
808  *   m_pUnit[i]->m_iFlag: 0:free, 1:good, 2:passack, 3:dropped
809  *
810  *   thread safety:
811  *    m_iStartPos:   CUDT::m_RecvLock
812  *    m_iLastAckPos: CUDT::m_AckLock
 *    m_iMaxPos:     none? (modified on add and ack)
814  */
815 
816 // XXX Init values moved to in-class.
817 // const uint32_t CRcvBuffer::TSBPD_WRAP_PERIOD = (30*1000000);    //30 seconds (in usec)
818 // const int CRcvBuffer::TSBPD_DRIFT_MAX_VALUE   = 5000;  // usec
819 // const int CRcvBuffer::TSBPD_DRIFT_MAX_SAMPLES = 1000;  // ACK-ACK packets
820 #ifdef SRT_DEBUG_TSBPD_DRIFT
821 // const int CRcvBuffer::TSBPD_DRIFT_PRT_SAMPLES = 200;   // ACK-ACK packets
822 #endif
823 
CRcvBuffer(CUnitQueue * queue,int bufsize_pkts)824 CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize_pkts)
825     : m_pUnit(NULL)
826     , m_iSize(bufsize_pkts)
827     , m_pUnitQueue(queue)
828     , m_iStartPos(0)
829     , m_iLastAckPos(0)
830     , m_iMaxPos(0)
831     , m_iNotch(0)
832     , m_BytesCountLock()
833     , m_iBytesCount(0)
834     , m_iAckedPktsCount(0)
835     , m_iAckedBytesCount(0)
836     , m_uAvgPayloadSz(7 * 188)
837 {
838     m_pUnit = new CUnit*[m_iSize];
839     for (int i = 0; i < m_iSize; ++i)
840         m_pUnit[i] = NULL;
841 
842 #ifdef SRT_DEBUG_TSBPD_DRIFT
843     memset(m_TsbPdDriftHisto100us, 0, sizeof(m_TsbPdDriftHisto100us));
844     memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms));
845 #endif
846 
847     setupMutex(m_BytesCountLock, "BytesCount");
848 }
849 
~CRcvBuffer()850 CRcvBuffer::~CRcvBuffer()
851 {
852     for (int i = 0; i < m_iSize; ++i)
853     {
854         if (m_pUnit[i] != NULL)
855         {
856             m_pUnitQueue->makeUnitFree(m_pUnit[i]);
857         }
858     }
859 
860     delete[] m_pUnit;
861 
862     releaseMutex(m_BytesCountLock);
863 }
864 
countBytes(int pkts,int bytes,bool acked)865 void CRcvBuffer::countBytes(int pkts, int bytes, bool acked)
866 {
867     /*
868      * Byte counter changes from both sides (Recv & Ack) of the buffer
869      * so the higher level lock is not enough for thread safe op.
870      *
871      * pkts are...
872      *  added (bytes>0, acked=false),
873      *  acked (bytes>0, acked=true),
874      *  removed (bytes<0, acked=n/a)
875      */
876     ScopedLock cg(m_BytesCountLock);
877 
878     if (!acked) // adding new pkt in RcvBuffer
879     {
880         m_iBytesCount += bytes; /* added or removed bytes from rcv buffer */
881         if (bytes > 0)          /* Assuming one pkt when adding bytes */
882             m_uAvgPayloadSz = ((m_uAvgPayloadSz * (100 - 1)) + bytes) / 100;
883     }
884     else // acking/removing pkts to/from buffer
885     {
886         m_iAckedPktsCount += pkts;   /* acked or removed pkts from rcv buffer */
887         m_iAckedBytesCount += bytes; /* acked or removed bytes from rcv buffer */
888 
889         if (bytes < 0)
890             m_iBytesCount += bytes; /* removed bytes from rcv buffer */
891     }
892 }
893 
addData(CUnit * unit,int offset)894 int CRcvBuffer::addData(CUnit* unit, int offset)
895 {
896     SRT_ASSERT(unit != NULL);
897     if (offset >= getAvailBufSize())
898         return -1;
899 
900     const int pos = (m_iLastAckPos + offset) % m_iSize;
901     if (offset >= m_iMaxPos)
902         m_iMaxPos = offset + 1;
903 
904     if (m_pUnit[pos] != NULL)
905     {
906         HLOGC(qrlog.Debug, log << "addData: unit %" << unit->m_Packet.m_iSeqNo << " rejected, already exists");
907         return -1;
908     }
909     m_pUnit[pos] = unit;
910     countBytes(1, (int)unit->m_Packet.getLength());
911 
912     m_pUnitQueue->makeUnitGood(unit);
913 
914     HLOGC(qrlog.Debug,
915           log << "addData: unit %" << unit->m_Packet.m_iSeqNo << " accepted, off=" << offset << " POS=" << pos);
916     return 0;
917 }
918 
readBuffer(char * data,int len)919 int CRcvBuffer::readBuffer(char* data, int len)
920 {
921     int p       = m_iStartPos;
922     int lastack = m_iLastAckPos;
923     int rs      = len;
924     IF_HEAVY_LOGGING(char* begin = data);
925 
926     const bool bTsbPdEnabled = m_tsbpd.isEnabled();
927     const steady_clock::time_point now = (bTsbPdEnabled ? steady_clock::now() : steady_clock::time_point());
928 
929     HLOGC(brlog.Debug, log << CONID() << "readBuffer: start=" << p << " lastack=" << lastack);
930     while ((p != lastack) && (rs > 0))
931     {
932         if (m_pUnit[p] == NULL)
933         {
934             LOGC(brlog.Error, log << CONID() << " IPE readBuffer on null packet pointer");
935             return -1;
936         }
937 
938         const srt::CPacket& pkt = m_pUnit[p]->m_Packet;
939 
940         if (bTsbPdEnabled)
941         {
942             HLOGC(brlog.Debug,
943                   log << CONID() << "readBuffer: chk if time2play:"
944                       << " NOW=" << FormatTime(now)
945                       << " PKT TS=" << FormatTime(getPktTsbPdTime(pkt.getMsgTimeStamp())));
946 
947             if ((getPktTsbPdTime(pkt.getMsgTimeStamp()) > now))
948                 break; /* too early for this unit, return whatever was copied */
949         }
950 
951         const int pktlen = (int) pkt.getLength();
952         const int remain_pktlen = pktlen - m_iNotch;
953 
954         const int unitsize = std::min(remain_pktlen, rs);
955 
956         HLOGC(brlog.Debug,
957               log << CONID() << "readBuffer: copying buffer #" << p << " targetpos=" << int(data - begin)
958                   << " sourcepos=" << m_iNotch << " size=" << unitsize << " left=" << (unitsize - rs));
959         memcpy((data), pkt.m_pcData + m_iNotch, unitsize);
960 
961         data += unitsize;
962 
963         if (rs >= remain_pktlen)
964         {
965             freeUnitAt(p);
966             p = shiftFwd(p);
967 
968             m_iNotch = 0;
969         }
970         else
971             m_iNotch += rs;
972 
973         rs -= unitsize;
974     }
975 
976     /* we removed acked bytes form receive buffer */
977     countBytes(-1, -(len - rs), true);
978     m_iStartPos = p;
979 
980     return len - rs;
981 }
982 
readBufferToFile(fstream & ofs,int len)983 int CRcvBuffer::readBufferToFile(fstream& ofs, int len)
984 {
985     int p       = m_iStartPos;
986     int lastack = m_iLastAckPos;
987     int rs      = len;
988 
989     int32_t trace_seq SRT_ATR_UNUSED = SRT_SEQNO_NONE;
990     int trace_shift SRT_ATR_UNUSED = -1;
991 
992     while ((p != lastack) && (rs > 0))
993     {
994 #if ENABLE_LOGGING
995         ++trace_shift;
996 #endif
997         // Skip empty units. Note that this shouldn't happen
998         // in case of a file transfer.
999         if (!m_pUnit[p])
1000         {
1001             p = shiftFwd(p);
1002             LOGC(brlog.Error, log << "readBufferToFile: IPE: NULL unit found in file transmission, last good %"
1003                     << trace_seq << " + " << trace_shift);
1004             continue;
1005         }
1006 
1007         const srt::CPacket& pkt = m_pUnit[p]->m_Packet;
1008 
1009 #if ENABLE_LOGGING
1010         trace_seq = pkt.getSeqNo();
1011 #endif
1012         const int pktlen = (int) pkt.getLength();
1013         const int remain_pktlen = pktlen - m_iNotch;
1014 
1015         const int unitsize = std::min(remain_pktlen, rs);
1016 
1017         ofs.write(pkt.m_pcData + m_iNotch, unitsize);
1018         if (ofs.fail())
1019             break;
1020 
1021         if (rs >= remain_pktlen)
1022         {
1023             freeUnitAt(p);
1024             p = shiftFwd(p);
1025 
1026             m_iNotch = 0;
1027         }
1028         else
1029             m_iNotch += rs;
1030 
1031         rs -= unitsize;
1032     }
1033 
1034     /* we removed acked bytes form receive buffer */
1035     countBytes(-1, -(len - rs), true);
1036     m_iStartPos = p;
1037 
1038     return len - rs;
1039 }
1040 
ackData(int len)1041 int CRcvBuffer::ackData(int len)
1042 {
1043     SRT_ASSERT(len < m_iSize);
1044     SRT_ASSERT(len > 0);
1045     int end = shift(m_iLastAckPos, len);
1046 
1047     {
1048         int pkts  = 0;
1049         int bytes = 0;
1050         for (int i = m_iLastAckPos; i != end; i = shiftFwd(i))
1051         {
1052             if (m_pUnit[i] == NULL)
1053                 continue;
1054 
1055             pkts++;
1056             bytes += (int)m_pUnit[i]->m_Packet.getLength();
1057         }
1058         if (pkts > 0)
1059             countBytes(pkts, bytes, true);
1060     }
1061 
1062     HLOGC(brlog.Debug,
1063           log << "ackData: shift by " << len << ", start=" << m_iStartPos << " end=" << m_iLastAckPos << " -> " << end);
1064 
1065     m_iLastAckPos = end;
1066     m_iMaxPos -= len;
1067     if (m_iMaxPos < 0)
1068         m_iMaxPos = 0;
1069 
1070     // Returned value is the distance towards the starting
1071     // position from m_iLastAckPos, which is in sync with CUDT::m_iRcvLastSkipAck.
1072     // This should help determine the sequence number at first read-ready position.
1073 
1074     const int dist = m_iLastAckPos - m_iStartPos;
1075     if (dist < 0)
1076         return dist + m_iSize;
1077     return dist;
1078 }
1079 
skipData(int len)1080 void CRcvBuffer::skipData(int len)
1081 {
1082     /*
1083      * Caller need protect both AckLock and RecvLock
1084      * to move both m_iStartPos and m_iLastAckPost
1085      */
1086     if (m_iStartPos == m_iLastAckPos)
1087         m_iStartPos = (m_iStartPos + len) % m_iSize;
1088     m_iLastAckPos = (m_iLastAckPos + len) % m_iSize;
1089     m_iMaxPos -= len;
1090     if (m_iMaxPos < 0)
1091         m_iMaxPos = 0;
1092 }
1093 
dropData(int len)1094 size_t CRcvBuffer::dropData(int len)
1095 {
1096     // This function does the same as skipData, although skipData
1097     // should work in the condition of absence of data, so no need
1098     // to force the units in the range to be freed. This function
1099     // works in more general condition where we don't know if there
1100     // are any data in the given range, but want to remove these
1101     // "sequence positions" from the buffer, whether there are data
1102     // at them or not.
1103 
1104     size_t stats_bytes = 0;
1105 
1106     int p      = m_iStartPos;
1107     int past_q = shift(p, len);
1108     while (p != past_q)
1109     {
1110         if (m_pUnit[p] && m_pUnit[p]->m_iFlag == CUnit::GOOD)
1111         {
1112             stats_bytes += m_pUnit[p]->m_Packet.getLength();
1113             freeUnitAt(p);
1114         }
1115 
1116         p = shiftFwd(p);
1117     }
1118 
1119     m_iStartPos = past_q;
1120     return stats_bytes;
1121 }
1122 
// Finds the first packet that can be reported to the caller.
//
// The acknowledged region is checked first, through getRcvReadyMsg() called
// without a position limit. Only if that call finds no packet at all (it
// returns false and leaves a zero time in w_tsbpdtime) is the region of
// m_iMaxPos cells starting at m_iLastAckPos scanned as well.
//
// Outputs:
// - w_tsbpdtime: play time of the reported packet, or a zero time point when
//                no packet was found.
// - w_passack:   false when the answer comes from the acknowledged region,
//                true once the region past m_iLastAckPos has been scanned.
// - w_skipseqno: SRT_SEQNO_NONE, or the sequence number of the reported packet
//                when an empty/not-GOOD cell was seen before it in the region
//                past m_iLastAckPos.
// - w_curpktseq: sequence number of the packet last picked; it is left
//                untouched when no unit is examined.
// Input:
// - base_seq:    if not SRT_SEQNO_NONE, ready packets whose sequence is not
//                larger than it are passed over in favor of a later one.
//
// Returns true when a ready-to-play packet is being reported.
//
// Side effect: while scanning past m_iLastAckPos, a ready packet that gets
// superseded by a later ready packet is freed with freeUnitAt().
bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
                                bool&                     w_passack,
                                int32_t&                  w_skipseqno,
                                int32_t&                  w_curpktseq,
                                int32_t                   base_seq)
{
    HLOGC(brlog.Debug, log << "getRcvFirstMsg: base_seq=" << base_seq);
    w_skipseqno = SRT_SEQNO_NONE;
    w_passack   = false;
    // tsbpdtime will be retrieved by the below call
    // Returned values:
    // - tsbpdtime: real time when the packet is ready to play (whether ready to play or not)
    // - w_passack: false (the report concerns a packet with an exactly next sequence)
    // - w_skipseqno == SRT_SEQNO_NONE: no packets to skip towards the first RTP
    // - w_curpktseq: that exactly packet that is reported (for debugging purposes)
    // - @return: whether the reported packet is ready to play

    /* Check the acknowledged packets */
    // getRcvReadyMsg returns true if the time to play for the first message
    // that is larger than base_seq is in the past.
    if (getRcvReadyMsg((w_tsbpdtime), (w_curpktseq), -1, base_seq))
    {
        HLOGC(brlog.Debug, log << "getRcvFirstMsg: ready CONTIG packet: %" << w_curpktseq);
        return true;
    }
    else if (!is_zero(w_tsbpdtime))
    {
        HLOGC(brlog.Debug, log << "getRcvFirstMsg: packets found, but in future");
        // This means that a message next to be played, has been found,
        // but the time to play is in future.
        return false;
    }

    // Falling here means that there are NO PACKETS in the ACK-ed region
    // (m_iStartPos - m_iLastAckPos), but we may have something in the
    // region (m_iLastAckPos - (m_iLastAckPos+m_iMaxPos)), that is, packets
    // that may be separated from the last ACK-ed by lost ones.

    // Below this line we have only two options:
    // - m_iMaxPos == 0, which means that no more packets are in the buffer
    //    - returned: tsbpdtime=0, w_passack=true, w_skipseqno=SRT_SEQNO_NONE, w_curpktseq=<unchanged>, @return false
    // - m_iMaxPos > 0, which means that there are packets arrived after a lost packet:
    //    - returned: tsbpdtime=PKT.TS, w_passack=true, w_skipseqno=PKT.SEQ, w_curpktseq=PKT, @return LOCAL(PKT.TS) <=
    //    NOW

    /*
     * No acked packets ready but caller want to know next packet to wait for
     * Check the not yet acked packets that may be stuck by missing packet(s).
     */
    bool                     haslost        = false;
    int                      last_ready_pos = -1;
    steady_clock::time_point tsbpdtime      = steady_clock::time_point();
    w_tsbpdtime                             = steady_clock::time_point();
    w_passack                               = true;

    // XXX SUSPECTED ISSUE with this algorithm:
    // The above call to getRcvReadyMsg() should report as to whether:
    // - there is an EXACTLY NEXT SEQUENCE packet
    // - this packet is ready to play.
    //
    // Situations handled after the call are when:
    // - there's the next sequence packet available and it is ready to play
    // - there are no packets at all, ready to play or not
    //
    // So, the remaining situation is that THERE ARE PACKETS that follow
    // the current sequence, but they are not ready to play. This includes
    // packets that have the exactly next sequence and packets that jump
    // over a lost packet.
    //
    // As the getRcvReadyMsg() function walks through the incoming units
    // to see if there's anything that satisfies these conditions, it *SHOULD*
    // be also capable of checking if the next available packet, if it is
    // there, is the next sequence packet or not. Retrieving this exactly
    // packet would be most useful, as the test for play-readiness and
    // sequentiality can be done on it directly.
    //
    // When done so, the below loop would be completely unnecessary.

    // Logical description of the below algorithm:
    // 1. update w_tsbpdtime and w_curpktseq if one packet ready to play is found
    //    - keep checking the next packet while still not larger than base_seq
    // 2. set w_skipseqno if packets before w_curpktseq were found lost
    // if no packet larger than base_seq is ready to play, return the largest RTP
    // else return the first one that is larger than base_seq and ready to play

    for (int i = m_iLastAckPos, n = shift(m_iLastAckPos, m_iMaxPos); i != n; i = shiftFwd(i))
    {
        if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD)
        {
            /* There are packets in the sequence not received yet */
            haslost = true;
            HLOGC(brlog.Debug, log << "getRcvFirstMsg: empty hole at *" << i);
        }
        else
        {
            tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
            /* Packet ready to play */
            if (tsbpdtime <= steady_clock::now())
            {
                // If the last ready-to-play packet exists, free it.
                // (A non-zero w_tsbpdtime here means an earlier iteration picked
                // a ready packet that was not larger than base_seq.)
                if (!is_zero(w_tsbpdtime)) {
                    HLOGC(brlog.Debug,
                          log << "getRcvFirstMsg: found next ready packet, free last %"
                              << w_curpktseq << " POS=" << last_ready_pos);
                    SRT_ASSERT(w_curpktseq != SRT_SEQNO_NONE);
                    freeUnitAt(last_ready_pos);
                }
                w_tsbpdtime    = tsbpdtime;
                w_curpktseq    = m_pUnit[i]->m_Packet.m_iSeqNo;
                last_ready_pos = i;
                if (haslost)
                    w_skipseqno = w_curpktseq;

                if (base_seq != SRT_SEQNO_NONE && CSeqNo::seqcmp(w_curpktseq, base_seq) <= 0)
                {
                    HLOGC(brlog.Debug,
                          log << "getRcvFirstMsg: found ready packet %" << w_curpktseq
                              << " but not larger than base_seq, try next");
                    continue;
                }

                HLOGC(brlog.Debug,
                      log << "getRcvFirstMsg: found ready packet, nSKIPPED: "
                          << ((i - m_iLastAckPos + m_iSize) % m_iSize));

                // NOTE: if haslost is not set, it means that this is the VERY FIRST
                // packet, that is, packet currently at pos = m_iLastAckPos. There's no
                // possibility that it is so otherwise because:
                // - if this first good packet is ready to play, THIS HERE RETURNS NOW.
                // ...
                return true;
            }

            // This packet is not ready yet. If an earlier ready packet (not larger
            // than base_seq) was picked, report that one.
            if (!is_zero(w_tsbpdtime)) {
                return true;
            }
            HLOGC(brlog.Debug,
                  log << "getRcvFirstMsg: found NOT READY packet, nSKIPPED: "
                      << ((i - m_iLastAckPos + m_iSize) % m_iSize));
            // ... and if this first good packet WASN'T ready to play, THIS HERE RETURNS NOW, TOO,
            // just states that there's no ready packet to play.
            // ...
            return false;
        }
        // ... and if this first packet WASN'T GOOD, the loop continues, however since now
        // the 'haslost' is set, which means that it continues only to find the first valid
        // packet after stating that the very first packet isn't valid.
    }
    // The scan ran to its end: report the last ready packet picked, if any.
    if (!is_zero(w_tsbpdtime)) {
        return true;
    }
    HLOGC(brlog.Debug, log << "getRcvFirstMsg: found NO PACKETS");
    return false;
}
1277 
debugGetDeliveryTime(int offset)1278 steady_clock::time_point CRcvBuffer::debugGetDeliveryTime(int offset)
1279 {
1280     int i;
1281     if (offset > 0)
1282         i = shift(m_iStartPos, offset);
1283     else
1284         i = m_iStartPos;
1285 
1286     CUnit* u = m_pUnit[i];
1287     if (!u || u->m_iFlag != CUnit::GOOD)
1288         return steady_clock::time_point();
1289 
1290     return getPktTsbPdTime(u->m_Packet.getMsgTimeStamp());
1291 }
1292 
getTopMsgno() const1293 int32_t CRcvBuffer::getTopMsgno() const
1294 {
1295     if (m_iStartPos == m_iLastAckPos)
1296         return SRT_MSGNO_NONE; // No message is waiting
1297 
1298     if (!m_pUnit[m_iStartPos])
1299         return SRT_MSGNO_NONE; // pity
1300 
1301     return m_pUnit[m_iStartPos]->m_Packet.getMsgSeq();
1302 }
1303 
// Scans the acknowledged region (from m_iStartPos up to, but not including,
// m_iLastAckPos) for a packet to deliver. Units that the scan decides to skip
// are freed and m_iStartPos is advanced past them, so this call modifies the
// buffer even when it returns false.
//
// Parameters:
// - w_tsbpdtime: receives the TSBPD time of the packet examined last. It is
//                reset to a zero time point when a packet is discarded for not
//                being larger than base_seq, and is not touched when no GOOD
//                unit is examined.
// - w_curpktseq: receives the sequence number of the unit examined last.
// - upto:        -1 for "no limit" (TSBPD-driven mode); otherwise the distance
//                to go back from m_iLastAckPos to the requested cell.
// - base_seq:    used only when upto == -1; if not SRT_SEQNO_NONE, ready packets
//                whose sequence is not larger than it are discarded.
//
// Returns true when a packet to deliver was found; that packet stays in the
// buffer at m_iStartPos.
bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto, int base_seq)
{
    const bool havelimit = upto != -1;
    int        end = -1, past_end = -1;
    if (havelimit)
    {
        int stretch = (m_iSize + m_iStartPos - m_iLastAckPos) % m_iSize;
        if (upto > stretch)
        {
            HLOGC(brlog.Debug, log << "position back " << upto << " exceeds stretch " << stretch);
            // Do nothing. This position is already gone.
            return false;
        }

        // Go back 'upto' cells from m_iLastAckPos, wrapping around the
        // beginning of the circular buffer.
        end = m_iLastAckPos - upto;
        if (end < 0)
            end += m_iSize;
        past_end = shiftFwd(end); // For in-loop comparison
        HLOGC(brlog.Debug, log << "getRcvReadyMsg: will read from position " << end);
    }

    // NOTE: position m_iLastAckPos in the buffer represents the sequence number of
    // CUDT::m_iRcvLastSkipAck. Therefore 'upto' contains a positive value that should
    // be decreased from m_iLastAckPos to get the position in the buffer that represents
    // the sequence number up to which we'd like to read.
    IF_HEAVY_LOGGING(const char* reason = "NOT RECEIVED");

    for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = shiftFwd(i))
    {
        // In case when we want to read only up to given sequence number, stop
        // the loop if this number was reached. This number must be extracted from
        // the buffer and any following must wait here for "better times". Note
        // that the unit that points to the requested sequence must remain in
        // the buffer, unless there is no valid packet at that position, in which
        // case it is allowed to point to the NEXT sequence towards it, however
        // if it does, this cell must remain in the buffer for prospective recovery.
        if (havelimit && i == past_end)
            break;

        bool freeunit = false;

        /* Skip any invalid skipped/dropped packets */
        if (m_pUnit[i] == NULL)
        {
            HLOGC(brlog.Debug,
                  log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
                      << " SKIPPED - no unit there");
            // Nothing to free here, only the start position moves on.
            m_iStartPos = shiftFwd(m_iStartPos);
            continue;
        }

        w_curpktseq = m_pUnit[i]->m_Packet.getSeqNo();

        if (m_pUnit[i]->m_iFlag != CUnit::GOOD)
        {
            HLOGC(brlog.Debug,
                  log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
                      << " SKIPPED - unit not good");
            freeunit = true;
        }
        else
        {
            // This does:
            // 1. Get the TSBPD time of the unit. Stop and return false if this unit
            //    is not yet ready to play.
            // 2. If it's ready to play, check also if it's decrypted. If not, skip it.
            // 3. Check also if it's larger than base_seq, if not, skip it.
            // 4. If it's ready to play, decrypted and larger than base, stop and return it.
            if (!havelimit)
            {
                w_tsbpdtime                         = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
                const steady_clock::duration towait = (w_tsbpdtime - steady_clock::now());
                if (towait.count() > 0)
                {
                    HLOGC(brlog.Debug,
                          log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
                              << " pkt %" << w_curpktseq << " NOT ready to play (only in " << count_milliseconds(towait)
                              << "ms)");
                    // w_tsbpdtime keeps the (future) play time of this packet.
                    return false;
                }

                if (m_pUnit[i]->m_Packet.getMsgCryptoFlags() != EK_NOENC)
                {
                    IF_HEAVY_LOGGING(reason = "DECRYPTION FAILED");
                    freeunit = true; /* packet not decrypted */
                }
                else if (base_seq != SRT_SEQNO_NONE && CSeqNo::seqcmp(w_curpktseq, base_seq) <= 0)
                {
                    IF_HEAVY_LOGGING(reason = "smaller than base_seq");
                    // Reset the time so that a discarded packet is not reported
                    // through w_tsbpdtime.
                    w_tsbpdtime = steady_clock::time_point();
                    freeunit = true;
                }
                else
                {
                    HLOGC(brlog.Debug,
                          log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
                              << " pkt %" << w_curpktseq << " ready to play (delayed " << count_milliseconds(towait)
                              << "ms)");
                    return true;
                }
            }
            // In this case:
            // 1. We don't even look into the packet if this is not the requested sequence.
            //    All packets that are earlier than the required sequence will be dropped.
            // 2. When found the packet with expected sequence number, and the condition for
            //    good unit is passed, we get the timestamp.
            // 3. If the packet is not decrypted, we allow it to be removed
            // 4. If we reached the required sequence, and the packet is good, KEEP IT in the buffer,
            //    and return with the pointer pointing to this very buffer. Only then return true.
            else
            {
                // We have a limit up to which the reading will be done,
                // no matter if the time has come or not - although retrieve it.
                if (i == end)
                {
                    HLOGC(brlog.Debug, log << "CAUGHT required seq position " << i);
                    // We have the packet we need. Extract its data.
                    w_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());

                    // If we have a decryption failure, allow the unit to be released.
                    if (m_pUnit[i]->m_Packet.getMsgCryptoFlags() != EK_NOENC)
                    {
                        IF_HEAVY_LOGGING(reason = "DECRYPTION FAILED");
                        freeunit = true; /* packet not decrypted */
                    }
                    else
                    {
                        // Stop here and keep the packet in the buffer, so it will be
                        // next extracted.
                        HLOGC(brlog.Debug,
                              log << "getRcvReadyMsg: packet seq=" << w_curpktseq << " ready for extraction");
                        return true;
                    }
                }
                else
                {
                    HLOGC(brlog.Debug, log << "SKIPPING position " << i);
                    // Continue the loop and remove the current packet because
                    // its sequence number is too old.
                    freeunit = true;
                }
            }
        }

        if (freeunit)
        {
            HLOGC(brlog.Debug, log << "getRcvReadyMsg: POS=" << i << " FREED: " << reason);
            /* removed skipped, dropped, undecryptable bytes from rcv buffer */
            const int rmbytes = (int)m_pUnit[i]->m_Packet.getLength();
            countBytes(-1, -rmbytes, true);

            // Release the unit and move the start position past its cell.
            freeUnitAt(i);
            m_iStartPos = shiftFwd(m_iStartPos);
        }
    }

    HLOGC(brlog.Debug, log << "getRcvReadyMsg: nothing to deliver: " << reason);
    return false;
}
1463 
1464 /*
1465  * Return receivable data status (packet timestamp_us ready to play if TsbPd mode)
1466  * Return playtime (tsbpdtime) of 1st packet in queue, ready to play or not
1467  *
1468  * Return data ready to be received (packet timestamp_us ready to play if TsbPd mode)
1469  * Using getRcvDataSize() to know if there is something to read as it was widely
1470  * used in the code (core.cpp) is expensive in TsbPD mode, hence this simpler function
1471  * that only check if first packet in queue is ready.
1472  */
isRcvDataReady(steady_clock::time_point & w_tsbpdtime,int32_t & w_curpktseq,int32_t seqdistance)1473 bool CRcvBuffer::isRcvDataReady(steady_clock::time_point& w_tsbpdtime, int32_t& w_curpktseq, int32_t seqdistance)
1474 {
1475     w_tsbpdtime = steady_clock::time_point();
1476 
1477     if (m_tsbpd.isEnabled())
1478     {
1479         const srt::CPacket* pkt = getRcvReadyPacket(seqdistance);
1480         if (!pkt)
1481         {
1482             HLOGC(brlog.Debug, log << "isRcvDataReady: packet NOT extracted.");
1483             return false;
1484         }
1485 
1486         /*
1487          * Acknowledged data is available,
1488          * Only say ready if time to deliver.
1489          * Report the timestamp, ready or not.
1490          */
1491         w_curpktseq = pkt->getSeqNo();
1492         w_tsbpdtime = getPktTsbPdTime(pkt->getMsgTimeStamp());
1493 
1494         // If seqdistance was passed, then return true no matter what the
1495         // TSBPD time states.
1496         if (seqdistance != -1 || w_tsbpdtime <= steady_clock::now())
1497         {
1498             HLOGC(brlog.Debug,
1499                   log << "isRcvDataReady: packet extracted seqdistance=" << seqdistance
1500                       << " TsbPdTime=" << FormatTime(w_tsbpdtime));
1501             return true;
1502         }
1503 
1504         HLOGC(brlog.Debug, log << "isRcvDataReady: packet extracted, but NOT READY");
1505         return false;
1506     }
1507 
1508     return isRcvDataAvailable();
1509 }
1510 
1511 // XXX This function may be called only after checking
1512 // if m_bTsbPdMode.
getRcvReadyPacket(int32_t seqdistance)1513 CPacket* CRcvBuffer::getRcvReadyPacket(int32_t seqdistance)
1514 {
1515     // If asked for readiness of a packet at given sequence distance
1516     // (that is, we need to extract the packet with given sequence number),
1517     // only check if this cell is occupied in the buffer, and if so,
1518     // if it's occupied with a "good" unit. That's all. It doesn't
1519     // matter whether it's ready to play.
1520     if (seqdistance != -1)
1521     {
1522         // Note: seqdistance is the value to to go BACKWARDS from m_iLastAckPos,
1523         // which is the position that is in sync with CUDT::m_iRcvLastSkipAck. This
1524         // position is the sequence number of a packet that is NOT received, but it's
1525         // expected to be received as next. So the minimum value of seqdistance is 1.
1526 
1527         // SANITY CHECK
1528         if (seqdistance == 0)
1529         {
1530             LOGC(brlog.Fatal, log << "IPE: trying to extract packet past the last ACK-ed!");
1531             return 0;
1532         }
1533 
1534         if (seqdistance > getRcvDataSize())
1535         {
1536             HLOGC(brlog.Debug,
1537                   log << "getRcvReadyPacket: Sequence offset=" << seqdistance
1538                       << " is in the past (start=" << m_iStartPos << " end=" << m_iLastAckPos << ")");
1539             return 0;
1540         }
1541 
1542         int i = shift(m_iLastAckPos, -seqdistance);
1543         if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD)
1544         {
1545             HLOGC(brlog.Debug, log << "getRcvReadyPacket: FOUND PACKET %" << m_pUnit[i]->m_Packet.getSeqNo());
1546             return &m_pUnit[i]->m_Packet;
1547         }
1548 
1549         HLOGC(brlog.Debug, log << "getRcvReadyPacket: Sequence offset=" << seqdistance << " IS NOT RECEIVED.");
1550         return 0;
1551     }
1552 
1553     IF_HEAVY_LOGGING(int nskipped = 0);
1554     for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = shiftFwd(i))
1555     {
1556         /*
1557          * Skip missing packets that did not arrive in time.
1558          */
1559         if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD)
1560         {
1561             HLOGC(brlog.Debug,
1562                 log << "getRcvReadyPacket: Found next packet seq=%" << m_pUnit[i]->m_Packet.getSeqNo() << " ("
1563                 << nskipped << " empty cells skipped)");
1564             return &m_pUnit[i]->m_Packet;
1565         }
1566         IF_HEAVY_LOGGING(++nskipped);
1567     }
1568 
1569     return 0;
1570 }
1571 
1572 #if ENABLE_HEAVY_LOGGING
1573 // This function is for debug purposes only and it's called only
1574 // from within HLOG* macros.
reportBufferStats() const1575 void CRcvBuffer::reportBufferStats() const
1576 {
1577     int     nmissing = 0;
1578     int32_t low_seq = SRT_SEQNO_NONE, high_seq = SRT_SEQNO_NONE;
1579     int32_t low_ts = 0, high_ts = 0;
1580 
1581     for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = (i + 1) % m_iSize)
1582     {
1583         if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD)
1584         {
1585             low_seq = m_pUnit[i]->m_Packet.m_iSeqNo;
1586             low_ts  = m_pUnit[i]->m_Packet.m_iTimeStamp;
1587             break;
1588         }
1589         ++nmissing;
1590     }
1591 
1592     // Not sure if a packet MUST BE at the last ack pos position, so check, just in case.
1593     int n = m_iLastAckPos;
1594     if (m_pUnit[n] && m_pUnit[n]->m_iFlag == CUnit::GOOD)
1595     {
1596         high_ts  = m_pUnit[n]->m_Packet.m_iTimeStamp;
1597         high_seq = m_pUnit[n]->m_Packet.m_iSeqNo;
1598     }
1599     else
1600     {
1601         // Possibilities are:
1602         // m_iStartPos == m_iLastAckPos, high_ts == low_ts, defined.
1603         // No packet: low_ts == 0, so high_ts == 0, too.
1604         high_ts = low_ts;
1605     }
1606     // The 32-bit timestamps are relative and roll over oftten; what
1607     // we really need is the timestamp difference. The only place where
1608     // we can ask for the time base is the upper time because when trying
1609     // to receive the time base for the lower time we'd break the requirement
1610     // for monotonic clock.
1611 
1612     uint64_t upper_time = high_ts;
1613     uint64_t lower_time = low_ts;
1614 
1615     if (lower_time > upper_time)
1616         upper_time += uint64_t(srt::CPacket::MAX_TIMESTAMP) + 1;
1617 
1618     int32_t timespan = upper_time - lower_time;
1619     int     seqspan  = 0;
1620     if (low_seq != SRT_SEQNO_NONE && high_seq != SRT_SEQNO_NONE)
1621     {
1622         seqspan = CSeqNo::seqoff(low_seq, high_seq);
1623     }
1624 
1625     LOGC(brlog.Debug,
1626          log << "RCV BUF STATS: seqspan=%(" << low_seq << "-" << high_seq << ":" << seqspan << ") missing=" << nmissing
1627              << "pkts");
1628     LOGC(brlog.Debug,
1629          log << "RCV BUF STATS: timespan=" << timespan << "us (lo=" << lower_time << " hi=" << upper_time << ")");
1630 }
1631 
1632 #endif // ENABLE_HEAVY_LOGGING
1633 
isRcvDataReady()1634 bool CRcvBuffer::isRcvDataReady()
1635 {
1636     steady_clock::time_point tsbpdtime;
1637     int32_t                  seq;
1638 
1639     return isRcvDataReady((tsbpdtime), (seq), -1);
1640 }
1641 
getAvailBufSize() const1642 int CRcvBuffer::getAvailBufSize() const
1643 {
1644     // One slot must be empty in order to tell the difference between "empty buffer" and "full buffer"
1645     return m_iSize - getRcvDataSize() - 1;
1646 }
1647 
getRcvDataSize() const1648 int CRcvBuffer::getRcvDataSize() const
1649 {
1650     if (m_iLastAckPos >= m_iStartPos)
1651         return m_iLastAckPos - m_iStartPos;
1652 
1653     return m_iSize + m_iLastAckPos - m_iStartPos;
1654 }
1655 
debugGetSize() const1656 int CRcvBuffer::debugGetSize() const
1657 {
1658     // Does exactly the same as getRcvDataSize, but
1659     // it should be used FOR INFORMATIONAL PURPOSES ONLY.
1660     // The source values might be changed in another thread
1661     // during the calculation, although worst case the
1662     // resulting value may differ to the real buffer size by 1.
1663     int from = m_iStartPos, to = m_iLastAckPos;
1664     int size = to - from;
1665     if (size < 0)
1666         size += m_iSize;
1667 
1668     return size;
1669 }
1670 
1671 /* Return moving average of acked data pkts, bytes, and timespan (ms) of the receive buffer */
getRcvAvgDataSize(int & bytes,int & timespan)1672 int CRcvBuffer::getRcvAvgDataSize(int& bytes, int& timespan)
1673 {
1674     // Average number of packets and timespan could be small,
1675     // so rounding is beneficial, while for the number of
1676     // bytes in the buffer is a higher value, so rounding can be omitted,
1677     // but probably better to round all three values.
1678     timespan = round_val(m_mavg.timespan_ms());
1679     bytes    = round_val(m_mavg.bytes());
1680     return round_val(m_mavg.pkts());
1681 }
1682 
1683 /* Update moving average of acked data pkts, bytes, and timespan (ms) of the receive buffer */
updRcvAvgDataSize(const steady_clock::time_point & now)1684 void CRcvBuffer::updRcvAvgDataSize(const steady_clock::time_point& now)
1685 {
1686     if (!m_mavg.isTimeToUpdate(now))
1687         return;
1688 
1689     int       bytes       = 0;
1690     int       timespan_ms = 0;
1691     const int pkts        = getRcvDataSize(bytes, timespan_ms);
1692     m_mavg.update(now, pkts, bytes, timespan_ms);
1693 }
1694 
1695 /* Return acked data pkts, bytes, and timespan (ms) of the receive buffer */
getRcvDataSize(int & bytes,int & timespan)1696 int CRcvBuffer::getRcvDataSize(int& bytes, int& timespan)
1697 {
1698     timespan = 0;
1699     if (m_tsbpd.isEnabled())
1700     {
1701         // Get a valid startpos.
1702         // Skip invalid entries in the beginning, if any.
1703         int startpos = m_iStartPos;
1704         for (; startpos != m_iLastAckPos; startpos = shiftFwd(startpos))
1705         {
1706             if ((NULL != m_pUnit[startpos]) && (CUnit::GOOD == m_pUnit[startpos]->m_iFlag))
1707                 break;
1708         }
1709 
1710         int endpos = m_iLastAckPos;
1711 
1712         if (m_iLastAckPos != startpos)
1713         {
1714             /*
1715              *     |<--- DataSpan ---->|<- m_iMaxPos ->|
1716              * +---+---+---+---+---+---+---+---+---+---+---+---
1717              * |   | 1 | 1 | 1 | 0 | 0 | 1 | 1 | 0 | 1 |   |     m_pUnits[]
1718              * +---+---+---+---+---+---+---+---+---+---+---+---
1719              *       |                   |
1720              *       \_ m_iStartPos      \_ m_iLastAckPos
1721              *
1722              * m_pUnits[startpos] shall be valid (->m_iFlag==CUnit::GOOD).
1723              * If m_pUnits[m_iLastAckPos-1] is not valid (NULL or ->m_iFlag!=CUnit::GOOD),
1724              * it means m_pUnits[m_iLastAckPos] is valid since a valid unit is needed to skip.
1725              * Favor m_pUnits[m_iLastAckPos] if valid over [m_iLastAckPos-1] to include the whole acked interval.
1726              */
1727             if ((m_iMaxPos <= 0) || (!m_pUnit[m_iLastAckPos]) || (m_pUnit[m_iLastAckPos]->m_iFlag != CUnit::GOOD))
1728             {
1729                 endpos = (m_iLastAckPos == 0 ? m_iSize - 1 : m_iLastAckPos - 1);
1730             }
1731 
1732             if ((NULL != m_pUnit[endpos]) && (NULL != m_pUnit[startpos]))
1733             {
1734                 const steady_clock::time_point startstamp =
1735                     getPktTsbPdTime(m_pUnit[startpos]->m_Packet.getMsgTimeStamp());
1736                 const steady_clock::time_point endstamp = getPktTsbPdTime(m_pUnit[endpos]->m_Packet.getMsgTimeStamp());
1737                 /*
1738                  * There are sampling conditions where spantime is < 0 (big unsigned value).
1739                  * It has been observed after changing the SRT latency from 450 to 200 on the sender.
1740                  *
1741                  * Possible packet order corruption when dropping packet,
1742                  * cause by bad thread protection when adding packet in queue
1743                  * was later discovered and fixed. Security below kept.
1744                  *
1745                  * DateTime                 RecvRate LostRate DropRate AvailBw     RTT   RecvBufs PdDelay
1746                  * 2014-12-08T15:04:25-0500     4712      110        0   96509  33.710        393     450
1747                  * 2014-12-08T15:04:35-0500     4512       95        0  107771  33.493 1496542976     200
1748                  * 2014-12-08T15:04:40-0500     4213      106        3  107352  53.657    9499425     200
1749                  * 2014-12-08T15:04:45-0500     4575      104        0  102194  53.614      59666     200
1750                  * 2014-12-08T15:04:50-0500     4475      124        0  100543  53.526        505     200
1751                  */
1752                 if (endstamp > startstamp)
1753                     timespan = count_milliseconds(endstamp - startstamp);
1754             }
1755             /*
1756              * Timespan can be less then 1000 us (1 ms) if few packets.
1757              * Also, if there is only one pkt in buffer, the time difference will be 0.
1758              * Therefore, always add 1 ms if not empty.
1759              */
1760             if (0 < m_iAckedPktsCount)
1761                 timespan += 1;
1762         }
1763     }
1764     HLOGF(brlog.Debug, "getRcvDataSize: %6d %6d %6d ms\n", m_iAckedPktsCount, m_iAckedBytesCount, timespan);
1765     bytes = m_iAckedBytesCount;
1766     return m_iAckedPktsCount;
1767 }
1768 
getRcvAvgPayloadSize() const1769 unsigned CRcvBuffer::getRcvAvgPayloadSize() const
1770 {
1771     return m_uAvgPayloadSz;
1772 }
1773 
debugGetReadingState() const1774 CRcvBuffer::ReadingState CRcvBuffer::debugGetReadingState() const
1775 {
1776     ReadingState readstate;
1777 
1778     readstate.iNumAcknowledged = 0;
1779     readstate.iNumUnacknowledged = m_iMaxPos;
1780 
1781     if ((NULL != m_pUnit[m_iStartPos]) && (m_pUnit[m_iStartPos]->m_iFlag == CUnit::GOOD))
1782     {
1783         if (m_tsbpd.isEnabled())
1784             readstate.tsStart = m_tsbpd.getPktTsbPdTime(m_pUnit[m_iStartPos]->m_Packet.getMsgTimeStamp());
1785 
1786         readstate.iNumAcknowledged = m_iLastAckPos > m_iStartPos
1787             ? m_iLastAckPos - m_iStartPos
1788             : m_iLastAckPos + (m_iSize - m_iStartPos);
1789     }
1790 
1791     // All further stats are valid if TSBPD is enabled.
1792     if (!m_tsbpd.isEnabled())
1793         return readstate;
1794 
1795     // m_iLastAckPos points to the first unacknowledged packet
1796     const int iLastAckPos = (m_iLastAckPos - 1) % m_iSize;
1797     if (m_iLastAckPos != m_iStartPos && (NULL != m_pUnit[iLastAckPos]) && (m_pUnit[iLastAckPos]->m_iFlag == CUnit::GOOD))
1798     {
1799         readstate.tsLastAck = m_tsbpd.getPktTsbPdTime(m_pUnit[iLastAckPos]->m_Packet.getMsgTimeStamp());
1800     }
1801 
1802     const int iEndPos = (m_iLastAckPos + m_iMaxPos - 1) % m_iSize;
1803     if (m_iMaxPos == 0)
1804     {
1805         readstate.tsEnd = readstate.tsLastAck;
1806     }
1807     else if ((NULL != m_pUnit[iEndPos]) && (m_pUnit[iEndPos]->m_iFlag == CUnit::GOOD))
1808     {
1809         readstate.tsEnd = m_tsbpd.getPktTsbPdTime(m_pUnit[iEndPos]->m_Packet.getMsgTimeStamp());
1810     }
1811 
1812     return readstate;
1813 }
1814 
strFullnessState(const time_point & tsNow) const1815 string CRcvBuffer::strFullnessState(const time_point& tsNow) const
1816 {
1817     const ReadingState bufstate = debugGetReadingState();
1818     stringstream ss;
1819 
1820     ss << "Space avail " << getAvailBufSize() << "/" << m_iSize;
1821     ss << " pkts. Packets ACKed: " << bufstate.iNumAcknowledged;
1822     if (!is_zero(bufstate.tsStart) && !is_zero(bufstate.tsLastAck))
1823     {
1824         ss << " (TSBPD ready in ";
1825         ss << count_milliseconds(bufstate.tsStart - tsNow);
1826         ss << " : ";
1827         ss << count_milliseconds(bufstate.tsLastAck - tsNow);
1828         ss << " ms)";
1829     }
1830 
1831     ss << ", not ACKed: " << bufstate.iNumUnacknowledged;
1832     if (!is_zero(bufstate.tsStart) && !is_zero(bufstate.tsEnd))
1833     {
1834         ss << ", timespan ";
1835         ss << count_milliseconds(bufstate.tsEnd - bufstate.tsStart);
1836         ss << " ms";
1837     }
1838 
1839     ss << ". " SRT_SYNC_CLOCK_STR " drift " << getDrift() / 1000 << " ms.";
1840     return ss.str();
1841 }
1842 
dropMsg(int32_t msgno,bool using_rexmit_flag)1843 void CRcvBuffer::dropMsg(int32_t msgno, bool using_rexmit_flag)
1844 {
1845     for (int i = m_iStartPos, n = shift(m_iLastAckPos, m_iMaxPos); i != n; i = shiftFwd(i))
1846         if ((m_pUnit[i] != NULL) && (m_pUnit[i]->m_Packet.getMsgSeq(using_rexmit_flag) == msgno))
1847             m_pUnit[i]->m_iFlag = CUnit::DROPPED;
1848 }
1849 
applyGroupTime(const steady_clock::time_point & timebase,bool wrp,uint32_t delay,const steady_clock::duration & udrift)1850 void CRcvBuffer::applyGroupTime(const steady_clock::time_point& timebase,
1851                                 bool                            wrp,
1852                                 uint32_t                        delay,
1853                                 const steady_clock::duration&   udrift)
1854 {
1855     m_tsbpd.applyGroupTime(timebase, wrp, delay, udrift);
1856 }
1857 
applyGroupDrift(const steady_clock::time_point & timebase,bool wrp,const steady_clock::duration & udrift)1858 void CRcvBuffer::applyGroupDrift(const steady_clock::time_point& timebase,
1859                                  bool                            wrp,
1860                                  const steady_clock::duration&   udrift)
1861 {
1862     m_tsbpd.applyGroupDrift(timebase, wrp, udrift);
1863 }
1864 
getInternalTimeBase(steady_clock::time_point & w_timebase,bool & w_wrp,steady_clock::duration & w_udrift)1865 void CRcvBuffer::getInternalTimeBase(steady_clock::time_point& w_timebase, bool& w_wrp, steady_clock::duration& w_udrift)
1866 {
1867     return m_tsbpd.getInternalTimeBase(w_timebase, w_wrp, w_udrift);
1868 }
1869 
getPktTsbPdTime(uint32_t usPktTimestamp)1870 steady_clock::time_point CRcvBuffer::getPktTsbPdTime(uint32_t usPktTimestamp)
1871 {
1872     // Updating TSBPD time here is not very accurate and prevents from making the function constant.
1873     // For now preserving the existing behavior.
1874     m_tsbpd.updateTsbPdTimeBase(usPktTimestamp);
1875     return m_tsbpd.getPktTsbPdTime(usPktTimestamp);
1876 }
1877 
setRcvTsbPdMode(const steady_clock::time_point & timebase,const steady_clock::duration & delay)1878 void CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const steady_clock::duration& delay)
1879 {
1880     const bool no_wrap_check = false;
1881     m_tsbpd.setTsbPdMode(timebase, no_wrap_check, delay);
1882 }
1883 
addRcvTsbPdDriftSample(uint32_t timestamp_us,int rtt)1884 bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, int rtt)
1885 {
1886     return m_tsbpd.addDriftSample(timestamp_us, rtt);
1887 }
1888 
readMsg(char * data,int len)1889 int CRcvBuffer::readMsg(char* data, int len)
1890 {
1891     SRT_MSGCTRL dummy = srt_msgctrl_default;
1892     return readMsg(data, len, (dummy), -1);
1893 }
1894 
1895 // NOTE: The order of ref-arguments is odd because:
1896 // - data and len shall be close to one another
1897 // - upto is last because it's a kind of unusual argument that has a default value
readMsg(char * data,int len,SRT_MSGCTRL & w_msgctl,int upto)1898 int CRcvBuffer::readMsg(char* data, int len, SRT_MSGCTRL& w_msgctl, int upto)
1899 {
1900     int  p = -1, q = -1;
1901     bool passack;
1902 
1903     bool empty = accessMsg((p), (q), (passack), (w_msgctl.srctime), upto);
1904     if (empty)
1905         return 0;
1906 
1907     // This should happen just once. By 'empty' condition
1908     // we have a guarantee that m_pUnit[p] exists and is valid.
1909     CPacket& pkt1 = m_pUnit[p]->m_Packet;
1910 
1911     // This returns the sequence number and message number to
1912     // the API caller.
1913     w_msgctl.pktseq = pkt1.getSeqNo();
1914     w_msgctl.msgno  = pkt1.getMsgSeq();
1915 
1916     return extractData((data), len, p, q, passack);
1917 }
1918 
1919 #ifdef SRT_DEBUG_TSBPD_OUTJITTER
debugTraceJitter(time_point playtime)1920 void CRcvBuffer::debugTraceJitter(time_point playtime)
1921 {
1922     uint64_t ms = count_microseconds(steady_clock::now() - playtime);
1923     if (ms / 10 < 10)
1924         m_ulPdHisto[0][ms / 10]++;
1925     else if (ms / 100 < 10)
1926         m_ulPdHisto[1][ms / 100]++;
1927     else if (ms / 1000 < 10)
1928         m_ulPdHisto[2][ms / 1000]++;
1929     else
1930         m_ulPdHisto[3][1]++;
1931 }
1932 #endif /* SRT_DEBUG_TSBPD_OUTJITTER */
1933 
accessMsg(int & w_p,int & w_q,bool & w_passack,int64_t & w_playtime,int upto)1934 bool CRcvBuffer::accessMsg(int& w_p, int& w_q, bool& w_passack, int64_t& w_playtime, int upto)
1935 {
1936     // This function should do the following:
1937     // 1. Find the first packet starting the next message (or just next packet)
1938     // 2. When found something ready for extraction, return true.
1939     // 3. w_p and w_q point the index range for extraction
1940     // 4. passack decides if this range shall be removed after extraction
1941 
1942     bool empty = true;
1943 
1944     if (m_tsbpd.isEnabled())
1945     {
1946         w_passack = false;
1947         int seq   = 0;
1948 
1949         steady_clock::time_point play_time;
1950         const bool               isReady = getRcvReadyMsg(play_time, (seq), upto);
1951         w_playtime                       = count_microseconds(play_time.time_since_epoch());
1952 
1953         if (isReady)
1954         {
1955             empty = false;
1956             // In TSBPD mode you always read one message
1957             // at a time and a message always fits in one UDP packet,
1958             // so in one "unit".
1959             w_p = w_q = m_iStartPos;
1960 
1961             debugTraceJitter(play_time);
1962         }
1963     }
1964     else
1965     {
1966         w_playtime = 0;
1967         if (scanMsg((w_p), (w_q), (w_passack)))
1968             empty = false;
1969     }
1970 
1971     return empty;
1972 }
1973 
extractData(char * data,int len,int p,int q,bool passack)1974 int CRcvBuffer::extractData(char* data, int len, int p, int q, bool passack)
1975 {
1976     SRT_ASSERT(len > 0);
1977     int       rs     = len > 0 ? len : 0;
1978     const int past_q = shiftFwd(q);
1979     while (p != past_q)
1980     {
1981         const int pktlen = (int)m_pUnit[p]->m_Packet.getLength();
1982         // When unitsize is less than pktlen, only a fragment is copied to the output 'data',
1983         // but still the whole packet is removed from the receiver buffer.
1984         if (pktlen > 0)
1985             countBytes(-1, -pktlen, true);
1986 
1987         const int unitsize = ((rs >= 0) && (pktlen > rs)) ? rs : pktlen;
1988 
1989         HLOGC(brlog.Debug, log << "readMsg: checking unit POS=" << p);
1990 
1991         if (unitsize > 0)
1992         {
1993             memcpy((data), m_pUnit[p]->m_Packet.m_pcData, unitsize);
1994             data += unitsize;
1995             rs -= unitsize;
1996             IF_HEAVY_LOGGING(readMsgHeavyLogging(p));
1997         }
1998         else
1999         {
2000             HLOGC(brlog.Debug, log << CONID() << "readMsg: SKIPPED POS=" << p << " - ZERO SIZE UNIT");
2001         }
2002 
2003         // Note special case for live mode (one packet per message and TSBPD=on):
2004         //  - p == q (that is, this loop passes only once)
2005         //  - no passack (the unit is always removed from the buffer)
2006         if (!passack)
2007         {
2008             HLOGC(brlog.Debug, log << CONID() << "readMsg: FREEING UNIT POS=" << p);
2009             freeUnitAt(p);
2010         }
2011         else
2012         {
2013             HLOGC(brlog.Debug, log << CONID() << "readMsg: PASSACK UNIT POS=" << p);
2014             m_pUnit[p]->m_iFlag = CUnit::PASSACK;
2015         }
2016 
2017         p = shiftFwd(p);
2018     }
2019 
2020     if (!passack)
2021         m_iStartPos = past_q;
2022 
2023     HLOGC(brlog.Debug,
2024           log << "rcvBuf/extractData: begin=" << m_iStartPos << " reporting extraction size=" << (len - rs));
2025 
2026     return len - rs;
2027 }
2028 
debugTimeState(size_t first_n_pkts) const2029 string CRcvBuffer::debugTimeState(size_t first_n_pkts) const
2030 {
2031     stringstream ss;
2032     int          ipos = m_iStartPos;
2033     for (size_t i = 0; i < first_n_pkts; ++i, ipos = CSeqNo::incseq(ipos))
2034     {
2035         const CUnit* unit = m_pUnit[ipos];
2036         if (!unit)
2037         {
2038             ss << "pkt[" << i << "] missing, ";
2039             continue;
2040         }
2041 
2042         const CPacket& pkt = unit->m_Packet;
2043         ss << "pkt[" << i << "] ts=" << pkt.getMsgTimeStamp() << ", ";
2044     }
2045     return ss.str();
2046 }
2047 
2048 #if ENABLE_HEAVY_LOGGING
readMsgHeavyLogging(int p)2049 void CRcvBuffer::readMsgHeavyLogging(int p)
2050 {
2051     static steady_clock::time_point prev_now;
2052     static steady_clock::time_point prev_srctime;
2053     const CPacket&                  pkt = m_pUnit[p]->m_Packet;
2054 
2055     const int32_t seq = pkt.m_iSeqNo;
2056 
2057     steady_clock::time_point nowtime = steady_clock::now();
2058     steady_clock::time_point srctime = getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp());
2059 
2060     const int64_t timediff_ms    = count_milliseconds(nowtime - srctime);
2061     const int64_t nowdiff_ms     = is_zero(prev_now) ? count_milliseconds(nowtime - prev_now) : 0;
2062     const int64_t srctimediff_ms = is_zero(prev_srctime) ? count_milliseconds(srctime - prev_srctime) : 0;
2063 
2064     const int next_p = shiftFwd(p);
2065     CUnit*    u      = m_pUnit[next_p];
2066     string    next_playtime;
2067     if (u && u->m_iFlag == CUnit::GOOD)
2068     {
2069         next_playtime = FormatTime(getPktTsbPdTime(u->m_Packet.getMsgTimeStamp()));
2070     }
2071     else
2072     {
2073         next_playtime = "NONE";
2074     }
2075 
2076     LOGC(brlog.Debug,
2077          log << CONID() << "readMsg: DELIVERED seq=" << seq << " T=" << FormatTime(srctime) << " in " << timediff_ms
2078              << "ms - TIME-PREVIOUS: PKT: " << srctimediff_ms << " LOCAL: " << nowdiff_ms << " !"
2079              << BufferStamp(pkt.data(), pkt.size()) << " NEXT pkt T=" << next_playtime);
2080 
2081     prev_now     = nowtime;
2082     prev_srctime = srctime;
2083 }
2084 #endif
2085 
scanMsg(int & w_p,int & w_q,bool & w_passack)2086 bool CRcvBuffer::scanMsg(int& w_p, int& w_q, bool& w_passack)
2087 {
2088     // empty buffer
2089     if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0))
2090     {
2091         HLOGC(brlog.Debug, log << "scanMsg: empty buffer");
2092         return false;
2093     }
2094 
2095     int rmpkts  = 0;
2096     int rmbytes = 0;
2097     // skip all bad msgs at the beginning
2098     // This loop rolls until the "buffer is empty" (head == tail),
2099     // in particular, there's no unit accessible for the reader.
2100     while (m_iStartPos != m_iLastAckPos)
2101     {
2102         // Roll up to the first valid unit
2103         if (!m_pUnit[m_iStartPos])
2104         {
2105             if (++m_iStartPos == m_iSize)
2106                 m_iStartPos = 0;
2107             continue;
2108         }
2109 
2110         // Note: PB_FIRST | PB_LAST == PB_SOLO.
2111         // testing if boundary() & PB_FIRST tests if the msg is first OR solo.
2112         if (m_pUnit[m_iStartPos]->m_iFlag == CUnit::GOOD && m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() & PB_FIRST)
2113         {
2114             bool good = true;
2115 
2116             // look ahead for the whole message
2117 
2118             // We expect to see either of:
2119             // [PB_FIRST] [PB_SUBSEQUENT] [PB_SUBSEQUENT] [PB_LAST]
2120             // [PB_SOLO]
2121             // but not:
2122             // [PB_FIRST] NULL ...
2123             // [PB_FIRST] FREE/PASSACK/DROPPED...
2124             // If the message didn't look as expected, interrupt this.
2125 
2126             // This begins with a message starting at m_iStartPos
2127             // up to m_iLastAckPos OR until the PB_LAST message is found.
2128             // If any of the units on this way isn't good, this OUTER loop
2129             // will be interrupted.
2130             for (int i = m_iStartPos; i != m_iLastAckPos;)
2131             {
2132                 if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD)
2133                 {
2134                     good = false;
2135                     break;
2136                 }
2137 
2138                 // Likewise, boundary() & PB_LAST will be satisfied for last OR solo.
2139                 if (m_pUnit[i]->m_Packet.getMsgBoundary() & PB_LAST)
2140                     break;
2141 
2142                 if (++i == m_iSize)
2143                     i = 0;
2144             }
2145 
2146             if (good)
2147                 break;
2148         }
2149 
2150         rmpkts++;
2151         rmbytes += (int) freeUnitAt((size_t) m_iStartPos);
2152 
2153         m_iStartPos = shiftFwd(m_iStartPos);
2154     }
2155     /* we removed bytes form receive buffer */
2156     countBytes(-rmpkts, -rmbytes, true);
2157 
2158     // Not sure if this is correct, but this above 'while' loop exits
2159     // under the following conditions only:
2160     // - m_iStartPos == m_iLastAckPos (that makes passack = true)
2161     // - found at least GOOD unit with PB_FIRST and not all messages up to PB_LAST are good,
2162     //   in which case it returns with m_iStartPos <% m_iLastAckPos (earlier)
2163     // Also all units that lied before m_iStartPos are removed.
2164 
2165     w_p        = -1;          // message head
2166     w_q        = m_iStartPos; // message tail
2167     w_passack  = m_iStartPos == m_iLastAckPos;
2168     bool found = false;
2169 
2170     // looking for the first message
2171     //>>m_pUnit[size + m_iMaxPos] is not valid
2172 
2173     // XXX Would be nice to make some very thorough refactoring here.
2174 
2175     // This rolls by q variable from m_iStartPos up to m_iLastAckPos,
2176     // actually from the first message up to the one with PB_LAST
2177     // or PB_SOLO boundary.
2178 
2179     // The 'i' variable used in this loop is just a stub and it's
2180     // even hard to define the unit here. It is "shift towards
2181     // m_iStartPos", so the upper value is m_iMaxPos + size.
2182     // m_iMaxPos is itself relative to m_iLastAckPos, so
2183     // the upper value is m_iMaxPos + difference between
2184     // m_iLastAckPos and m_iStartPos, so that this value is relative
2185     // to m_iStartPos.
2186     //
2187     // The 'i' value isn't used anywhere, although the 'q' value rolls
2188     // in this loop in sync with 'i', with the difference that 'q' is
2189     // wrapped around, and 'i' is just incremented normally.
2190     //
2191     // This makes that this loop rolls in the range by 'q' from
2192     // m_iStartPos to m_iStartPos + UPPER,
2193     // where UPPER = m_iLastAckPos -% m_iStartPos + m_iMaxPos
2194     // This embraces the range from the current reading head up to
2195     // the last packet ever received.
2196     //
2197     // 'passack' is set to true when the 'q' has passed through
2198     // the border of m_iLastAckPos and fallen into the range
2199     // of unacknowledged packets.
2200 
2201     for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i < n; ++i)
2202     {
2203         if (m_pUnit[w_q] && m_pUnit[w_q]->m_iFlag == CUnit::GOOD)
2204         {
2205             // Equivalent pseudocode:
2206             // PacketBoundary bound = m_pUnit[w_q]->m_Packet.getMsgBoundary();
2207             // if ( IsSet(bound, PB_FIRST) )
2208             //     w_p = w_q;
2209             // if ( IsSet(bound, PB_LAST) && w_p != -1 )
2210             //     found = true;
2211             //
2212             // Not implemented this way because it uselessly check w_p for -1
2213             // also after setting it explicitly.
2214 
2215             switch (m_pUnit[w_q]->m_Packet.getMsgBoundary())
2216             {
2217             case PB_SOLO: // 11
2218                 w_p   = w_q;
2219                 found = true;
2220                 break;
2221 
2222             case PB_FIRST: // 10
2223                 w_p = w_q;
2224                 break;
2225 
2226             case PB_LAST: // 01
2227                 if (w_p != -1)
2228                     found = true;
2229                 break;
2230 
2231             case PB_SUBSEQUENT:; // do nothing (caught first, rolling for last)
2232             }
2233         }
2234         else
2235         {
2236             // a hole in this message, not valid, restart search
2237             w_p = -1;
2238         }
2239 
2240         // 'found' is set when the current iteration hit a message with PB_LAST
2241         // (including PB_SOLO since the very first message).
2242         if (found)
2243         {
2244             // the msg has to be ack'ed or it is allowed to read out of order, and was not read before
2245             if (!w_passack || !m_pUnit[w_q]->m_Packet.getMsgOrderFlag())
2246             {
2247                 HLOGC(brlog.Debug, log << "scanMsg: found next-to-broken message, delivering OUT OF ORDER.");
2248                 break;
2249             }
2250 
2251             found = false;
2252         }
2253 
2254         if (++w_q == m_iSize)
2255             w_q = 0;
2256 
2257         if (w_q == m_iLastAckPos)
2258             w_passack = true;
2259     }
2260 
2261     // no msg found
2262     if (!found)
2263     {
2264         // NOTE:
2265         // This situation may only happen if:
2266         // - Found a packet with PB_FIRST, so w_p = w_q at the moment when it was found
2267         // - Possibly found following components of that message up to shifted w_q
2268         // - Found no terminal packet (PB_LAST) for that message.
2269 
2270         // if the message is larger than the receiver buffer, return part of the message
2271         if ((w_p != -1) && (shiftFwd(w_q) == w_p))
2272         {
2273             HLOGC(brlog.Debug, log << "scanMsg: BUFFER FULL and message is INCOMPLETE. Returning PARTIAL MESSAGE.");
2274             found = true;
2275         }
2276         else
2277         {
2278             HLOGC(brlog.Debug, log << "scanMsg: PARTIAL or NO MESSAGE found: p=" << w_p << " q=" << w_q);
2279         }
2280     }
2281     else
2282     {
2283         HLOGC(brlog.Debug,
2284               log << "scanMsg: extracted message p=" << w_p << " q=" << w_q << " ("
2285                   << ((w_q - w_p + m_iSize + 1) % m_iSize) << " packets)");
2286     }
2287 
2288     return found;
2289 }
2290