1 /***
2 This file is part of snapcast
3 Copyright (C) 2014-2021 Johannes Pohl
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 ***/
18
19 #ifndef NOMINMAX
20 #define NOMINMAX
21 #endif // NOMINMAX
22
23 #include "stream.hpp"
24 #include "common/aixlog.hpp"
25 #include "common/snap_exception.hpp"
26 #include "common/str_compat.hpp"
27 #include "common/utils/logging.hpp"
28 #include "time_provider.hpp"
29 #include <cmath>
30 #include <cstring>
31 #include <iostream>
32
33
34 using namespace std;
35 namespace cs = chronos;
36
/// Tag attached to the log lines written by the Stream implementation.
static constexpr const char* LOG_TAG = "Stream";
/// Short-term median deviation that must be exceeded before getPlayerChunk starts a soft-sync rate correction.
static constexpr std::chrono::microseconds kCorrectionBegin{100};
39
40 // #define LOG_LATENCIES
41
Stream(const SampleFormat & in_format,const SampleFormat & out_format)42 Stream::Stream(const SampleFormat& in_format, const SampleFormat& out_format)
43 : in_format_(in_format), median_(0), shortMedian_(0), lastUpdate_(0), playedFrames_(0), correctAfterXFrames_(0), bufferMs_(cs::msec(500)), frame_delta_(0),
44 hard_sync_(true)
45 {
46 buffer_.setSize(500);
47 shortBuffer_.setSize(100);
48 miniBuffer_.setSize(20);
49 latencies_.setSize(100);
50
51 format_ = in_format_;
52 if (out_format.isInitialized())
53 {
54 format_.setFormat(out_format.rate() != 0 ? out_format.rate() : format_.rate(), out_format.bits() != 0 ? out_format.bits() : format_.bits(),
55 out_format.channels() != 0 ? out_format.channels() : format_.channels());
56 }
57
58 /*
59 48000 x
60 ------- = -----
61 47999,2 x - 1
62
63 x = 1,000016667 / (1,000016667 - 1)
64 */
65 // setRealSampleRate(format_.rate());
66 resampler_ = std::make_unique<Resampler>(in_format_, format_);
67 }
68
69
setRealSampleRate(double sampleRate)70 void Stream::setRealSampleRate(double sampleRate)
71 {
72 if (sampleRate == format_.rate())
73 {
74 correctAfterXFrames_ = 0;
75 }
76 else
77 {
78 correctAfterXFrames_ = static_cast<int32_t>(round((format_.rate() / sampleRate) / (format_.rate() / sampleRate - 1.)));
79 // LOG(TRACE, LOG_TAG) << "Correct after X: " << correctAfterXFrames_ << " (Real rate: " << sampleRate << ", rate: " << format_.rate() << ")\n";
80 }
81 }
82
83
setBufferLen(size_t bufferLenMs)84 void Stream::setBufferLen(size_t bufferLenMs)
85 {
86 bufferMs_ = cs::msec(bufferLenMs);
87 }
88
89
clearChunks()90 void Stream::clearChunks()
91 {
92 std::lock_guard<std::mutex> lock(mutex_);
93 while (!chunks_.empty())
94 chunks_.pop();
95 resetBuffers();
96 }
97
98
addChunk(unique_ptr<msg::PcmChunk> chunk)99 void Stream::addChunk(unique_ptr<msg::PcmChunk> chunk)
100 {
101 // drop chunk if it's too old. Just in case, this shouldn't happen.
102 auto age = std::chrono::duration_cast<cs::msec>(TimeProvider::serverNow() - chunk->start());
103 if (age > 5s + bufferMs_)
104 return;
105
106 auto resampled = resampler_->resample(std::move(chunk));
107 if (resampled)
108 {
109 std::lock_guard<std::mutex> lock(mutex_);
110 recent_ = resampled;
111 chunks_.push(resampled);
112
113 std::shared_ptr<msg::PcmChunk> front_;
114 while (chunks_.front_copy(front_))
115 {
116 age = std::chrono::duration_cast<cs::msec>(TimeProvider::serverNow() - front_->start());
117 if ((age > 5s + bufferMs_) && chunks_.try_pop(front_))
118 LOG(TRACE, LOG_TAG) << "Oldest chunk too old: " << age.count() << " ms, removing. Chunks in queue left: " << chunks_.size() << "\n";
119 else
120 break;
121 }
122 }
123 // LOG(TRACE, LOG_TAG) << "new chunk: " << chunk->durationMs() << " ms, age: " << age.count() << " ms, Chunks: " << chunks_.size() << "\n";
124 }
125
126
waitForChunk(const std::chrono::milliseconds & timeout) const127 bool Stream::waitForChunk(const std::chrono::milliseconds& timeout) const
128 {
129 return chunks_.wait_for(timeout);
130 }
131
132
getSilentPlayerChunk(void * outputBuffer,uint32_t frames) const133 void Stream::getSilentPlayerChunk(void* outputBuffer, uint32_t frames) const
134 {
135 memset(outputBuffer, 0, frames * format_.frameSize());
136 }
137
138
getNextPlayerChunk(void * outputBuffer,uint32_t frames)139 cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, uint32_t frames)
140 {
141 if (!chunk_ && !chunks_.try_pop(chunk_))
142 throw SnapException("No chunks available, requested frames: " + cpt::to_string(frames));
143
144 cs::time_point_clk tp = chunk_->start();
145 uint32_t read = 0;
146 while (read < frames)
147 {
148 read += chunk_->readFrames(static_cast<char*>(outputBuffer) + read * format_.frameSize(), frames - read);
149 if ((read < frames) && chunk_->isEndOfChunk() && !chunks_.try_pop(chunk_))
150 throw SnapException("Not enough frames available, requested frames: " + cpt::to_string(frames) + ", available: " + cpt::to_string(read));
151 }
152 return tp;
153 }
154
155
getNextPlayerChunk(void * outputBuffer,uint32_t frames,int32_t framesCorrection)156 cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, uint32_t frames, int32_t framesCorrection)
157 {
158 if (framesCorrection < 0 && frames + framesCorrection <= 0)
159 {
160 // Avoid underflow in new char[] constructor.
161 framesCorrection = -static_cast<int32_t>(frames) + 1;
162 }
163
164 if (framesCorrection == 0)
165 return getNextPlayerChunk(outputBuffer, frames);
166
167 frame_delta_ -= framesCorrection;
168
169 uint32_t toRead = frames + framesCorrection;
170 if (toRead * format_.frameSize() > read_buffer_.size())
171 read_buffer_.resize(toRead * format_.frameSize());
172 cs::time_point_clk tp = getNextPlayerChunk(read_buffer_.data(), toRead);
173
174 const auto max = framesCorrection < 0 ? frames : toRead;
175 // Divide the buffer into one more slice than frames that need to be dropped.
176 // We will drop/repeat 0 frames from the first slice, 1 frame from the second, ..., and framesCorrection frames from the last slice.
177 size_t slices = abs(framesCorrection) + 1;
178 if (slices > max)
179 {
180 // We cannot have more slices than frames, because that would cause
181 // size = 0 -> pos = 0 -> pos - n < 0 in the loop below
182 // Overwriting slices effectively corrects less frames than asked for in framesCorrection.
183 slices = max;
184 }
185 // Size of each slice. The last slice may be bigger.
186 auto size = max / slices;
187
188 // LOG(TRACE, LOG_TAG) << "getNextPlayerChunk, frames: " << frames << ", correction: " << framesCorrection << " (" << toRead << "), slices: " << slices
189 // << "\n";
190
191 size_t pos = 0;
192 for (size_t n = 0; n < slices; ++n)
193 {
194 // Adjust size in the last iteration, because the last slice may be bigger
195 if (n + 1 == slices)
196 size = max - pos;
197
198 if (framesCorrection < 0)
199 {
200 // Read one frame less per slice from the input, but write a duplicated frame per slice to the output
201 // LOG(TRACE, LOG_TAG) << "duplicate - requested: " << frames << ", read: " << toRead << ", slice: " << n << ", size: " << size << ", out pos: " <<
202 // pos << ", source pos: " << pos - n << "\n";
203 memcpy(static_cast<char*>(outputBuffer) + pos * format_.frameSize(), read_buffer_.data() + (pos - n) * format_.frameSize(),
204 size * format_.frameSize());
205 }
206 else
207 {
208 // Read all input frames, but skip a frame per slice when writing to the output.
209 // LOG(TRACE, LOG_TAG) << "remove - requested: " << frames << ", read: " << toRead << ", slice: " << n << ", size: " << size << ", out pos: " << pos
210 // - n << ", source pos: " << pos << "\n";
211 memcpy(static_cast<char*>(outputBuffer) + (pos - n) * format_.frameSize(), read_buffer_.data() + pos * format_.frameSize(),
212 size * format_.frameSize());
213 }
214 pos += size;
215 }
216
217 return tp;
218 }
219
220
updateBuffers(chronos::usec::rep age)221 void Stream::updateBuffers(chronos::usec::rep age)
222 {
223 buffer_.add(age);
224 miniBuffer_.add(age);
225 shortBuffer_.add(age);
226 }
227
228
resetBuffers()229 void Stream::resetBuffers()
230 {
231 buffer_.clear();
232 miniBuffer_.clear();
233 shortBuffer_.clear();
234 }
235
236
getPlayerChunk(void * outputBuffer,const cs::usec & outputBufferDacTime,uint32_t frames)237 bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacTime, uint32_t frames)
238 {
239 if (outputBufferDacTime > bufferMs_)
240 {
241 LOG(INFO, LOG_TAG) << "outputBufferDacTime > bufferMs: " << cs::duration<cs::msec>(outputBufferDacTime) << " > " << cs::duration<cs::msec>(bufferMs_)
242 << "\n";
243 return false;
244 }
245
246 std::lock_guard<std::mutex> lock(mutex_);
247 time_t now = time(nullptr);
248 if (!chunk_ && !chunks_.try_pop(chunk_))
249 {
250 if (now != lastUpdate_)
251 {
252 lastUpdate_ = now;
253 LOG(INFO, LOG_TAG) << "No chunks available\n";
254 }
255 return false;
256 }
257
258 #ifdef LOG_LATENCIES
259 // calculate the estimated end to end latency
260 if (recent_)
261 {
262 cs::nsec req_chunk_duration = cs::nsec(static_cast<cs::nsec::rep>(frames / format_.nsRate()));
263 auto youngest = recent_->end() - req_chunk_duration;
264 cs::msec age = std::chrono::duration_cast<cs::msec>(TimeProvider::serverNow() - youngest + outputBufferDacTime);
265 latencies_.add(age.count());
266 }
267 #endif
268
269 /// we have a chunk
270 /// age = chunk age (server now - rec time: some positive value) - buffer (e.g. 1000ms) + time to DAC
271 /// age = 0 => play now
272 /// age < 0 => play in -age => wait for a while, play silence in the meantime
273 /// age > 0 => too old => throw them away
274
275 try
276 {
277 if (hard_sync_)
278 {
279 cs::nsec req_chunk_duration = cs::nsec(static_cast<cs::nsec::rep>(frames / format_.nsRate()));
280 cs::usec age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime;
281 // LOG(INFO, LOG_TAG) << "age: " << age.count() / 1000 << ", buffer: " <<
282 // std::chrono::duration_cast<chrono::milliseconds>(req_chunk_duration).count() << "\n";
283 if (age < -req_chunk_duration)
284 {
285 // the oldest chunk (top of the stream) is too young for the buffer
286 // e.g. age = -100ms (=> should be played in 100ms)
287 // but the requested chunk duration is 50ms, so there is not data in this iteration available
288 getSilentPlayerChunk(outputBuffer, frames);
289 return true;
290 }
291 else
292 {
293 if (age.count() > 0)
294 {
295 LOG(DEBUG, LOG_TAG) << "age > 0: " << age.count() / 1000 << "ms, dropping old chunks\n";
296 // age > 0: the top of the stream is too old. We must fast foward.
297 // delete the current chunk, it's too old. This will avoid an endless loop if there is no chunk in the queue.
298 chunk_ = nullptr;
299 while (chunks_.try_pop(chunk_))
300 {
301 age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime;
302 LOG(DEBUG, LOG_TAG) << "age: " << age.count() / 1000 << ", requested chunk_duration: "
303 << std::chrono::duration_cast<std::chrono::milliseconds>(req_chunk_duration).count()
304 << ", duration: " << chunk_->duration<std::chrono::milliseconds>().count() << "\n";
305 // check if the current chunk's end is older than age => can be player
306 if ((age.count() > 0) && (age < chunk_->duration<cs::usec>()))
307 {
308 // fast forward by "age" to get in sync, i.e. age = 0
309 chunk_->seek(static_cast<uint32_t>(chunk_->format.nsRate() * std::chrono::duration_cast<cs::nsec>(age).count()));
310 age = 0s;
311 }
312 if (age.count() <= 0)
313 break;
314 }
315 }
316
317 if (age.count() <= 0)
318 {
319 // the oldest chunk (top of the stream) can be played in this iteration
320 // e.g. age = -20ms (=> should be played in 20ms)
321 // and the current chunk duration is 50ms, so we need to play 20ms silence (as we don't have data)
322 // and can play 30ms of the stream
323 uint32_t silent_frames = static_cast<uint32_t>(-chunk_->format.nsRate() * std::chrono::duration_cast<cs::nsec>(age).count());
324 bool result = (silent_frames <= frames);
325 silent_frames = std::min(silent_frames, frames);
326 if (silent_frames > 0)
327 {
328 LOG(DEBUG, LOG_TAG) << "Silent frames: " << silent_frames << ", frames: " << frames
329 << ", age: " << std::chrono::duration_cast<cs::usec>(age).count() / 1000. << "\n";
330 getSilentPlayerChunk(outputBuffer, silent_frames);
331 }
332 getNextPlayerChunk(static_cast<char*>(outputBuffer) + (chunk_->format.frameSize() * silent_frames), frames - silent_frames);
333
334 if (result)
335 {
336 hard_sync_ = false;
337 resetBuffers();
338 }
339 return true;
340 }
341 return false;
342 }
343 }
344
345 // sample rate correction
346 // framesCorrection = number of frames to be read more or less to get in-sync
347 int32_t framesCorrection = 0;
348 if (correctAfterXFrames_ != 0)
349 {
350 playedFrames_ += frames;
351 if (playedFrames_ >= static_cast<uint32_t>(abs(correctAfterXFrames_)))
352 {
353 framesCorrection = static_cast<int32_t>(playedFrames_) / correctAfterXFrames_;
354 playedFrames_ %= abs(correctAfterXFrames_);
355 }
356 }
357
358 cs::usec age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, frames, framesCorrection) - bufferMs_ +
359 outputBufferDacTime);
360
361 setRealSampleRate(format_.rate());
362 // check if we need a hard sync
363 if (buffer_.full() && (cs::usec(abs(median_)) > cs::msec(2)) && (cs::abs(age) > cs::usec(500)))
364 {
365 LOG(INFO, LOG_TAG) << "pBuffer->full() && (abs(median_) > 2): " << median_ << "\n";
366 hard_sync_ = true;
367 }
368 else if (shortBuffer_.full() && (cs::usec(abs(shortMedian_)) > cs::msec(5)) && (cs::abs(age) > cs::usec(500)))
369 {
370 LOG(INFO, LOG_TAG) << "pShortBuffer->full() && (abs(shortMedian_) > 5): " << shortMedian_ << "\n";
371 hard_sync_ = true;
372 }
373 else if (miniBuffer_.full() && (cs::usec(abs(miniBuffer_.median())) > cs::msec(50)) && (cs::abs(age) > cs::usec(500)))
374 {
375 LOG(INFO, LOG_TAG) << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer_.median() << "\n";
376 hard_sync_ = true;
377 }
378 else if (cs::abs(age) > 500ms)
379 {
380 LOG(INFO, LOG_TAG) << "abs(age > 500): " << cs::abs(age).count() << "\n";
381 hard_sync_ = true;
382 }
383 else if (shortBuffer_.full())
384 {
385 // No hard sync needed
386 // Check if we need a samplerate correction (change playback speed (soft sync))
387 auto miniMedian = miniBuffer_.median();
388 if ((cs::usec(shortMedian_) > kCorrectionBegin) && (cs::usec(miniMedian) > cs::usec(50)) && (cs::usec(age) > cs::usec(50)))
389 {
390 double rate = (shortMedian_ / 100) * 0.00005;
391 rate = 1.0 - std::min(rate, 0.0005);
392 // LOG(INFO, LOG_TAG) << "Rate: " << rate << "\n";
393 // we are late (age > 0), this means we are not playing fast enough
394 // => the real sample rate seems to be lower, we have to drop some frames
395 setRealSampleRate(format_.rate() * rate); // 0.9999);
396 }
397 else if ((cs::usec(shortMedian_) < -kCorrectionBegin) && (cs::usec(miniMedian) < -cs::usec(50)) && (cs::usec(age) < -cs::usec(50)))
398 {
399 double rate = (-shortMedian_ / 100) * 0.00005;
400 rate = 1.0 + std::min(rate, 0.0005);
401 // LOG(INFO, LOG_TAG) << "Rate: " << rate << "\n";
402 // we are early (age > 0), this means we are playing too fast
403 // => the real sample rate seems to be higher, we have to insert some frames
404 setRealSampleRate(format_.rate() * rate); // 1.0001);
405 }
406 }
407
408 updateBuffers(age.count());
409
410 // update median_ and shortMedian_ and print sync stats
411 if (now != lastUpdate_)
412 {
413 // log buffer stats
414 lastUpdate_ = now;
415 median_ = buffer_.median();
416 shortMedian_ = shortBuffer_.median();
417 LOG(DEBUG, "Stats") << "Chunk: " << age.count() / 100 << "\t" << miniBuffer_.median() / 100 << "\t" << shortMedian_ / 100 << "\t" << median_ / 100
418 << "\t" << buffer_.size() << "\t" << cs::duration<cs::msec>(outputBufferDacTime) << "\t" << frame_delta_ << "\n";
419 frame_delta_ = 0;
420
421 #ifdef LOG_LATENCIES
422 // log latencies
423 std::array<uint8_t, 5> percents = {100, 99, 95, 50, 5};
424 auto percentiles = latencies_.percentiles(percents);
425 std::stringstream ss;
426 for (std::size_t n = 0; n < percents.size(); ++n)
427 ss << ((n > 0) ? ", " : "") << (int)percents[n] << "%: " << percentiles[n];
428 LOG(DEBUG, "Latency") << ss.str() << "\n";
429 #endif
430 }
431 return (abs(cs::duration<cs::msec>(age)) < 500);
432 }
433 catch (const std::exception& e)
434 {
435 LOG(INFO, LOG_TAG) << "Exception: " << e.what() << "\n";
436 hard_sync_ = true;
437 return false;
438 }
439 }
440
441
getPlayerChunkOrSilence(void * outputBuffer,const chronos::usec & outputBufferDacTime,uint32_t frames)442 bool Stream::getPlayerChunkOrSilence(void* outputBuffer, const chronos::usec& outputBufferDacTime, uint32_t frames)
443 {
444 bool result = getPlayerChunk(outputBuffer, outputBufferDacTime, frames);
445 if (!result)
446 {
447 static utils::logging::TimeConditional cond(1s);
448 LOG(DEBUG, LOG_TAG) << cond << "Failed to get chunk, returning silence\n";
449 getSilentPlayerChunk(outputBuffer, frames);
450 }
451 return result;
452 }
453