1 #include "engine/sidechain/enginenetworkstream.h"
2 
3 #ifdef __WINDOWS__
4 #include <windows.h>
5 #include "util/performancetimer.h"
6 #else
7 #include <sys/time.h>
8 #include <unistd.h>
9 #endif
10 
11 #ifdef __WINDOWS__
12 // For GetSystemTimeAsFileTime and GetSystemTimePreciseAsFileTime
13 typedef VOID (WINAPI *PgGetSystemTimeFn)(LPFILETIME);
14 static PgGetSystemTimeFn s_pfpgGetSystemTimeFn = NULL;
15 #endif
16 
17 #include "broadcast/defs_broadcast.h"
18 #include "util/logger.h"
19 #include "util/sample.h"
20 
21 namespace {
22 const int kNetworkLatencyFrames = 8192; // 185 ms @ 44100 Hz
23 // Related chunk sizes:
24 // Mp3 frames = 1152 samples
25 // Ogg frames = 64 to 8192 samples.
26 // In Mixxx 1.11 we transmit every decoder-frames at once,
27 // Which results in case of ogg in a dynamic latency from 0.14 ms to to 185 ms
28 // Now we have switched to a fixed latency of 8192 frames (stereo samples) =
29 // which is 185 @ 44100 ms and twice the maximum of the max mixxx audio buffer
30 const int kBufferFrames = kNetworkLatencyFrames * 4; // 743 ms @ 44100 Hz
31 // normally * 2 is sufficient.
32 // We allow to buffer two extra chunks for a CPU overload case, when
33 // the broadcast thread is not scheduled in time.
34 
35 const mixxx::Logger kLogger("EngineNetworkStream");
36 } // namespace
37 
EngineNetworkStream(int numOutputChannels,int numInputChannels)38 EngineNetworkStream::EngineNetworkStream(int numOutputChannels,
39                                          int numInputChannels)
40     : m_pInputFifo(nullptr),
41       m_numOutputChannels(numOutputChannels),
42       m_numInputChannels(numInputChannels),
43       m_sampleRate(0),
44       m_inputStreamStartTimeUs(-1),
45       m_inputStreamFramesWritten(0),
46       m_inputStreamFramesRead(0),
47       m_outputWorkers(BROADCAST_MAX_CONNECTIONS) {
48     if (numInputChannels) {
49         m_pInputFifo = new FIFO<CSAMPLE>(numInputChannels * kBufferFrames);
50     }
51 
52 #ifdef __WINDOWS__
53     // Resolution:
54     // 15   ms for GetSystemTimeAsFileTime
55     //  0.4 ms for GetSystemTimePreciseAsFileTime
56     // Performance:
57     //    9 cycles for GetSystemTimeAsFileTime
58     // 2761 cycles for GetSystemTimePreciseAsFileTime
59     HMODULE kernel32_dll = LoadLibraryW(L"kernel32.dll");
60     if (kernel32_dll) {
61         // for a 0.0004 ms Resolution on Win8
62         s_pfpgGetSystemTimeFn = (PgGetSystemTimeFn)GetProcAddress(
63                 kernel32_dll, "GetSystemTimePreciseAsFileTime");
64     }
65 #endif
66 }
67 
~EngineNetworkStream()68 EngineNetworkStream::~EngineNetworkStream() {
69     if (m_inputStreamStartTimeUs >= 0) {
70         stopStream();
71     }
72 
73     delete m_pInputFifo;
74 }
75 
startStream(double sampleRate)76 void EngineNetworkStream::startStream(double sampleRate) {
77     m_sampleRate = sampleRate;
78     m_inputStreamStartTimeUs = getNetworkTimeUs();
79     m_inputStreamFramesWritten = 0;
80 
81     for (NetworkOutputStreamWorkerPtr worker : qAsConst(m_outputWorkers)) {
82         if (worker.isNull()) {
83             continue;
84         }
85 
86         worker->startStream(m_sampleRate, m_numOutputChannels);
87     }
88 }
89 
stopStream()90 void EngineNetworkStream::stopStream() {
91     m_inputStreamStartTimeUs = -1;
92 
93     for (NetworkOutputStreamWorkerPtr worker : qAsConst(m_outputWorkers)) {
94         if (worker.isNull()) {
95             continue;
96         }
97 
98         worker->stopStream();
99     }
100 }
101 
getReadExpected()102 int EngineNetworkStream::getReadExpected() {
103     return static_cast<int>(getInputStreamTimeFrames() - m_inputStreamFramesRead);
104 }
105 
read(CSAMPLE * buffer,int frames)106 void EngineNetworkStream::read(CSAMPLE* buffer, int frames) {
107     int readAvailable = m_pInputFifo->readAvailable();
108     int readRequired = frames * m_numInputChannels;
109     int copyCount = math_min(readAvailable, readRequired);
110     if (copyCount > 0) {
111         (void)m_pInputFifo->read(buffer, copyCount);
112         buffer += copyCount;
113     }
114     if (readAvailable < readRequired) {
115         // Fill missing Samples with silence
116         int silenceCount = readRequired - readAvailable;
117         kLogger.debug() << "write: flushed"
118                         << readRequired << "samples";
119         SampleUtil::clear(buffer, silenceCount);
120     }
121 }
122 
getInputStreamTimeFrames()123 qint64 EngineNetworkStream::getInputStreamTimeFrames() {
124     return static_cast<qint64>(static_cast<double>(getInputStreamTimeUs()) *
125             m_sampleRate / 1000000.0);
126 }
127 
getInputStreamTimeUs()128 qint64 EngineNetworkStream::getInputStreamTimeUs() {
129     return getNetworkTimeUs() - m_inputStreamStartTimeUs;
130 }
131 
132 // static
getNetworkTimeUs()133 qint64 EngineNetworkStream::getNetworkTimeUs() {
134     // This matches the GPL2 implementation found in
135     // https://github.com/codders/libshout/blob/a17fb84671d3732317b0353d7281cc47e2df6cf6/src/timing/timing.c
136     // Instead of ms resolution we use a us resolution to allow low latency settings
137     // will overflow > 200,000 years
138 #ifdef __WINDOWS__
139     FILETIME ft;
140     // no GetSystemTimePreciseAsFileTime available, fall
141     // back to GetSystemTimeAsFileTime. This happens before
142     // Windows 8 and Windows Server 2012
143     // GetSystemTime?AsFileTime is NTP adjusted
144     // QueryPerformanceCounter depends on the CPU crystal
145     if(s_pfpgGetSystemTimeFn) {
146         s_pfpgGetSystemTimeFn(&ft);
147         return ((qint64)ft.dwHighDateTime << 32 | ft.dwLowDateTime) / 10;
148     } else {
149         static qint64 oldNow = 0;
150         static qint64 incCount = 0;
151         static PerformanceTimer timerSinceInc;
152         GetSystemTimeAsFileTime(&ft);
153         qint64 now = ((qint64)ft.dwHighDateTime << 32 | ft.dwLowDateTime) / 10;
154         if (now == oldNow) {
155             // timer was not incremented since last call (< 15 ms)
156             // Add time since last function call after last increment
157             // This reduces the jitter < one call cycle which is sufficient
158             now += timerSinceInc.elapsed().toIntegerMicros();
159         } else {
160             // timer was incremented
161             timerSinceInc.start();
162             oldNow = now;
163         }
164         return now;
165     }
166 #elif defined(__APPLE__)
167     // clock_gettime is not implemented on OSX
168     // gettimeofday can go backward due to NTP adjusting
169     // this will work here, because we take the stream start time for reference
170     struct timeval mtv;
171     gettimeofday(&mtv, NULL);
172     return (qint64)(mtv.tv_sec) * 1000000 + mtv.tv_usec;
173 #else
174     // CLOCK_MONOTONIC is NTP adjusted
175     struct timespec ts;
176     clock_gettime(CLOCK_MONOTONIC, &ts);
177     return ts.tv_sec * 1000000LL + ts.tv_nsec / 1000;
178 #endif
179 }
180 
addOutputWorker(NetworkOutputStreamWorkerPtr pWorker)181 void EngineNetworkStream::addOutputWorker(NetworkOutputStreamWorkerPtr pWorker) {
182     if (nextOutputSlotAvailable() < 0) {
183         kLogger.warning() << "addWorker: can't add worker:"
184                           << "no free slot left in internal list";
185         return;
186     }
187 
188     if (pWorker && m_numOutputChannels) {
189         int nextNullItem = nextOutputSlotAvailable();
190         if(nextNullItem > -1) {
191             QSharedPointer<FIFO<CSAMPLE>> workerFifo(
192                     new FIFO<CSAMPLE>(m_numOutputChannels * kBufferFrames));
193             pWorker->setOutputFifo(workerFifo);
194             pWorker->startStream(m_sampleRate, m_numOutputChannels);
195             m_outputWorkers[nextNullItem] = pWorker;
196 
197             kLogger.debug() << "addWorker: worker added";
198             debugOutputSlots();
199         }
200     }
201 }
202 
removeOutputWorker(NetworkOutputStreamWorkerPtr pWorker)203 void EngineNetworkStream::removeOutputWorker(NetworkOutputStreamWorkerPtr pWorker) {
204     int index = m_outputWorkers.indexOf(pWorker);
205     if(index > -1) {
206         m_outputWorkers[index].clear();
207         kLogger.debug() << "removeWorker: worker removed";
208     } else {
209         kLogger.warning() << "removeWorker: ERROR: worker not found";
210     }
211     debugOutputSlots();
212 }
213 
setInputWorker(NetworkInputStreamWorker * pInputWorker)214 void EngineNetworkStream::setInputWorker(NetworkInputStreamWorker* pInputWorker) {
215     if (pInputWorker) {
216         pInputWorker->setSourceFifo(m_pInputFifo);
217     }
218 }
219 
nextOutputSlotAvailable()220 int EngineNetworkStream::nextOutputSlotAvailable() {
221     return m_outputWorkers.indexOf(NetworkOutputStreamWorkerPtr(nullptr));
222 }
223 
debugOutputSlots()224 void EngineNetworkStream::debugOutputSlots() {
225     int available = m_outputWorkers.count(NetworkOutputStreamWorkerPtr(nullptr));
226     int total = m_outputWorkers.size();
227     kLogger.debug() << "worker slots used:"
228                     << QString("%1 out of %2").arg(total - available).arg(total);
229 }
230