1 //*****************************************************************
2 /*
3 JackTrip: A System for High-Quality Audio Network Performance
4 over the Internet
5
6 Copyright (c) 2020 Juan-Pablo Caceres, Chris Chafe.
7 SoundWIRE group at CCRMA, Stanford University.
8
9 Permission is hereby granted, free of charge, to any person
10 obtaining a copy of this software and associated documentation
11 files (the "Software"), to deal in the Software without
12 restriction, including without limitation the rights to use,
13 copy, modify, merge, publish, distribute, sublicense, and/or sell
14 copies of the Software, and to permit persons to whom the
15 Software is furnished to do so, subject to the following
16 conditions:
17
18 The above copyright notice and this permission notice shall be
19 included in all copies or substantial portions of the Software.
20
21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
23 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
25 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
26 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
27 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
28 OTHER DEALINGS IN THE SOFTWARE.
29 */
30 //*****************************************************************
31
32 /**
33 * \file JitterBuffer.cpp
34 * \author Anton Runov
35 * \date June 2020
36 */
37
38
39 #include "JitterBuffer.h"
40
41 #include <iostream>
42 #include <cstring>
43 #include <cstdlib>
44 #include <stdexcept>
45 #include <cmath>
46
47 using std::cout; using std::endl;
48
49
50 //*******************************************************************************
JitterBuffer(int buf_samples,int qlen,int sample_rate,int strategy,int bcast_qlen,int channels,int bit_res)51 JitterBuffer::JitterBuffer(int buf_samples, int qlen, int sample_rate, int strategy,
52 int bcast_qlen, int channels, int bit_res) :
53 RingBuffer(0, 0)
54 {
55 int total_size = sample_rate * channels * bit_res * 2; // 2 secs of audio
56 int slot_size = buf_samples * channels * bit_res;
57 mSlotSize = slot_size;
58 mInSlotSize = slot_size;
59 if (0 < qlen) {
60 mMaxLatency = qlen * slot_size;
61 mAutoQueue = 0;
62 }
63 else {
64 // AutoQueue
65 mMaxLatency = 3*slot_size;
66 mAutoQueue = 1;
67 }
68 mTotalSize = total_size;
69 mBroadcastLatency = bcast_qlen * mSlotSize;
70 mNumChannels = channels;
71 mAudioBitRes = bit_res;
72 mMinStepSize = channels * bit_res;
73 mFPP = buf_samples;
74 mSampleRate = sample_rate;
75 mActive = false;
76
77 // Defaults for zero strategy
78 mUnderrunIncTolerance = -10 * mSlotSize;
79 mCorrIncTolerance = 100*mMaxLatency; // should be greater than mUnderrunIncTolerance
80 mOverflowDecTolerance = 100*mMaxLatency;
81 mWritePosition = mMaxLatency;
82 mStatUnit = mSlotSize;
83 mLevelDownRate = std::min(256, mFPP) / (5.0*sample_rate) * mSlotSize;
84 mOverflowDropStep = mMaxLatency / 2;
85 mLevelCur = mMaxLatency;
86 mLevel = mLevelCur;
87 mMinLevelThreshold = 1.9 * mSlotSize;
88 mBroadcastPosition = 0;
89 mBroadcastPositionCorr = 0.0;
90 mLastCorrCounter = 0;
91 mLastCorrDirection = 0;
92
93 switch (strategy) {
94 case 1:
95 mOverflowDropStep = mSlotSize;
96 break;
97 case 2:
98 mUnderrunIncTolerance = 1.1 * mSlotSize;
99 mCorrIncTolerance = 1.9 * mSlotSize; // should be greater than mUnderrunIncTolerance
100 mOverflowDecTolerance = 0.1*mSlotSize;
101 mOverflowDropStep = mSlotSize;
102 break;
103 }
104
105 mRingBuffer = new int8_t[mTotalSize];
106 std::memset(mRingBuffer, 0, mTotalSize);
107
108 mAutoQueueCorr = 2*mSlotSize;
109 if (0 > qlen) {
110 mAutoQFactor = 1.0/-qlen;
111 }
112 else {
113 mAutoQFactor = 1.0/500;
114 }
115 mAutoQRate = mSlotSize * 0.5;
116 mAutoQRateMin = mSlotSize * 0.0005;
117 mAutoQRateDecay = 1.0 - std::min(mFPP*1.2e-6, 0.0005);
118 }
119
120 //*******************************************************************************
insertSlotNonBlocking(const int8_t * ptrToSlot,int len,int lostLen)121 bool JitterBuffer::insertSlotNonBlocking(const int8_t* ptrToSlot, int len, int lostLen)
122 {
123 if (0 == len) {
124 len = mSlotSize;
125 }
126 QMutexLocker locker(&mMutex);
127 mInSlotSize = len;
128 if (!mActive) {
129 mActive = true;
130 }
131 if (mMaxLatency < len + mSlotSize) {
132 mMaxLatency = len + mSlotSize;
133 }
134 if (0 < lostLen) {
135 processPacketLoss(lostLen);
136 }
137 mSkewRaw += mReadsNew - len;
138 mReadsNew = 0;
139 mUnderruns += mUnderrunsNew;
140 mUnderrunsNew = 0;
141 mLevel = mSlotSize*std::ceil(mLevelCur/mSlotSize);
142
143 // Update positions if necessary
144 int32_t available = mWritePosition - mReadPosition;
145
146 int delta = 0;
147 if (available < -10*mMaxLatency) {
148 delta = available;
149 mBufIncUnderrun += -delta;
150 mLevelCur = len;
151 //cout << "reset" << endl;
152 }
153 else if (available + len > mMaxLatency) {
154 delta = mOverflowDropStep;
155 mOverflows += delta;
156 mBufDecOverflow += delta;
157 mLevelCur = mMaxLatency;
158 }
159 else if (0 > available &&
160 mLevelCur < std::max(mInSlotSize + mMinLevelThreshold,
161 mMaxLatency - mUnderrunIncTolerance - 2*mSlotSize*lastCorrFactor())) {
162 delta = -std::min(-available, mSlotSize);
163 mBufIncUnderrun += -delta;
164 }
165 else if (mLevelCur < mMaxLatency - mCorrIncTolerance - 6*mSlotSize*lastCorrFactor()) {
166 delta = -mSlotSize;
167 mUnderruns += -delta;
168 mBufIncCompensate += -delta;
169 }
170
171 if (0 != delta) {
172 mReadPosition += delta;
173 mLastCorrCounter = 0;
174 mLastCorrDirection = 0 < delta ? 1 : -1;
175 }
176 else {
177 ++mLastCorrCounter;
178 }
179
180 int wpos = mWritePosition % mTotalSize;
181 int n = std::min(mTotalSize - wpos, len);
182 std::memcpy(mRingBuffer+wpos, ptrToSlot, n);
183 if (n < len) {
184 //cout << "split write: " << len << "-" << n << endl;
185 std::memcpy(mRingBuffer, ptrToSlot+n, len-n);
186 }
187 mWritePosition += len;
188
189 return true;
190 }
191
192 //*******************************************************************************
readSlotNonBlocking(int8_t * ptrToReadSlot)193 void JitterBuffer::readSlotNonBlocking(int8_t* ptrToReadSlot)
194 {
195 int len = mSlotSize;
196 QMutexLocker locker(&mMutex);
197 if (!mActive) {
198 std::memset(ptrToReadSlot, 0, len);
199 return;
200 }
201 mReadsNew += len;
202 int32_t available = mWritePosition - mReadPosition;
203 if (available < mLevelCur) {
204 mLevelCur = std::max((double)available, mLevelCur-mLevelDownRate);
205 }
206 else {
207 mLevelCur = available;
208 }
209
210 // auto queue correction
211 if (0 > available + mAutoQueueCorr - mLevelCur) {
212 mAutoQueueCorr += mAutoQRate;
213 }
214 else if (mInSlotSize + mSlotSize < mAutoQueueCorr) {
215 mAutoQueueCorr -= mAutoQRate * mAutoQFactor;
216 }
217 if (mAutoQRate > mAutoQRateMin) {
218 mAutoQRate *= mAutoQRateDecay;
219 }
220 if (0 != mAutoQueue) {
221 int PPS = mSampleRate / mFPP;
222 if (2*PPS == mAutoQueue++ % (4*PPS)) {
223 double k = 1.0 + 1e-5/mAutoQFactor;
224 if (12*PPS > mAutoQueue ||
225 std::abs(mAutoQueueCorr*k - mMaxLatency + mSlotSize/2) > 0.6*mSlotSize) {
226 mMaxLatency = mSlotSize * std::ceil(mAutoQueueCorr*k/mSlotSize);
227 cout << "AutoQueue: " << mMaxLatency / mSlotSize << endl;
228 }
229 }
230 }
231
232 int read_len = qBound(0, available, len);
233 int rpos = mReadPosition % mTotalSize;
234 int n = std::min(mTotalSize - rpos, read_len);
235 std::memcpy(ptrToReadSlot, mRingBuffer+rpos, n);
236 if (n < read_len) {
237 //cout << "split read: " << read_len << "-" << n << endl;
238 std::memcpy(ptrToReadSlot+n, mRingBuffer, read_len-n);
239 }
240 if (read_len < len) {
241 std::memset(ptrToReadSlot+read_len, 0, len-read_len);
242 mUnderrunsNew += len-read_len;
243 }
244 mReadPosition += len;
245 }
246
247 //*******************************************************************************
readBroadcastSlot(int8_t * ptrToReadSlot)248 void JitterBuffer::readBroadcastSlot(int8_t* ptrToReadSlot)
249 {
250 int len = mSlotSize;
251 QMutexLocker locker(&mMutex);
252 if (mBroadcastLatency + len > mReadPosition) {
253 std::memset(ptrToReadSlot, 0, len);
254 return;
255 }
256 // latency correction
257 int32_t d = mReadPosition - mBroadcastLatency - mBroadcastPosition - len;
258 if (std::abs(d) > mBroadcastLatency / 2) {
259 mBroadcastPosition = mReadPosition - mBroadcastLatency - len;
260 mBroadcastPositionCorr = 0.0;
261 mBroadcastSkew += d / mMinStepSize;
262 }
263 else {
264 mBroadcastPositionCorr += 0.0003 * d;
265 int delta = mBroadcastPositionCorr / mMinStepSize;
266 if (0 != delta) {
267 mBroadcastPositionCorr -= delta * mMinStepSize;
268 if (2 == mAudioBitRes && (int32_t)(mWritePosition - mBroadcastPosition) > len) {
269 // interpolate
270 len += delta * mMinStepSize;
271 }
272 else {
273 // skip
274 mBroadcastPosition += delta * mMinStepSize;
275 }
276 mBroadcastSkew += delta;
277 }
278 }
279 mBroadcastDelta = d / mMinStepSize;
280 int32_t available = mWritePosition - mBroadcastPosition;
281 int read_len = qBound(0, available, len);
282 if (len == mSlotSize) {
283 int rpos = mBroadcastPosition % mTotalSize;
284 int n = std::min(mTotalSize - rpos, read_len);
285 std::memcpy(ptrToReadSlot, mRingBuffer+rpos, n);
286 if (n < read_len) {
287 //cout << "split read: " << read_len << "-" << n << endl;
288 std::memcpy(ptrToReadSlot+n, mRingBuffer, read_len-n);
289 }
290 if (read_len < len) {
291 std::memset(ptrToReadSlot+read_len, 0, len-read_len);
292 }
293 }
294 else {
295 // interpolation len => mSlotSize
296 double K = 1.0 * len / mSlotSize;
297 for (int c=0; c < mMinStepSize; c+=sizeof(int16_t)) {
298 for (int j=0; j < mSlotSize/mMinStepSize; ++j) {
299 int j1 = std::floor(j*K);
300 double a = j*K - j1;
301 int rpos = (mBroadcastPosition + j1*mMinStepSize + c) % mTotalSize;
302 int16_t v1 = *(int16_t*)(mRingBuffer + rpos);
303 rpos = (rpos + mMinStepSize) % mTotalSize;
304 int16_t v2 = *(int16_t*)(mRingBuffer + rpos);
305 *(int16_t*)(ptrToReadSlot + j*mMinStepSize + c) = std::round((1-a)*v1 + a*v2);
306 }
307 }
308 }
309 mBroadcastPosition += len;
310 }
311
312
313 //*******************************************************************************
processPacketLoss(int lostLen)314 void JitterBuffer::processPacketLoss(int lostLen)
315 {
316 mSkewRaw -= lostLen;
317
318 int32_t available = mWritePosition - mReadPosition;
319 int delta = std::min(available + mInSlotSize + lostLen - mMaxLatency, lostLen);
320 if (0 < delta) {
321 lostLen -= delta;
322 mBufDecPktLoss += delta;
323 mLevelCur = mMaxLatency;
324 mLastCorrCounter = 0;
325 mLastCorrDirection = 1;
326 }
327 else if (mSlotSize < available + lostLen && (
328 mOverflowDecTolerance > mMaxLatency // for strategies 0,1
329 || (0 < mLastCorrDirection && mLevelCur >
330 mMaxLatency - mOverflowDecTolerance*(1.1 - lastCorrFactor()))
331 )) {
332 delta = std::min(lostLen, mSlotSize);
333 lostLen -= delta;
334 mBufDecPktLoss += delta;
335 mLevelCur -= delta;
336 mLastCorrCounter = 0;
337 mLastCorrDirection = 1;
338 }
339 if (lostLen >= mTotalSize) {
340 std::memset(mRingBuffer, 0, mTotalSize);
341 mUnderruns += std::max(0, lostLen - std::max(0, -available));
342 }
343 else if (0 < lostLen) {
344 int wpos = mWritePosition % mTotalSize;
345 int n = std::min(mTotalSize - wpos, lostLen);
346 std::memset(mRingBuffer+wpos, 0, n);
347 if (n < lostLen) {
348 //cout << "split write: " << lostLen << "-" << n << endl;
349 std::memset(mRingBuffer, 0, lostLen-n);
350 }
351 mUnderruns += std::max(0, lostLen - std::max(0, -available));
352 }
353 mWritePosition += lostLen;
354 }
355
356 //*******************************************************************************
getStats(RingBuffer::IOStat * stat,bool reset)357 bool JitterBuffer::getStats(RingBuffer::IOStat* stat, bool reset)
358 {
359 QMutexLocker locker(&mMutex);
360 if (reset) {
361 mUnderruns = 0;
362 mOverflows = 0;
363 mSkew0 = mLevel;
364 mSkewRaw = 0;
365 mBufDecOverflow = 0;
366 mBufDecPktLoss = 0;
367 mBufIncUnderrun = 0;
368 mBufIncCompensate = 0;
369 mBroadcastSkew = 0;
370 }
371 stat->underruns = mUnderruns / mStatUnit;
372 stat->overflows = mOverflows / mStatUnit;
373 stat->skew = (int32_t)((mSkew0 - mLevel + mBufIncUnderrun + mBufIncCompensate
374 - mBufDecOverflow - mBufDecPktLoss)) / mStatUnit;
375 stat->skew_raw = mSkewRaw / mStatUnit;
376 stat->level = mLevel / mStatUnit;
377
378 stat->buf_dec_overflows = mBufDecOverflow / mStatUnit;
379 stat->buf_dec_pktloss = mBufDecPktLoss / mStatUnit;
380 stat->buf_inc_underrun = mBufIncUnderrun / mStatUnit;
381 stat->buf_inc_compensate = mBufIncCompensate / mStatUnit;
382 stat->broadcast_skew = mBroadcastSkew;
383 stat->broadcast_delta = mBroadcastDelta;
384
385 stat->autoq_corr = mAutoQueueCorr / mStatUnit * 10;
386 stat->autoq_rate = mAutoQRate / mStatUnit * 1000;
387 return true;
388 }
389
390