1 /***
2     This file is part of snapcast
3     Copyright (C) 2014-2021  Johannes Pohl
4 
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9 
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14 
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 ***/
18 
19 #ifndef NOMINMAX
20 #define NOMINMAX
21 #endif // NOMINMAX
22 
#include "stream.hpp"
#include "common/aixlog.hpp"
#include "common/snap_exception.hpp"
#include "common/str_compat.hpp"
#include "common/utils/logging.hpp"
#include "time_provider.hpp"
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <limits>
32 
33 
34 using namespace std;
35 namespace cs = chronos;
36 
37 static constexpr auto LOG_TAG = "Stream";
38 static constexpr auto kCorrectionBegin = 100us;
39 
40 // #define LOG_LATENCIES
41 
Stream(const SampleFormat & in_format,const SampleFormat & out_format)42 Stream::Stream(const SampleFormat& in_format, const SampleFormat& out_format)
43     : in_format_(in_format), median_(0), shortMedian_(0), lastUpdate_(0), playedFrames_(0), correctAfterXFrames_(0), bufferMs_(cs::msec(500)), frame_delta_(0),
44       hard_sync_(true)
45 {
46     buffer_.setSize(500);
47     shortBuffer_.setSize(100);
48     miniBuffer_.setSize(20);
49     latencies_.setSize(100);
50 
51     format_ = in_format_;
52     if (out_format.isInitialized())
53     {
54         format_.setFormat(out_format.rate() != 0 ? out_format.rate() : format_.rate(), out_format.bits() != 0 ? out_format.bits() : format_.bits(),
55                           out_format.channels() != 0 ? out_format.channels() : format_.channels());
56     }
57 
58     /*
59     48000     x
60     ------- = -----
61     47999,2   x - 1
62 
63     x = 1,000016667 / (1,000016667 - 1)
64     */
65     // setRealSampleRate(format_.rate());
66     resampler_ = std::make_unique<Resampler>(in_format_, format_);
67 }
68 
69 
setRealSampleRate(double sampleRate)70 void Stream::setRealSampleRate(double sampleRate)
71 {
72     if (sampleRate == format_.rate())
73     {
74         correctAfterXFrames_ = 0;
75     }
76     else
77     {
78         correctAfterXFrames_ = static_cast<int32_t>(round((format_.rate() / sampleRate) / (format_.rate() / sampleRate - 1.)));
79         // LOG(TRACE, LOG_TAG) << "Correct after X: " << correctAfterXFrames_ << " (Real rate: " << sampleRate << ", rate: " << format_.rate() << ")\n";
80     }
81 }
82 
83 
setBufferLen(size_t bufferLenMs)84 void Stream::setBufferLen(size_t bufferLenMs)
85 {
86     bufferMs_ = cs::msec(bufferLenMs);
87 }
88 
89 
clearChunks()90 void Stream::clearChunks()
91 {
92     std::lock_guard<std::mutex> lock(mutex_);
93     while (!chunks_.empty())
94         chunks_.pop();
95     resetBuffers();
96 }
97 
98 
addChunk(unique_ptr<msg::PcmChunk> chunk)99 void Stream::addChunk(unique_ptr<msg::PcmChunk> chunk)
100 {
101     // drop chunk if it's too old. Just in case, this shouldn't happen.
102     auto age = std::chrono::duration_cast<cs::msec>(TimeProvider::serverNow() - chunk->start());
103     if (age > 5s + bufferMs_)
104         return;
105 
106     auto resampled = resampler_->resample(std::move(chunk));
107     if (resampled)
108     {
109         std::lock_guard<std::mutex> lock(mutex_);
110         recent_ = resampled;
111         chunks_.push(resampled);
112 
113         std::shared_ptr<msg::PcmChunk> front_;
114         while (chunks_.front_copy(front_))
115         {
116             age = std::chrono::duration_cast<cs::msec>(TimeProvider::serverNow() - front_->start());
117             if ((age > 5s + bufferMs_) && chunks_.try_pop(front_))
118                 LOG(TRACE, LOG_TAG) << "Oldest chunk too old: " << age.count() << " ms, removing. Chunks in queue left: " << chunks_.size() << "\n";
119             else
120                 break;
121         }
122     }
123     // LOG(TRACE, LOG_TAG) << "new chunk: " << chunk->durationMs() << " ms, age: " << age.count() << " ms, Chunks: " << chunks_.size() << "\n";
124 }
125 
126 
waitForChunk(const std::chrono::milliseconds & timeout) const127 bool Stream::waitForChunk(const std::chrono::milliseconds& timeout) const
128 {
129     return chunks_.wait_for(timeout);
130 }
131 
132 
getSilentPlayerChunk(void * outputBuffer,uint32_t frames) const133 void Stream::getSilentPlayerChunk(void* outputBuffer, uint32_t frames) const
134 {
135     memset(outputBuffer, 0, frames * format_.frameSize());
136 }
137 
138 
getNextPlayerChunk(void * outputBuffer,uint32_t frames)139 cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, uint32_t frames)
140 {
141     if (!chunk_ && !chunks_.try_pop(chunk_))
142         throw SnapException("No chunks available, requested frames: " + cpt::to_string(frames));
143 
144     cs::time_point_clk tp = chunk_->start();
145     uint32_t read = 0;
146     while (read < frames)
147     {
148         read += chunk_->readFrames(static_cast<char*>(outputBuffer) + read * format_.frameSize(), frames - read);
149         if ((read < frames) && chunk_->isEndOfChunk() && !chunks_.try_pop(chunk_))
150             throw SnapException("Not enough frames available, requested frames: " + cpt::to_string(frames) + ", available: " + cpt::to_string(read));
151     }
152     return tp;
153 }
154 
155 
getNextPlayerChunk(void * outputBuffer,uint32_t frames,int32_t framesCorrection)156 cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, uint32_t frames, int32_t framesCorrection)
157 {
158     if (framesCorrection < 0 && frames + framesCorrection <= 0)
159     {
160         // Avoid underflow in new char[] constructor.
161         framesCorrection = -static_cast<int32_t>(frames) + 1;
162     }
163 
164     if (framesCorrection == 0)
165         return getNextPlayerChunk(outputBuffer, frames);
166 
167     frame_delta_ -= framesCorrection;
168 
169     uint32_t toRead = frames + framesCorrection;
170     if (toRead * format_.frameSize() > read_buffer_.size())
171         read_buffer_.resize(toRead * format_.frameSize());
172     cs::time_point_clk tp = getNextPlayerChunk(read_buffer_.data(), toRead);
173 
174     const auto max = framesCorrection < 0 ? frames : toRead;
175     // Divide the buffer into one more slice than frames that need to be dropped.
176     // We will drop/repeat 0 frames from the first slice, 1 frame from the second, ..., and framesCorrection frames from the last slice.
177     size_t slices = abs(framesCorrection) + 1;
178     if (slices > max)
179     {
180         // We cannot have more slices than frames, because that would cause
181         // size = 0 -> pos = 0 -> pos - n < 0 in the loop below
182         // Overwriting slices effectively corrects less frames than asked for in framesCorrection.
183         slices = max;
184     }
185     // Size of each slice. The last slice may be bigger.
186     auto size = max / slices;
187 
188     // LOG(TRACE, LOG_TAG) << "getNextPlayerChunk, frames: " << frames << ", correction: " << framesCorrection << " (" << toRead << "), slices: " << slices
189     // << "\n";
190 
191     size_t pos = 0;
192     for (size_t n = 0; n < slices; ++n)
193     {
194         // Adjust size in the last iteration, because the last slice may be bigger
195         if (n + 1 == slices)
196             size = max - pos;
197 
198         if (framesCorrection < 0)
199         {
200             // Read one frame less per slice from the input, but write a duplicated frame per slice to the output
201             // LOG(TRACE, LOG_TAG) << "duplicate - requested: " << frames << ", read: " << toRead << ", slice: " << n << ", size: " << size << ", out pos: " <<
202             // pos << ", source pos: " << pos - n << "\n";
203             memcpy(static_cast<char*>(outputBuffer) + pos * format_.frameSize(), read_buffer_.data() + (pos - n) * format_.frameSize(),
204                    size * format_.frameSize());
205         }
206         else
207         {
208             // Read all input frames, but skip a frame per slice when writing to the output.
209             // LOG(TRACE, LOG_TAG) << "remove - requested: " << frames << ", read: " << toRead << ", slice: " << n << ", size: " << size << ", out pos: " << pos
210             // - n << ", source pos: " << pos << "\n";
211             memcpy(static_cast<char*>(outputBuffer) + (pos - n) * format_.frameSize(), read_buffer_.data() + pos * format_.frameSize(),
212                    size * format_.frameSize());
213         }
214         pos += size;
215     }
216 
217     return tp;
218 }
219 
220 
updateBuffers(chronos::usec::rep age)221 void Stream::updateBuffers(chronos::usec::rep age)
222 {
223     buffer_.add(age);
224     miniBuffer_.add(age);
225     shortBuffer_.add(age);
226 }
227 
228 
resetBuffers()229 void Stream::resetBuffers()
230 {
231     buffer_.clear();
232     miniBuffer_.clear();
233     shortBuffer_.clear();
234 }
235 
236 
getPlayerChunk(void * outputBuffer,const cs::usec & outputBufferDacTime,uint32_t frames)237 bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacTime, uint32_t frames)
238 {
239     if (outputBufferDacTime > bufferMs_)
240     {
241         LOG(INFO, LOG_TAG) << "outputBufferDacTime > bufferMs: " << cs::duration<cs::msec>(outputBufferDacTime) << " > " << cs::duration<cs::msec>(bufferMs_)
242                            << "\n";
243         return false;
244     }
245 
246     std::lock_guard<std::mutex> lock(mutex_);
247     time_t now = time(nullptr);
248     if (!chunk_ && !chunks_.try_pop(chunk_))
249     {
250         if (now != lastUpdate_)
251         {
252             lastUpdate_ = now;
253             LOG(INFO, LOG_TAG) << "No chunks available\n";
254         }
255         return false;
256     }
257 
258 #ifdef LOG_LATENCIES
259     // calculate the estimated end to end latency
260     if (recent_)
261     {
262         cs::nsec req_chunk_duration = cs::nsec(static_cast<cs::nsec::rep>(frames / format_.nsRate()));
263         auto youngest = recent_->end() - req_chunk_duration;
264         cs::msec age = std::chrono::duration_cast<cs::msec>(TimeProvider::serverNow() - youngest + outputBufferDacTime);
265         latencies_.add(age.count());
266     }
267 #endif
268 
269     /// we have a chunk
270     /// age = chunk age (server now - rec time: some positive value) - buffer (e.g. 1000ms) + time to DAC
271     /// age = 0 => play now
272     /// age < 0 => play in -age => wait for a while, play silence in the meantime
273     /// age > 0 => too old      => throw them away
274 
275     try
276     {
277         if (hard_sync_)
278         {
279             cs::nsec req_chunk_duration = cs::nsec(static_cast<cs::nsec::rep>(frames / format_.nsRate()));
280             cs::usec age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime;
281             // LOG(INFO, LOG_TAG) << "age: " << age.count() / 1000 << ", buffer: " <<
282             // std::chrono::duration_cast<chrono::milliseconds>(req_chunk_duration).count() << "\n";
283             if (age < -req_chunk_duration)
284             {
285                 // the oldest chunk (top of the stream) is too young for the buffer
286                 // e.g. age = -100ms (=> should be played in 100ms)
287                 // but the requested chunk duration is 50ms, so there is not data in this iteration available
288                 getSilentPlayerChunk(outputBuffer, frames);
289                 return true;
290             }
291             else
292             {
293                 if (age.count() > 0)
294                 {
295                     LOG(DEBUG, LOG_TAG) << "age > 0: " << age.count() / 1000 << "ms, dropping old chunks\n";
296                     // age > 0: the top of the stream is too old. We must fast foward.
297                     // delete the current chunk, it's too old. This will avoid an endless loop if there is no chunk in the queue.
298                     chunk_ = nullptr;
299                     while (chunks_.try_pop(chunk_))
300                     {
301                         age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime;
302                         LOG(DEBUG, LOG_TAG) << "age: " << age.count() / 1000 << ", requested chunk_duration: "
303                                             << std::chrono::duration_cast<std::chrono::milliseconds>(req_chunk_duration).count()
304                                             << ", duration: " << chunk_->duration<std::chrono::milliseconds>().count() << "\n";
305                         // check if the current chunk's end is older than age => can be player
306                         if ((age.count() > 0) && (age < chunk_->duration<cs::usec>()))
307                         {
308                             // fast forward by "age" to get in sync, i.e. age = 0
309                             chunk_->seek(static_cast<uint32_t>(chunk_->format.nsRate() * std::chrono::duration_cast<cs::nsec>(age).count()));
310                             age = 0s;
311                         }
312                         if (age.count() <= 0)
313                             break;
314                     }
315                 }
316 
317                 if (age.count() <= 0)
318                 {
319                     // the oldest chunk (top of the stream) can be played in this iteration
320                     // e.g. age = -20ms (=> should be played in 20ms)
321                     // and the current chunk duration is 50ms, so we need to play 20ms silence (as we don't have data)
322                     // and can play 30ms of the stream
323                     uint32_t silent_frames = static_cast<uint32_t>(-chunk_->format.nsRate() * std::chrono::duration_cast<cs::nsec>(age).count());
324                     bool result = (silent_frames <= frames);
325                     silent_frames = std::min(silent_frames, frames);
326                     if (silent_frames > 0)
327                     {
328                         LOG(DEBUG, LOG_TAG) << "Silent frames: " << silent_frames << ", frames: " << frames
329                                             << ", age: " << std::chrono::duration_cast<cs::usec>(age).count() / 1000. << "\n";
330                         getSilentPlayerChunk(outputBuffer, silent_frames);
331                     }
332                     getNextPlayerChunk(static_cast<char*>(outputBuffer) + (chunk_->format.frameSize() * silent_frames), frames - silent_frames);
333 
334                     if (result)
335                     {
336                         hard_sync_ = false;
337                         resetBuffers();
338                     }
339                     return true;
340                 }
341                 return false;
342             }
343         }
344 
345         // sample rate correction
346         // framesCorrection = number of frames to be read more or less to get in-sync
347         int32_t framesCorrection = 0;
348         if (correctAfterXFrames_ != 0)
349         {
350             playedFrames_ += frames;
351             if (playedFrames_ >= static_cast<uint32_t>(abs(correctAfterXFrames_)))
352             {
353                 framesCorrection = static_cast<int32_t>(playedFrames_) / correctAfterXFrames_;
354                 playedFrames_ %= abs(correctAfterXFrames_);
355             }
356         }
357 
358         cs::usec age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, frames, framesCorrection) - bufferMs_ +
359                                                             outputBufferDacTime);
360 
361         setRealSampleRate(format_.rate());
362         // check if we need a hard sync
363         if (buffer_.full() && (cs::usec(abs(median_)) > cs::msec(2)) && (cs::abs(age) > cs::usec(500)))
364         {
365             LOG(INFO, LOG_TAG) << "pBuffer->full() && (abs(median_) > 2): " << median_ << "\n";
366             hard_sync_ = true;
367         }
368         else if (shortBuffer_.full() && (cs::usec(abs(shortMedian_)) > cs::msec(5)) && (cs::abs(age) > cs::usec(500)))
369         {
370             LOG(INFO, LOG_TAG) << "pShortBuffer->full() && (abs(shortMedian_) > 5): " << shortMedian_ << "\n";
371             hard_sync_ = true;
372         }
373         else if (miniBuffer_.full() && (cs::usec(abs(miniBuffer_.median())) > cs::msec(50)) && (cs::abs(age) > cs::usec(500)))
374         {
375             LOG(INFO, LOG_TAG) << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer_.median() << "\n";
376             hard_sync_ = true;
377         }
378         else if (cs::abs(age) > 500ms)
379         {
380             LOG(INFO, LOG_TAG) << "abs(age > 500): " << cs::abs(age).count() << "\n";
381             hard_sync_ = true;
382         }
383         else if (shortBuffer_.full())
384         {
385             // No hard sync needed
386             // Check if we need a samplerate correction (change playback speed (soft sync))
387             auto miniMedian = miniBuffer_.median();
388             if ((cs::usec(shortMedian_) > kCorrectionBegin) && (cs::usec(miniMedian) > cs::usec(50)) && (cs::usec(age) > cs::usec(50)))
389             {
390                 double rate = (shortMedian_ / 100) * 0.00005;
391                 rate = 1.0 - std::min(rate, 0.0005);
392                 // LOG(INFO, LOG_TAG) << "Rate: " << rate << "\n";
393                 // we are late (age > 0), this means we are not playing fast enough
394                 // => the real sample rate seems to be lower, we have to drop some frames
395                 setRealSampleRate(format_.rate() * rate); // 0.9999);
396             }
397             else if ((cs::usec(shortMedian_) < -kCorrectionBegin) && (cs::usec(miniMedian) < -cs::usec(50)) && (cs::usec(age) < -cs::usec(50)))
398             {
399                 double rate = (-shortMedian_ / 100) * 0.00005;
400                 rate = 1.0 + std::min(rate, 0.0005);
401                 // LOG(INFO, LOG_TAG) << "Rate: " << rate << "\n";
402                 // we are early (age > 0), this means we are playing too fast
403                 // => the real sample rate seems to be higher, we have to insert some frames
404                 setRealSampleRate(format_.rate() * rate); // 1.0001);
405             }
406         }
407 
408         updateBuffers(age.count());
409 
410         // update median_ and shortMedian_ and print sync stats
411         if (now != lastUpdate_)
412         {
413             // log buffer stats
414             lastUpdate_ = now;
415             median_ = buffer_.median();
416             shortMedian_ = shortBuffer_.median();
417             LOG(DEBUG, "Stats") << "Chunk: " << age.count() / 100 << "\t" << miniBuffer_.median() / 100 << "\t" << shortMedian_ / 100 << "\t" << median_ / 100
418                                 << "\t" << buffer_.size() << "\t" << cs::duration<cs::msec>(outputBufferDacTime) << "\t" << frame_delta_ << "\n";
419             frame_delta_ = 0;
420 
421 #ifdef LOG_LATENCIES
422             // log latencies
423             std::array<uint8_t, 5> percents = {100, 99, 95, 50, 5};
424             auto percentiles = latencies_.percentiles(percents);
425             std::stringstream ss;
426             for (std::size_t n = 0; n < percents.size(); ++n)
427                 ss << ((n > 0) ? ", " : "") << (int)percents[n] << "%: " << percentiles[n];
428             LOG(DEBUG, "Latency") << ss.str() << "\n";
429 #endif
430         }
431         return (abs(cs::duration<cs::msec>(age)) < 500);
432     }
433     catch (const std::exception& e)
434     {
435         LOG(INFO, LOG_TAG) << "Exception: " << e.what() << "\n";
436         hard_sync_ = true;
437         return false;
438     }
439 }
440 
441 
getPlayerChunkOrSilence(void * outputBuffer,const chronos::usec & outputBufferDacTime,uint32_t frames)442 bool Stream::getPlayerChunkOrSilence(void* outputBuffer, const chronos::usec& outputBufferDacTime, uint32_t frames)
443 {
444     bool result = getPlayerChunk(outputBuffer, outputBufferDacTime, frames);
445     if (!result)
446     {
447         static utils::logging::TimeConditional cond(1s);
448         LOG(DEBUG, LOG_TAG) << cond << "Failed to get chunk, returning silence\n";
449         getSilentPlayerChunk(outputBuffer, frames);
450     }
451     return result;
452 }
453