1 // Copyright (c) Charles J. Cliffe
2 // SPDX-License-Identifier: GPL-2.0+
3
4 #include "AudioThread.h"
5 #include <vector>
6 #include <algorithm>
7 #include "CubicSDR.h"
8 #include "DemodulatorInstance.h"
9 #include <mutex>
10
11 //50 ms
12 #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
13
14 std::map<int, AudioThread* > AudioThread::deviceController;
15
16 std::map<int, int> AudioThread::deviceSampleRate;
17
18 std::recursive_mutex AudioThread::m_device_mutex;
19
AudioThread()20 AudioThread::AudioThread() : IOThread(), nBufferFrames(1024), sampleRate(0), controllerThread(nullptr) {
21
22 audioQueuePtr = 0;
23 underflowCount = 0;
24 active.store(false);
25 outputDevice.store(-1);
26 gain = 1.0;
27 }
28
~AudioThread()29 AudioThread::~AudioThread() {
30
31 if (controllerThread != nullptr) {
32
33 //
34 //NOT PROTECTED by m_mutex on purpose, to prevent deadlocks with controllerThread
35 // it doesn't matter, it is only called when all "normal" audio threads are detached from the controller.
36 //
37
38 terminate();
39 controllerThread->join();
40 delete controllerThread;
41 controllerThread = nullptr;
42 }
43 }
44
getMutex()45 std::recursive_mutex & AudioThread::getMutex()
46 {
47 return m_mutex;
48 }
49
attachControllerThread(std::thread * controllerThread_in)50 void AudioThread::attachControllerThread(std::thread* controllerThread_in) {
51
52 controllerThread = controllerThread_in;
53 }
54
bindThread(AudioThread * other)55 void AudioThread::bindThread(AudioThread *other) {
56
57 std::lock_guard<std::recursive_mutex> lock(m_mutex);
58
59 if (std::find(boundThreads.begin(), boundThreads.end(), other) == boundThreads.end()) {
60 boundThreads.push_back(other);
61 }
62 }
63
removeThread(AudioThread * other)64 void AudioThread::removeThread(AudioThread *other) {
65
66 std::lock_guard<std::recursive_mutex> lock(m_mutex);
67
68 auto i = std::find(boundThreads.begin(), boundThreads.end(), other);
69
70 if (i != boundThreads.end()) {
71 boundThreads.erase(i);
72 }
73 }
74
deviceCleanup()75 void AudioThread::deviceCleanup() {
76 //
77 //NOT PROTECTED by m_device_mutex on purpose, to prevent deadlocks with i->second->controllerThread
78 // it doesn't matter, it is only called when all "normal" audio threads are detached from the controller.
79 //
80 for (auto & i : deviceController) {
81
82 delete i.second;
83 }
84
85 deviceController.clear();
86 }
87
audioCallback(void * outputBuffer,void *,unsigned int nBufferFrames,double,RtAudioStreamStatus status,void * userData)88 static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned int nBufferFrames, double /* streamTime */, RtAudioStreamStatus status,
89 void *userData) {
90
91 float *out = (float*)outputBuffer;
92
93 //Zero output buffer in all cases: this allow to mute audio if no AudioThread data is
94 //actually active.
95 ::memset(out, 0, nBufferFrames * 2 * sizeof(float));
96
97 //src in the controller thread:
98 auto *src = (AudioThread *)userData;
99
100 //by construction, src is a controller thread, from deviceController:
101 std::lock_guard<std::recursive_mutex> lock(src->getMutex());
102
103 if (src->isTerminated()) {
104 return 1;
105 }
106
107 if (status) {
108 std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl << std::flush;
109 }
110
111 double peak = 0.0;
112
113 //Process the bound threads audio:
114 for (size_t j = 0; j < src->boundThreads.size(); j++) {
115
116 AudioThread *srcmix = src->boundThreads[j];
117
118 //lock every single boundThread srcmix in succession the time we process
119 //its audio samples.
120 std::lock_guard<std::recursive_mutex> lock(srcmix->getMutex());
121
122 if (srcmix->isTerminated() || !srcmix->inputQueue || srcmix->inputQueue->empty() || !srcmix->isActive()) {
123 continue;
124 }
125
126 if (!srcmix->currentInput) {
127 srcmix->audioQueuePtr = 0;
128
129 if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
130 continue;
131 }
132
133 continue;
134 }
135
136 if (srcmix->currentInput->sampleRate != src->getSampleRate()) {
137
138 while (srcmix->inputQueue->try_pop(srcmix->currentInput)) {
139
140 if (srcmix->currentInput) {
141 if (srcmix->currentInput->sampleRate == src->getSampleRate()) {
142 break;
143 }
144
145 }
146 srcmix->currentInput = nullptr;
147 } //end while
148
149 srcmix->audioQueuePtr = 0;
150
151 if (!srcmix->currentInput) {
152 continue;
153 }
154 }
155
156
157 if (srcmix->currentInput->channels == 0 || srcmix->currentInput->data.empty()) {
158 if (!srcmix->inputQueue->empty()) {
159 srcmix->audioQueuePtr = 0;
160 if (srcmix->currentInput) {
161
162 srcmix->currentInput = nullptr;
163 }
164
165 if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
166 continue;
167 }
168 }
169 continue;
170 }
171
172 double mixPeak = srcmix->currentInput->peak * srcmix->gain;
173
174 if (srcmix->currentInput->channels == 1) {
175
176 for (unsigned int i = 0; i < nBufferFrames; i++) {
177
178 if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) {
179 srcmix->audioQueuePtr = 0;
180 if (srcmix->currentInput) {
181
182 srcmix->currentInput = nullptr;
183 }
184
185 if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
186 break;
187 }
188
189
190 double srcPeak = srcmix->currentInput->peak * srcmix->gain;
191 if (mixPeak < srcPeak) {
192 mixPeak = srcPeak;
193 }
194 }
195 if (srcmix->currentInput && !srcmix->currentInput->data.empty()) {
196 float v = srcmix->currentInput->data[srcmix->audioQueuePtr] * srcmix->gain;
197 out[i * 2] += v;
198 out[i * 2 + 1] += v;
199 }
200 srcmix->audioQueuePtr++;
201 }
202 }
203 else {
204 for (unsigned int i = 0, iMax = srcmix->currentInput->channels * nBufferFrames; i < iMax; i++) {
205
206 if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) {
207 srcmix->audioQueuePtr = 0;
208 if (srcmix->currentInput) {
209
210 srcmix->currentInput = nullptr;
211 }
212
213 if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
214 break;
215 }
216
217 double srcPeak = srcmix->currentInput->peak * srcmix->gain;
218 if (mixPeak < srcPeak) {
219 mixPeak = srcPeak;
220 }
221 }
222 if (srcmix->currentInput && !srcmix->currentInput->data.empty()) {
223
224 out[i] = out[i] + srcmix->currentInput->data[srcmix->audioQueuePtr] * srcmix->gain;
225 }
226 srcmix->audioQueuePtr++;
227 }
228 }
229
230 peak += mixPeak;
231 }
232
233 //normalize volume
234 if (peak > 1.0) {
235 float invPeak = (float)(1.0 / peak);
236
237 for (unsigned int i = 0; i < nBufferFrames * 2; i++) {
238 out[i] *= invPeak;
239 }
240 }
241
242 return 0;
243 }
244
enumerateDevices(std::vector<RtAudio::DeviceInfo> & devs)245 void AudioThread::enumerateDevices(std::vector<RtAudio::DeviceInfo> &devs) {
246 RtAudio endac;
247
248 unsigned int numDevices = endac.getDeviceCount();
249
250 for (unsigned int i = 0; i < numDevices; i++) {
251 RtAudio::DeviceInfo info = endac.getDeviceInfo(i);
252
253 devs.push_back(info);
254
255 std::cout << std::endl;
256
257 std::cout << "Audio Device #" << i << " " << info.name << std::endl;
258 std::cout << "\tDefault Output? " << (info.isDefaultOutput ? "Yes" : "No") << std::endl;
259 std::cout << "\tDefault Input? " << (info.isDefaultInput ? "Yes" : "No") << std::endl;
260 std::cout << "\tInput channels: " << info.inputChannels << std::endl;
261 std::cout << "\tOutput channels: " << info.outputChannels << std::endl;
262 std::cout << "\tDuplex channels: " << info.duplexChannels << std::endl;
263
264 std::cout << "\t" << "Native formats:" << std::endl;
265 RtAudioFormat nFormats = info.nativeFormats;
266 if (nFormats & RTAUDIO_SINT8) {
267 std::cout << "\t\t8-bit signed integer." << std::endl;
268 }
269 if (nFormats & RTAUDIO_SINT16) {
270 std::cout << "\t\t16-bit signed integer." << std::endl;
271 }
272 if (nFormats & RTAUDIO_SINT24) {
273 std::cout << "\t\t24-bit signed integer." << std::endl;
274 }
275 if (nFormats & RTAUDIO_SINT32) {
276 std::cout << "\t\t32-bit signed integer." << std::endl;
277 }
278 if (nFormats & RTAUDIO_FLOAT32) {
279 std::cout << "\t\t32-bit float normalized between plus/minus 1.0." << std::endl;
280 }
281 if (nFormats & RTAUDIO_FLOAT64) {
282 std::cout << "\t\t64-bit float normalized between plus/minus 1.0." << std::endl;
283 }
284
285 std::vector<unsigned int>::iterator srate;
286
287 std::cout << "\t" << "Supported sample rates:" << std::endl;
288
289 for (srate = info.sampleRates.begin(); srate != info.sampleRates.end(); srate++) {
290 std::cout << "\t\t" << (*srate) << "hz" << std::endl;
291 }
292
293 std::cout << std::endl;
294 }
295 }
296
setDeviceSampleRate(int deviceId,int sampleRate)297 void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) {
298
299 AudioThread* matchingControllerThread = nullptr;
300
301 //scope lock here to minimize the common unique static lock contention
302 {
303 std::lock_guard<std::recursive_mutex> lock(m_device_mutex);
304
305 if (deviceController.find(deviceId) != deviceController.end()) {
306
307 matchingControllerThread = deviceController[deviceId];
308 }
309 }
310
311 //out-of-lock test
312 if (matchingControllerThread != nullptr) {
313
314 AudioThreadCommand refreshDevice;
315 refreshDevice.cmdType = AudioThreadCommand::Type::AUDIO_THREAD_CMD_SET_SAMPLE_RATE;
316 refreshDevice.int_value = sampleRate;
317 //VSO : blocking push !
318 matchingControllerThread->getCommandQueue()->push(refreshDevice);
319 }
320 }
321
setSampleRate(int sampleRate_in)322 void AudioThread::setSampleRate(int sampleRate_in) {
323
324 bool thisIsAController = false;
325
326 //scope lock here to minimize the common unique static lock contention
327 {
328 std::lock_guard<std::recursive_mutex> lock(m_device_mutex);
329
330 if (deviceController[outputDevice.load()] == this) {
331 thisIsAController = true;
332 deviceSampleRate[outputDevice.load()] = sampleRate_in;
333 }
334 }
335
336 std::lock_guard<std::recursive_mutex> lock(m_mutex);
337
338 if (thisIsAController) {
339
340 dac.stopStream();
341 dac.closeStream();
342
343 //Set bounded sample rate:
344 for (auto srcmix : boundThreads) {
345 srcmix->setSampleRate(sampleRate_in);
346 }
347
348 //make a local copy, snapshot of the list of demodulators
349 std::vector<DemodulatorInstancePtr> demodulators = wxGetApp().getDemodMgr().getDemodulators();
350
351 for (const auto& demod : demodulators) {
352 if (demod->getOutputDevice() == outputDevice.load()) {
353 demod->setAudioSampleRate(sampleRate_in);
354 }
355 }
356
357 dac.openStream(¶meters, nullptr, RTAUDIO_FLOAT32, sampleRate_in, &nBufferFrames, &audioCallback, (void *)this, &opts);
358 dac.startStream();
359 }
360
361 sampleRate = sampleRate_in;
362 }
363
getSampleRate()364 int AudioThread::getSampleRate() {
365 std::lock_guard<std::recursive_mutex> lock(m_mutex);
366
367 return sampleRate;
368 }
369
setupDevice(int deviceId)370 void AudioThread::setupDevice(int deviceId) {
371
372 //global lock to setup the device...
373 std::lock_guard<std::recursive_mutex> lock(m_device_mutex);
374
375 parameters.deviceId = deviceId;
376 parameters.nChannels = 2;
377 parameters.firstChannel = 0;
378
379 opts.streamName = "CubicSDR Audio Output";
380
381 try {
382 if (deviceController.find(outputDevice.load()) != deviceController.end()) {
383 //'this' is not the controller, so remove it from the bounded list:
384 //beware, we must take the controller mutex, because the audio callback may use the list of bounded
385 //threads at that moment:
386 std::lock_guard<std::recursive_mutex> lock(deviceController[outputDevice.load()]->getMutex());
387
388 deviceController[outputDevice.load()]->removeThread(this);
389 }
390 #ifndef _MSC_VER
391 opts.priority = sched_get_priority_max(SCHED_FIFO);
392 #endif
393 // opts.flags = RTAUDIO_MINIMIZE_LATENCY;
394 opts.flags = RTAUDIO_SCHEDULE_REALTIME;
395
396 if (deviceSampleRate.find(parameters.deviceId) != deviceSampleRate.end()) {
397 sampleRate = deviceSampleRate[parameters.deviceId];
398 }
399 else {
400 std::cout << "Error, device sample rate wasn't initialized?" << std::endl;
401 return;
402 // sampleRate = AudioThread::getDefaultAudioSampleRate();
403 // deviceSampleRate[parameters.deviceId] = sampleRate;
404 }
405
406 //Create a new controller:
407 if (deviceController.find(parameters.deviceId) == deviceController.end()) {
408
409 //Create a new controller thread for parameters.deviceId:
410 auto* newController = new AudioThread();
411
412 newController->setInitOutputDevice(parameters.deviceId, sampleRate);
413 newController->bindThread(this);
414 newController->attachControllerThread(new std::thread(&AudioThread::threadMain, newController));
415
416 deviceController[parameters.deviceId] = newController;
417 }
418 else if (deviceController[parameters.deviceId] == this) {
419
420 //Attach callback
421 dac.openStream(¶meters, nullptr, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *)this, &opts);
422 dac.startStream();
423 }
424 else {
425 //we are a bound thread, add ourselves to the controller deviceController[parameters.deviceId].
426 //beware, we must take the controller mutex, because the audio callback may use the list of bounded
427 //threads at that moment:
428 std::lock_guard<std::recursive_mutex> lock(deviceController[parameters.deviceId]->getMutex());
429
430 deviceController[parameters.deviceId]->bindThread(this);
431 }
432 active = true;
433
434 }
435 catch (RtAudioError& e) {
436 e.printMessage();
437 return;
438 }
439 if (deviceId != -1) {
440 outputDevice = deviceId;
441 }
442 }
443
getOutputDevice()444 int AudioThread::getOutputDevice() {
445
446 std::lock_guard<std::recursive_mutex> lock(m_mutex);
447
448 if (outputDevice == -1) {
449 return dac.getDefaultOutputDevice();
450 }
451 return outputDevice;
452 }
453
setInitOutputDevice(int deviceId,int sampleRate_in)454 void AudioThread::setInitOutputDevice(int deviceId, int sampleRate_in) {
455
456 //global lock
457 std::lock_guard<std::recursive_mutex> lock(m_device_mutex);
458
459 outputDevice = deviceId;
460 if (sampleRate_in == -1) {
461 if (deviceSampleRate.find(parameters.deviceId) != deviceSampleRate.end()) {
462 sampleRate_in = deviceSampleRate[deviceId];
463 }
464 }
465 else {
466 deviceSampleRate[deviceId] = sampleRate_in;
467 }
468 sampleRate = sampleRate_in;
469 }
470
run()471 void AudioThread::run() {
472 #ifdef __APPLE__
473 pthread_t tID = pthread_self(); // ID of this thread
474 int priority = sched_get_priority_max(SCHED_RR) - 1;
475 sched_param prio = { priority }; // scheduling priority of thread
476 pthread_setschedparam(tID, SCHED_RR, &prio);
477 #endif
478
479 // std::cout << "Audio thread initializing.." << std::endl;
480
481 if (dac.getDeviceCount() < 1) {
482 std::cout << "No audio devices found!" << std::endl;
483 return;
484 }
485
486 setupDevice((outputDevice.load() == -1) ? (dac.getDefaultOutputDevice()) : outputDevice.load());
487
488 // std::cout << "Audio thread started." << std::endl;
489
490 inputQueue = std::static_pointer_cast<AudioThreadInputQueue>(getInputQueue("AudioDataInput"));
491
492 //Infinite loop, witing for commands or for termination
493 while (!stopping) {
494 AudioThreadCommand command;
495
496 if (!cmdQueue.pop(command, HEARTBEAT_CHECK_PERIOD_MICROS)) {
497 continue;
498 }
499
500 if (command.cmdType == AudioThreadCommand::Type::AUDIO_THREAD_CMD_SET_DEVICE) {
501 setupDevice(command.int_value);
502 }
503 if (command.cmdType == AudioThreadCommand::Type::AUDIO_THREAD_CMD_SET_SAMPLE_RATE) {
504 setSampleRate(command.int_value);
505 }
506 } //end while
507
508 // Drain any remaining inputs, with a non-blocking pop
509 if (inputQueue != nullptr) {
510 inputQueue->flush();
511 }
512
513 //Nullify currentInput...
514 currentInput = nullptr;
515
516 //Stop : Retrieve the matching controlling thread in a scope lock:
517 AudioThread* controllerMatchingThread;
518 {
519 std::lock_guard<std::recursive_mutex> global_lock(m_device_mutex);
520 controllerMatchingThread = deviceController[parameters.deviceId];
521 }
522
523 if (controllerMatchingThread != this) {
524 //'this' is not the controller, so remove it from the bounded list:
525 //beware, we must take the controller mutex, because the audio callback may use the list of bounded
526 //threads at that moment:
527 std::lock_guard<std::recursive_mutex> lock(controllerMatchingThread->getMutex());
528
529 controllerMatchingThread->removeThread(this);
530 }
531 else {
532 // 'this' is a controller thread:
533 try {
534 if (dac.isStreamOpen()) {
535 dac.stopStream();
536 }
537 dac.closeStream();
538 }
539 catch (RtAudioError& e) {
540 e.printMessage();
541 }
542 }
543
544 // std::cout << "Audio thread done." << std::endl;
545 }
546
terminate()547 void AudioThread::terminate() {
548 IOThread::terminate();
549 }
550
isActive()551 bool AudioThread::isActive() {
552 std::lock_guard<std::recursive_mutex> lock(m_mutex);
553
554 return active;
555 }
556
setActive(bool state)557 void AudioThread::setActive(bool state) {
558
559 AudioThread* matchingControllerThread = nullptr;
560
561 std::lock_guard<std::recursive_mutex> lock(m_mutex);
562
563 //scope lock here to minimize the common unique static lock contention
564 {
565 std::lock_guard<std::recursive_mutex> lock(m_device_mutex);
566
567 if (deviceController.find(parameters.deviceId) != deviceController.end()) {
568
569 matchingControllerThread = deviceController[parameters.deviceId];
570 }
571 }
572
573 if (matchingControllerThread == nullptr) {
574 return;
575 }
576
577 if (state && !active && inputQueue) {
578 matchingControllerThread->bindThread(this);
579 }
580 else if (!state && active) {
581 matchingControllerThread->removeThread(this);
582 }
583
584 // Activity state changing, clear any inputs
585 if (inputQueue) {
586 inputQueue->flush();
587 }
588 active = state;
589 }
590
getCommandQueue()591 AudioThreadCommandQueue *AudioThread::getCommandQueue() {
592 return &cmdQueue;
593 }
594
setGain(float gain_in)595 void AudioThread::setGain(float gain_in) {
596
597 if (gain_in < 0.0) {
598 gain_in = 0.0;
599 }
600 if (gain_in > 2.0) {
601 gain_in = 2.0;
602 }
603 gain = gain_in;
604 }
605