1 #include "engine/sidechain/enginenetworkstream.h"
2
3 #ifdef __WINDOWS__
4 #include <windows.h>
5 #include "util/performancetimer.h"
6 #else
7 #include <sys/time.h>
8 #include <unistd.h>
9 #endif
10
#ifdef __WINDOWS__
// Function pointer type shared by GetSystemTimeAsFileTime and
// GetSystemTimePreciseAsFileTime.
using PgGetSystemTimeFn = VOID(WINAPI*)(LPFILETIME);
// Filled in by the EngineNetworkStream constructor with the address of
// GetSystemTimePreciseAsFileTime when kernel32.dll exports it; null otherwise.
static PgGetSystemTimeFn s_pfpgGetSystemTimeFn = nullptr;
#endif
16
17 #include "broadcast/defs_broadcast.h"
18 #include "util/logger.h"
19 #include "util/sample.h"
20
21 namespace {
22 const int kNetworkLatencyFrames = 8192; // 185 ms @ 44100 Hz
23 // Related chunk sizes:
24 // Mp3 frames = 1152 samples
25 // Ogg frames = 64 to 8192 samples.
26 // In Mixxx 1.11 we transmit every decoder-frames at once,
27 // Which results in case of ogg in a dynamic latency from 0.14 ms to to 185 ms
28 // Now we have switched to a fixed latency of 8192 frames (stereo samples) =
29 // which is 185 @ 44100 ms and twice the maximum of the max mixxx audio buffer
30 const int kBufferFrames = kNetworkLatencyFrames * 4; // 743 ms @ 44100 Hz
31 // normally * 2 is sufficient.
32 // We allow to buffer two extra chunks for a CPU overload case, when
33 // the broadcast thread is not scheduled in time.
34
35 const mixxx::Logger kLogger("EngineNetworkStream");
36 } // namespace
37
EngineNetworkStream(int numOutputChannels,int numInputChannels)38 EngineNetworkStream::EngineNetworkStream(int numOutputChannels,
39 int numInputChannels)
40 : m_pInputFifo(nullptr),
41 m_numOutputChannels(numOutputChannels),
42 m_numInputChannels(numInputChannels),
43 m_sampleRate(0),
44 m_inputStreamStartTimeUs(-1),
45 m_inputStreamFramesWritten(0),
46 m_inputStreamFramesRead(0),
47 m_outputWorkers(BROADCAST_MAX_CONNECTIONS) {
48 if (numInputChannels) {
49 m_pInputFifo = new FIFO<CSAMPLE>(numInputChannels * kBufferFrames);
50 }
51
52 #ifdef __WINDOWS__
53 // Resolution:
54 // 15 ms for GetSystemTimeAsFileTime
55 // 0.4 ms for GetSystemTimePreciseAsFileTime
56 // Performance:
57 // 9 cycles for GetSystemTimeAsFileTime
58 // 2761 cycles for GetSystemTimePreciseAsFileTime
59 HMODULE kernel32_dll = LoadLibraryW(L"kernel32.dll");
60 if (kernel32_dll) {
61 // for a 0.0004 ms Resolution on Win8
62 s_pfpgGetSystemTimeFn = (PgGetSystemTimeFn)GetProcAddress(
63 kernel32_dll, "GetSystemTimePreciseAsFileTime");
64 }
65 #endif
66 }
67
~EngineNetworkStream()68 EngineNetworkStream::~EngineNetworkStream() {
69 if (m_inputStreamStartTimeUs >= 0) {
70 stopStream();
71 }
72
73 delete m_pInputFifo;
74 }
75
startStream(double sampleRate)76 void EngineNetworkStream::startStream(double sampleRate) {
77 m_sampleRate = sampleRate;
78 m_inputStreamStartTimeUs = getNetworkTimeUs();
79 m_inputStreamFramesWritten = 0;
80
81 for (NetworkOutputStreamWorkerPtr worker : qAsConst(m_outputWorkers)) {
82 if (worker.isNull()) {
83 continue;
84 }
85
86 worker->startStream(m_sampleRate, m_numOutputChannels);
87 }
88 }
89
stopStream()90 void EngineNetworkStream::stopStream() {
91 m_inputStreamStartTimeUs = -1;
92
93 for (NetworkOutputStreamWorkerPtr worker : qAsConst(m_outputWorkers)) {
94 if (worker.isNull()) {
95 continue;
96 }
97
98 worker->stopStream();
99 }
100 }
101
getReadExpected()102 int EngineNetworkStream::getReadExpected() {
103 return static_cast<int>(getInputStreamTimeFrames() - m_inputStreamFramesRead);
104 }
105
read(CSAMPLE * buffer,int frames)106 void EngineNetworkStream::read(CSAMPLE* buffer, int frames) {
107 int readAvailable = m_pInputFifo->readAvailable();
108 int readRequired = frames * m_numInputChannels;
109 int copyCount = math_min(readAvailable, readRequired);
110 if (copyCount > 0) {
111 (void)m_pInputFifo->read(buffer, copyCount);
112 buffer += copyCount;
113 }
114 if (readAvailable < readRequired) {
115 // Fill missing Samples with silence
116 int silenceCount = readRequired - readAvailable;
117 kLogger.debug() << "write: flushed"
118 << readRequired << "samples";
119 SampleUtil::clear(buffer, silenceCount);
120 }
121 }
122
getInputStreamTimeFrames()123 qint64 EngineNetworkStream::getInputStreamTimeFrames() {
124 return static_cast<qint64>(static_cast<double>(getInputStreamTimeUs()) *
125 m_sampleRate / 1000000.0);
126 }
127
getInputStreamTimeUs()128 qint64 EngineNetworkStream::getInputStreamTimeUs() {
129 return getNetworkTimeUs() - m_inputStreamStartTimeUs;
130 }
131
132 // static
getNetworkTimeUs()133 qint64 EngineNetworkStream::getNetworkTimeUs() {
134 // This matches the GPL2 implementation found in
135 // https://github.com/codders/libshout/blob/a17fb84671d3732317b0353d7281cc47e2df6cf6/src/timing/timing.c
136 // Instead of ms resolution we use a us resolution to allow low latency settings
137 // will overflow > 200,000 years
138 #ifdef __WINDOWS__
139 FILETIME ft;
140 // no GetSystemTimePreciseAsFileTime available, fall
141 // back to GetSystemTimeAsFileTime. This happens before
142 // Windows 8 and Windows Server 2012
143 // GetSystemTime?AsFileTime is NTP adjusted
144 // QueryPerformanceCounter depends on the CPU crystal
145 if(s_pfpgGetSystemTimeFn) {
146 s_pfpgGetSystemTimeFn(&ft);
147 return ((qint64)ft.dwHighDateTime << 32 | ft.dwLowDateTime) / 10;
148 } else {
149 static qint64 oldNow = 0;
150 static qint64 incCount = 0;
151 static PerformanceTimer timerSinceInc;
152 GetSystemTimeAsFileTime(&ft);
153 qint64 now = ((qint64)ft.dwHighDateTime << 32 | ft.dwLowDateTime) / 10;
154 if (now == oldNow) {
155 // timer was not incremented since last call (< 15 ms)
156 // Add time since last function call after last increment
157 // This reduces the jitter < one call cycle which is sufficient
158 now += timerSinceInc.elapsed().toIntegerMicros();
159 } else {
160 // timer was incremented
161 timerSinceInc.start();
162 oldNow = now;
163 }
164 return now;
165 }
166 #elif defined(__APPLE__)
167 // clock_gettime is not implemented on OSX
168 // gettimeofday can go backward due to NTP adjusting
169 // this will work here, because we take the stream start time for reference
170 struct timeval mtv;
171 gettimeofday(&mtv, NULL);
172 return (qint64)(mtv.tv_sec) * 1000000 + mtv.tv_usec;
173 #else
174 // CLOCK_MONOTONIC is NTP adjusted
175 struct timespec ts;
176 clock_gettime(CLOCK_MONOTONIC, &ts);
177 return ts.tv_sec * 1000000LL + ts.tv_nsec / 1000;
178 #endif
179 }
180
addOutputWorker(NetworkOutputStreamWorkerPtr pWorker)181 void EngineNetworkStream::addOutputWorker(NetworkOutputStreamWorkerPtr pWorker) {
182 if (nextOutputSlotAvailable() < 0) {
183 kLogger.warning() << "addWorker: can't add worker:"
184 << "no free slot left in internal list";
185 return;
186 }
187
188 if (pWorker && m_numOutputChannels) {
189 int nextNullItem = nextOutputSlotAvailable();
190 if(nextNullItem > -1) {
191 QSharedPointer<FIFO<CSAMPLE>> workerFifo(
192 new FIFO<CSAMPLE>(m_numOutputChannels * kBufferFrames));
193 pWorker->setOutputFifo(workerFifo);
194 pWorker->startStream(m_sampleRate, m_numOutputChannels);
195 m_outputWorkers[nextNullItem] = pWorker;
196
197 kLogger.debug() << "addWorker: worker added";
198 debugOutputSlots();
199 }
200 }
201 }
202
removeOutputWorker(NetworkOutputStreamWorkerPtr pWorker)203 void EngineNetworkStream::removeOutputWorker(NetworkOutputStreamWorkerPtr pWorker) {
204 int index = m_outputWorkers.indexOf(pWorker);
205 if(index > -1) {
206 m_outputWorkers[index].clear();
207 kLogger.debug() << "removeWorker: worker removed";
208 } else {
209 kLogger.warning() << "removeWorker: ERROR: worker not found";
210 }
211 debugOutputSlots();
212 }
213
setInputWorker(NetworkInputStreamWorker * pInputWorker)214 void EngineNetworkStream::setInputWorker(NetworkInputStreamWorker* pInputWorker) {
215 if (pInputWorker) {
216 pInputWorker->setSourceFifo(m_pInputFifo);
217 }
218 }
219
nextOutputSlotAvailable()220 int EngineNetworkStream::nextOutputSlotAvailable() {
221 return m_outputWorkers.indexOf(NetworkOutputStreamWorkerPtr(nullptr));
222 }
223
debugOutputSlots()224 void EngineNetworkStream::debugOutputSlots() {
225 int available = m_outputWorkers.count(NetworkOutputStreamWorkerPtr(nullptr));
226 int total = m_outputWorkers.size();
227 kLogger.debug() << "worker slots used:"
228 << QString("%1 out of %2").arg(total - available).arg(total);
229 }
230