// Copyright (c) Charles J. Cliffe
// SPDX-License-Identifier: GPL-2.0+
3 
#include "AudioThread.h"

#include <algorithm>
#include <cstring>
#include <iostream>
#include <mutex>
#include <vector>

#include "CubicSDR.h"
#include "DemodulatorInstance.h"
10 
11 //50 ms
12 #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
13 
14 std::map<int, AudioThread* >  AudioThread::deviceController;
15 
16 std::map<int, int> AudioThread::deviceSampleRate;
17 
18 std::recursive_mutex AudioThread::m_device_mutex;
19 
AudioThread()20 AudioThread::AudioThread() : IOThread(), nBufferFrames(1024), sampleRate(0), controllerThread(nullptr) {
21 
22     audioQueuePtr = 0;
23     underflowCount = 0;
24     active.store(false);
25     outputDevice.store(-1);
26     gain = 1.0;
27 }
28 
~AudioThread()29 AudioThread::~AudioThread() {
30 
31     if (controllerThread != nullptr) {
32 
33         //
34         //NOT PROTECTED by m_mutex on purpose, to prevent deadlocks with controllerThread
35         // it doesn't matter, it is only called when all "normal" audio threads are detached from the controller.
36         //
37 
38         terminate();
39         controllerThread->join();
40         delete controllerThread;
41         controllerThread = nullptr;
42     }
43 }
44 
getMutex()45 std::recursive_mutex & AudioThread::getMutex()
46 {
47     return m_mutex;
48 }
49 
attachControllerThread(std::thread * controllerThread_in)50 void AudioThread::attachControllerThread(std::thread* controllerThread_in) {
51 
52     controllerThread = controllerThread_in;
53 }
54 
bindThread(AudioThread * other)55 void AudioThread::bindThread(AudioThread *other) {
56 
57     std::lock_guard<std::recursive_mutex> lock(m_mutex);
58 
59     if (std::find(boundThreads.begin(), boundThreads.end(), other) == boundThreads.end()) {
60         boundThreads.push_back(other);
61     }
62 }
63 
removeThread(AudioThread * other)64 void AudioThread::removeThread(AudioThread *other) {
65 
66     std::lock_guard<std::recursive_mutex> lock(m_mutex);
67 
68     auto i = std::find(boundThreads.begin(), boundThreads.end(), other);
69 
70     if (i != boundThreads.end()) {
71         boundThreads.erase(i);
72     }
73 }
74 
deviceCleanup()75 void AudioThread::deviceCleanup() {
76     //
77     //NOT PROTECTED by m_device_mutex on purpose, to prevent deadlocks with i->second->controllerThread
78     // it doesn't matter, it is only called when all "normal" audio threads are detached from the controller.
79     //
80     for (auto & i : deviceController) {
81 
82         delete i.second;
83     }
84 
85     deviceController.clear();
86 }
87 
audioCallback(void * outputBuffer,void *,unsigned int nBufferFrames,double,RtAudioStreamStatus status,void * userData)88 static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned int nBufferFrames, double /* streamTime */, RtAudioStreamStatus status,
89     void *userData) {
90 
91     float *out = (float*)outputBuffer;
92 
93     //Zero output buffer in all cases: this allow to mute audio if no AudioThread data is
94     //actually active.
95     ::memset(out, 0, nBufferFrames * 2 * sizeof(float));
96 
97     //src in the controller thread:
98     auto *src = (AudioThread *)userData;
99 
100     //by construction, src is a controller thread, from deviceController:
101     std::lock_guard<std::recursive_mutex> lock(src->getMutex());
102 
103     if (src->isTerminated()) {
104         return 1;
105     }
106 
107     if (status) {
108         std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl << std::flush;
109     }
110 
111     double peak = 0.0;
112 
113     //Process the bound threads audio:
114     for (size_t j = 0; j < src->boundThreads.size(); j++) {
115 
116         AudioThread *srcmix = src->boundThreads[j];
117 
118         //lock every single boundThread srcmix in succession the time we process
119         //its audio samples.
120         std::lock_guard<std::recursive_mutex> lock(srcmix->getMutex());
121 
122         if (srcmix->isTerminated() || !srcmix->inputQueue || srcmix->inputQueue->empty() || !srcmix->isActive()) {
123             continue;
124         }
125 
126         if (!srcmix->currentInput) {
127             srcmix->audioQueuePtr = 0;
128 
129             if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
130                 continue;
131             }
132 
133             continue;
134         }
135 
136         if (srcmix->currentInput->sampleRate != src->getSampleRate()) {
137 
138             while (srcmix->inputQueue->try_pop(srcmix->currentInput)) {
139 
140                 if (srcmix->currentInput) {
141                     if (srcmix->currentInput->sampleRate == src->getSampleRate()) {
142                         break;
143                     }
144 
145                 }
146                 srcmix->currentInput = nullptr;
147             } //end while
148 
149             srcmix->audioQueuePtr = 0;
150 
151             if (!srcmix->currentInput) {
152                 continue;
153             }
154         }
155 
156 
157         if (srcmix->currentInput->channels == 0 || srcmix->currentInput->data.empty()) {
158             if (!srcmix->inputQueue->empty()) {
159                 srcmix->audioQueuePtr = 0;
160                 if (srcmix->currentInput) {
161 
162                     srcmix->currentInput = nullptr;
163                 }
164 
165                 if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
166                     continue;
167                 }
168             }
169             continue;
170         }
171 
172         double mixPeak = srcmix->currentInput->peak * srcmix->gain;
173 
174         if (srcmix->currentInput->channels == 1) {
175 
176             for (unsigned int i = 0; i < nBufferFrames; i++) {
177 
178                 if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) {
179                     srcmix->audioQueuePtr = 0;
180                     if (srcmix->currentInput) {
181 
182                         srcmix->currentInput = nullptr;
183                     }
184 
185                     if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
186                         break;
187                     }
188 
189 
190                     double srcPeak = srcmix->currentInput->peak * srcmix->gain;
191                     if (mixPeak < srcPeak) {
192                         mixPeak = srcPeak;
193                     }
194                 }
195                 if (srcmix->currentInput && !srcmix->currentInput->data.empty()) {
196                     float v = srcmix->currentInput->data[srcmix->audioQueuePtr] * srcmix->gain;
197                     out[i * 2] += v;
198                     out[i * 2 + 1] += v;
199                 }
200                 srcmix->audioQueuePtr++;
201             }
202         }
203         else {
204             for (unsigned int i = 0, iMax = srcmix->currentInput->channels * nBufferFrames; i < iMax; i++) {
205 
206                 if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) {
207                     srcmix->audioQueuePtr = 0;
208                     if (srcmix->currentInput) {
209 
210                         srcmix->currentInput = nullptr;
211                     }
212 
213                     if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
214                         break;
215                     }
216 
217                     double srcPeak = srcmix->currentInput->peak * srcmix->gain;
218                     if (mixPeak < srcPeak) {
219                         mixPeak = srcPeak;
220                     }
221                 }
222                 if (srcmix->currentInput && !srcmix->currentInput->data.empty()) {
223 
224                     out[i] = out[i] + srcmix->currentInput->data[srcmix->audioQueuePtr] * srcmix->gain;
225                 }
226                 srcmix->audioQueuePtr++;
227             }
228         }
229 
230         peak += mixPeak;
231     }
232 
233     //normalize volume
234     if (peak > 1.0) {
235         float invPeak = (float)(1.0 / peak);
236 
237         for (unsigned int i = 0; i < nBufferFrames * 2; i++) {
238             out[i] *= invPeak;
239         }
240     }
241 
242     return 0;
243 }
244 
enumerateDevices(std::vector<RtAudio::DeviceInfo> & devs)245 void AudioThread::enumerateDevices(std::vector<RtAudio::DeviceInfo> &devs) {
246     RtAudio endac;
247 
248     unsigned int numDevices = endac.getDeviceCount();
249 
250     for (unsigned int i = 0; i < numDevices; i++) {
251         RtAudio::DeviceInfo info = endac.getDeviceInfo(i);
252 
253         devs.push_back(info);
254 
255         std::cout << std::endl;
256 
257         std::cout << "Audio Device #" << i << " " << info.name << std::endl;
258         std::cout << "\tDefault Output? " << (info.isDefaultOutput ? "Yes" : "No") << std::endl;
259         std::cout << "\tDefault Input? " << (info.isDefaultInput ? "Yes" : "No") << std::endl;
260         std::cout << "\tInput channels: " << info.inputChannels << std::endl;
261         std::cout << "\tOutput channels: " << info.outputChannels << std::endl;
262         std::cout << "\tDuplex channels: " << info.duplexChannels << std::endl;
263 
264         std::cout << "\t" << "Native formats:" << std::endl;
265         RtAudioFormat nFormats = info.nativeFormats;
266         if (nFormats & RTAUDIO_SINT8) {
267             std::cout << "\t\t8-bit signed integer." << std::endl;
268         }
269         if (nFormats & RTAUDIO_SINT16) {
270             std::cout << "\t\t16-bit signed integer." << std::endl;
271         }
272         if (nFormats & RTAUDIO_SINT24) {
273             std::cout << "\t\t24-bit signed integer." << std::endl;
274         }
275         if (nFormats & RTAUDIO_SINT32) {
276             std::cout << "\t\t32-bit signed integer." << std::endl;
277         }
278         if (nFormats & RTAUDIO_FLOAT32) {
279             std::cout << "\t\t32-bit float normalized between plus/minus 1.0." << std::endl;
280         }
281         if (nFormats & RTAUDIO_FLOAT64) {
282             std::cout << "\t\t64-bit float normalized between plus/minus 1.0." << std::endl;
283         }
284 
285         std::vector<unsigned int>::iterator srate;
286 
287         std::cout << "\t" << "Supported sample rates:" << std::endl;
288 
289         for (srate = info.sampleRates.begin(); srate != info.sampleRates.end(); srate++) {
290             std::cout << "\t\t" << (*srate) << "hz" << std::endl;
291         }
292 
293         std::cout << std::endl;
294     }
295 }
296 
setDeviceSampleRate(int deviceId,int sampleRate)297 void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) {
298 
299     AudioThread* matchingControllerThread = nullptr;
300 
301     //scope lock here to minimize the common unique static lock contention
302     {
303         std::lock_guard<std::recursive_mutex> lock(m_device_mutex);
304 
305         if (deviceController.find(deviceId) != deviceController.end()) {
306 
307             matchingControllerThread = deviceController[deviceId];
308         }
309     }
310 
311     //out-of-lock test
312     if (matchingControllerThread != nullptr) {
313 
314         AudioThreadCommand refreshDevice;
315         refreshDevice.cmdType = AudioThreadCommand::Type::AUDIO_THREAD_CMD_SET_SAMPLE_RATE;
316         refreshDevice.int_value = sampleRate;
317         //VSO : blocking push !
318         matchingControllerThread->getCommandQueue()->push(refreshDevice);
319     }
320 }
321 
setSampleRate(int sampleRate_in)322 void AudioThread::setSampleRate(int sampleRate_in) {
323 
324     bool thisIsAController = false;
325 
326     //scope lock here to minimize the common unique static lock contention
327     {
328         std::lock_guard<std::recursive_mutex> lock(m_device_mutex);
329 
330         if (deviceController[outputDevice.load()] == this) {
331             thisIsAController = true;
332             deviceSampleRate[outputDevice.load()] = sampleRate_in;
333         }
334     }
335 
336     std::lock_guard<std::recursive_mutex> lock(m_mutex);
337 
338     if (thisIsAController) {
339 
340         dac.stopStream();
341         dac.closeStream();
342 
343         //Set bounded sample rate:
344         for (auto srcmix : boundThreads) {
345             srcmix->setSampleRate(sampleRate_in);
346         }
347 
348         //make a local copy, snapshot of the list of demodulators
349         std::vector<DemodulatorInstancePtr> demodulators = wxGetApp().getDemodMgr().getDemodulators();
350 
351         for (const auto& demod : demodulators) {
352             if (demod->getOutputDevice() == outputDevice.load()) {
353                 demod->setAudioSampleRate(sampleRate_in);
354             }
355         }
356 
357         dac.openStream(&parameters, nullptr, RTAUDIO_FLOAT32, sampleRate_in, &nBufferFrames, &audioCallback, (void *)this, &opts);
358         dac.startStream();
359     }
360 
361     sampleRate = sampleRate_in;
362 }
363 
getSampleRate()364 int AudioThread::getSampleRate() {
365     std::lock_guard<std::recursive_mutex> lock(m_mutex);
366 
367     return sampleRate;
368 }
369 
setupDevice(int deviceId)370 void AudioThread::setupDevice(int deviceId) {
371 
372     //global lock to setup the device...
373     std::lock_guard<std::recursive_mutex> lock(m_device_mutex);
374 
375     parameters.deviceId = deviceId;
376     parameters.nChannels = 2;
377     parameters.firstChannel = 0;
378 
379     opts.streamName = "CubicSDR Audio Output";
380 
381     try {
382         if (deviceController.find(outputDevice.load()) != deviceController.end()) {
383             //'this' is not the controller, so remove it from the bounded list:
384             //beware, we must take the controller mutex, because the audio callback may use the list of bounded
385             //threads at that moment:
386             std::lock_guard<std::recursive_mutex> lock(deviceController[outputDevice.load()]->getMutex());
387 
388             deviceController[outputDevice.load()]->removeThread(this);
389         }
390 #ifndef _MSC_VER
391         opts.priority = sched_get_priority_max(SCHED_FIFO);
392 #endif
393         //    opts.flags = RTAUDIO_MINIMIZE_LATENCY;
394         opts.flags = RTAUDIO_SCHEDULE_REALTIME;
395 
396         if (deviceSampleRate.find(parameters.deviceId) != deviceSampleRate.end()) {
397             sampleRate = deviceSampleRate[parameters.deviceId];
398         }
399         else {
400             std::cout << "Error, device sample rate wasn't initialized?" << std::endl;
401             return;
402             //            sampleRate = AudioThread::getDefaultAudioSampleRate();
403             //            deviceSampleRate[parameters.deviceId] = sampleRate;
404         }
405 
406         //Create a new controller:
407         if (deviceController.find(parameters.deviceId) == deviceController.end()) {
408 
409             //Create a new controller thread for parameters.deviceId:
410             auto* newController = new AudioThread();
411 
412             newController->setInitOutputDevice(parameters.deviceId, sampleRate);
413             newController->bindThread(this);
414             newController->attachControllerThread(new std::thread(&AudioThread::threadMain, newController));
415 
416             deviceController[parameters.deviceId] = newController;
417         }
418         else if (deviceController[parameters.deviceId] == this) {
419 
420             //Attach callback
421             dac.openStream(&parameters, nullptr, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *)this, &opts);
422             dac.startStream();
423         }
424         else {
425             //we are a bound thread, add ourselves to the controller deviceController[parameters.deviceId].
426             //beware, we must take the controller mutex, because the audio callback may use the list of bounded
427             //threads at that moment:
428             std::lock_guard<std::recursive_mutex> lock(deviceController[parameters.deviceId]->getMutex());
429 
430             deviceController[parameters.deviceId]->bindThread(this);
431         }
432         active = true;
433 
434     }
435     catch (RtAudioError& e) {
436         e.printMessage();
437         return;
438     }
439     if (deviceId != -1) {
440         outputDevice = deviceId;
441     }
442 }
443 
getOutputDevice()444 int AudioThread::getOutputDevice() {
445 
446     std::lock_guard<std::recursive_mutex> lock(m_mutex);
447 
448     if (outputDevice == -1) {
449         return dac.getDefaultOutputDevice();
450     }
451     return outputDevice;
452 }
453 
setInitOutputDevice(int deviceId,int sampleRate_in)454 void AudioThread::setInitOutputDevice(int deviceId, int sampleRate_in) {
455 
456     //global lock
457     std::lock_guard<std::recursive_mutex> lock(m_device_mutex);
458 
459     outputDevice = deviceId;
460     if (sampleRate_in == -1) {
461         if (deviceSampleRate.find(parameters.deviceId) != deviceSampleRate.end()) {
462             sampleRate_in = deviceSampleRate[deviceId];
463         }
464     }
465     else {
466         deviceSampleRate[deviceId] = sampleRate_in;
467     }
468     sampleRate = sampleRate_in;
469 }
470 
run()471 void AudioThread::run() {
472 #ifdef __APPLE__
473     pthread_t tID = pthread_self();	 // ID of this thread
474     int priority = sched_get_priority_max(SCHED_RR) - 1;
475     sched_param prio = { priority }; // scheduling priority of thread
476     pthread_setschedparam(tID, SCHED_RR, &prio);
477 #endif
478 
479     //    std::cout << "Audio thread initializing.." << std::endl;
480 
481     if (dac.getDeviceCount() < 1) {
482         std::cout << "No audio devices found!" << std::endl;
483         return;
484     }
485 
486     setupDevice((outputDevice.load() == -1) ? (dac.getDefaultOutputDevice()) : outputDevice.load());
487 
488     //    std::cout << "Audio thread started." << std::endl;
489 
490     inputQueue = std::static_pointer_cast<AudioThreadInputQueue>(getInputQueue("AudioDataInput"));
491 
492     //Infinite loop, witing for commands or for termination
493     while (!stopping) {
494         AudioThreadCommand command;
495 
496         if (!cmdQueue.pop(command, HEARTBEAT_CHECK_PERIOD_MICROS)) {
497             continue;
498         }
499 
500         if (command.cmdType == AudioThreadCommand::Type::AUDIO_THREAD_CMD_SET_DEVICE) {
501             setupDevice(command.int_value);
502         }
503         if (command.cmdType == AudioThreadCommand::Type::AUDIO_THREAD_CMD_SET_SAMPLE_RATE) {
504             setSampleRate(command.int_value);
505         }
506     } //end while
507 
508     // Drain any remaining inputs, with a non-blocking pop
509     if (inputQueue != nullptr) {
510         inputQueue->flush();
511     }
512 
513     //Nullify currentInput...
514     currentInput = nullptr;
515 
516     //Stop : Retrieve the matching controlling thread in a scope lock:
517     AudioThread* controllerMatchingThread;
518     {
519         std::lock_guard<std::recursive_mutex> global_lock(m_device_mutex);
520         controllerMatchingThread = deviceController[parameters.deviceId];
521     }
522 
523     if (controllerMatchingThread != this) {
524         //'this' is not the controller, so remove it from the bounded list:
525         //beware, we must take the controller mutex, because the audio callback may use the list of bounded
526         //threads at that moment:
527         std::lock_guard<std::recursive_mutex> lock(controllerMatchingThread->getMutex());
528 
529         controllerMatchingThread->removeThread(this);
530     }
531     else {
532         // 'this' is a controller thread:
533         try {
534             if (dac.isStreamOpen()) {
535                 dac.stopStream();
536             }
537             dac.closeStream();
538         }
539         catch (RtAudioError& e) {
540             e.printMessage();
541         }
542     }
543 
544     //    std::cout << "Audio thread done." << std::endl;
545 }
546 
terminate()547 void AudioThread::terminate() {
548     IOThread::terminate();
549 }
550 
isActive()551 bool AudioThread::isActive() {
552     std::lock_guard<std::recursive_mutex> lock(m_mutex);
553 
554     return active;
555 }
556 
setActive(bool state)557 void AudioThread::setActive(bool state) {
558 
559     AudioThread* matchingControllerThread = nullptr;
560 
561     std::lock_guard<std::recursive_mutex> lock(m_mutex);
562 
563     //scope lock here to minimize the common unique static lock contention
564     {
565         std::lock_guard<std::recursive_mutex> lock(m_device_mutex);
566 
567         if (deviceController.find(parameters.deviceId) != deviceController.end()) {
568 
569             matchingControllerThread = deviceController[parameters.deviceId];
570         }
571     }
572 
573     if (matchingControllerThread == nullptr) {
574         return;
575     }
576 
577     if (state && !active && inputQueue) {
578         matchingControllerThread->bindThread(this);
579     }
580     else if (!state && active) {
581         matchingControllerThread->removeThread(this);
582     }
583 
584     // Activity state changing, clear any inputs
585     if (inputQueue) {
586         inputQueue->flush();
587     }
588     active = state;
589 }
590 
getCommandQueue()591 AudioThreadCommandQueue *AudioThread::getCommandQueue() {
592     return &cmdQueue;
593 }
594 
setGain(float gain_in)595 void AudioThread::setGain(float gain_in) {
596 
597     if (gain_in < 0.0) {
598         gain_in = 0.0;
599     }
600     if (gain_in > 2.0) {
601         gain_in = 2.0;
602     }
603     gain = gain_in;
604 }
605