1 // Copyright (c) Charles J. Cliffe
2 // SPDX-License-Identifier: GPL-2.0+
3 
4 #include <vector>
5 
6 #ifdef __APPLE__
7 #include <pthread.h>
8 #endif
9 
10 #include "DemodulatorPreThread.h"
11 #include "CubicSDR.h"
12 #include "DemodulatorInstance.h"
13 
14 //50 ms
15 #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
16 
DemodulatorPreThread(DemodulatorInstance * parent)17 DemodulatorPreThread::DemodulatorPreThread(DemodulatorInstance* parent) : IOThread(), iqResampler(nullptr), iqResampleRatio(1), cModem(nullptr), cModemKit(nullptr)
18  {
19 	initialized.store(false);
20     this->parent = parent;
21 
22     freqShifter = nco_crcf_create(LIQUID_VCO);
23     shiftFrequency = 0;
24 
25     workerQueue = std::make_shared<DemodulatorThreadWorkerCommandQueue>();
26     workerQueue->set_max_num_items(2);
27 
28     workerResults = std::make_shared<DemodulatorThreadWorkerResultQueue>();
29     workerResults->set_max_num_items(100);
30 
31     workerThread = new DemodulatorWorkerThread();
32     workerThread->setInputQueue("WorkerCommandQueue",workerQueue);
33     workerThread->setOutputQueue("WorkerResultQueue",workerResults);
34 
35     newSampleRate = currentSampleRate = 0;
36     newBandwidth = currentBandwidth = 0;
37     newAudioSampleRate = currentAudioSampleRate = 0;
38     newFrequency = currentFrequency = 0;
39 
40     sampleRateChanged.store(false);
41     frequencyChanged.store(false);
42     bandwidthChanged.store(false);
43     audioSampleRateChanged.store(false);
44     modemSettingsChanged.store(false);
45     demodTypeChanged.store(false);
46 }
47 
isInitialized()48 bool DemodulatorPreThread::isInitialized() {
49     return initialized.load();
50 }
51 
52 DemodulatorPreThread::~DemodulatorPreThread() = default;
53 
// Thread body of the pre-demodulation stage.
//
// Pulls IQ blocks from the "IQDataInput" queue, mixes the requested frequency
// to the centre, resamples the block and pushes it to the "IQDataOutput" queue.
//
// Each loop iteration, in order:
//   1. waits up to HEARTBEAT_CHECK_PERIOD_MICROS for an input block;
//   2. picks up pending frequency / sample-rate / audio-sample-rate changes;
//   3. asks the worker thread to make a new demodulator or rebuild filters;
//   4. frequency-shifts and resamples the block if a modem and kit are in place;
//   5. applies every result the worker has produced so far;
//   6. writes buffered modem settings to the current modem.
//
// The loop ends once `stopping` is set; both IQ queues are flushed afterwards.
void DemodulatorPreThread::run() {
#ifdef __APPLE__
    // Run this thread with SCHED_FIFO at one step below the maximum priority.
    pthread_t tID = pthread_self();  // ID of this thread
    int priority = sched_get_priority_max( SCHED_FIFO) - 1;
    sched_param prio = {priority}; // scheduling priority of thread
    pthread_setschedparam(tID, SCHED_FIFO, &prio);
#endif

//    std::cout << "Demodulator preprocessor thread started.." << std::endl;

    // Source of the output blocks handed to the output queue.
    ReBuffer<DemodulatorThreadPostIQData> buffers("DemodulatorPreThreadBuffers");

    iqInputQueue = std::static_pointer_cast<DemodulatorThreadInputQueue>(getInputQueue("IQDataInput"));
    iqOutputQueue = std::static_pointer_cast<DemodulatorThreadPostInputQueue>(getOutputQueue("IQDataOutput"));

    // Scratch buffers for the frequency shift; kept across iterations.
    std::vector<liquid_float_complex> in_buf_data;
    std::vector<liquid_float_complex> out_buf_data;

    // Start the worker: it runs DemodulatorWorkerThread::threadMain on workerThread.
    t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain, workerThread);

    while (!stopping) {
        DemodulatorThreadIQDataPtr inp;

        // Timed pop so that `stopping` is re-checked even when no data arrives.
        if (!iqInputQueue->pop(inp, HEARTBEAT_CHECK_PERIOD_MICROS)) {
            continue;
        }

        // Adopt a frequency requested through setFrequency().
        if (frequencyChanged.load()) {
            currentFrequency.store(newFrequency);
            frequencyChanged.store(false);
        }

        // The input block's sample rate differs from ours: schedule a change
        // (a rate of 0 is ignored).
        if (inp->sampleRate != currentSampleRate) {
            newSampleRate = inp->sampleRate;
            if (newSampleRate) {
                sampleRateChanged.store(true);
            }
        }

        // Track the parent's audio sample rate; non-zero values only.
        if (!newAudioSampleRate) {
            newAudioSampleRate = parent->getAudioSampleRate();
            if (newAudioSampleRate) {
                audioSampleRateChanged.store(true);
            }
        } else if (parent->getAudioSampleRate() != newAudioSampleRate) {
            if (parent->getAudioSampleRate()) {
                newAudioSampleRate = parent->getAudioSampleRate();
                audioSampleRateChanged.store(true);
            }
        }

        // A new demodulator type was requested and sample rate, audio rate and
        // bandwidth are all non-zero: ask the worker to make the demodulator.
        if (demodTypeChanged.load() && (newSampleRate && newAudioSampleRate && newBandwidth)) {
            DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::Type::DEMOD_WORKER_THREAD_CMD_MAKE_DEMOD);
            command.frequency = newFrequency;
            command.sampleRate = newSampleRate;
            command.demodType = newDemodType;
            command.bandwidth = newBandwidth;
            command.audioSampleRate = newAudioSampleRate;
            demodType = newDemodType;
            // The command carries the new rates, so these changes are now handled.
            sampleRateChanged.store(false);
            audioSampleRateChanged.store(false);
            // Start from what parent->getLastModemSettings() returns for this type
            // and overlay the buffered settings; if it returns nothing, use the
            // buffered settings alone.
            ModemSettings lastSettings = parent->getLastModemSettings(newDemodType);
            if (!lastSettings.empty()) {
                command.settings = lastSettings;
                if (!modemSettingsBuffered.empty()) {
                    for (ModemSettings::const_iterator msi = modemSettingsBuffered.begin(); msi != modemSettingsBuffered.end(); msi++) {
                        command.settings[msi->first] = msi->second;
                    }
                }
            } else {
                command.settings = modemSettingsBuffered;
            }
            modemSettingsBuffered.clear();
            modemSettingsChanged.store(false);
            //VSO: blocking push
            workerQueue->push(command);
            // Drop the current modem and kit; they are set again from a worker
            // result further below. Until then this stage is not initialized.
            cModem = nullptr;
            cModemKit = nullptr;
            demodTypeChanged.store(false);
            initialized.store(false);
        }
        // A modem and kit exist and bandwidth, sample rate or audio rate changed
        // (or the modem asks for a kit rebuild): ask the worker to rebuild filters.
        else if (
            cModemKit && cModem &&
            (bandwidthChanged.load() || sampleRateChanged.load() || audioSampleRateChanged.load() || cModem->shouldRebuildKit()) &&
            (newSampleRate && newAudioSampleRate && newBandwidth)
        ) {
            DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::Type::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS);
            command.frequency = newFrequency;
            command.sampleRate = newSampleRate;
            command.bandwidth = newBandwidth;
            command.audioSampleRate = newAudioSampleRate;
            bandwidthChanged.store(false);
            sampleRateChanged.store(false);
            audioSampleRateChanged.store(false);
            modemSettingsBuffered.clear();
            //VSO: blocking
            workerQueue->push(command);
        }

        // Requested frequency is not center, shift it into the center!
        // The NCO is given the magnitude of the shift only; the direction is
        // chosen below by mixing up or down. It is reprogrammed only while the
        // shift is within 1.5 x (sampleRate / 2).
        if ((currentFrequency - inp->frequency) != shiftFrequency) {
            shiftFrequency = currentFrequency - inp->frequency;
            if (abs(shiftFrequency) <= (int) ((double) (inp->sampleRate / 2) * 1.5)) {
                nco_crcf_set_frequency(freqShifter, (2.0 * M_PI) * (((double) abs(shiftFrequency)) / ((double) inp->sampleRate)));
            }
        }

        // Shift is out of that range: skip this block entirely. Note that this
        // also skips the worker-result and settings handling below for this
        // iteration.
        if (cModem && cModemKit && abs(shiftFrequency) > (int) ((double) (inp->sampleRate / 2) * 1.5)) {

            continue;
        }

//        std::lock_guard < std::mutex > lock(inp->m_mutex);
        std::vector<liquid_float_complex> *data = &inp->data;
        // Process only non-empty blocks at the current sample rate, and only
        // when a modem and kit are available.
        if (!data->empty() && (inp->sampleRate == currentSampleRate) && cModem && cModemKit) {
            size_t bufSize = data->size();

            // Size both scratch buffers to the block length.
            if (in_buf_data.size() != bufSize) {
                if (in_buf_data.capacity() < bufSize) {
                    in_buf_data.reserve(bufSize);
                    out_buf_data.reserve(bufSize);
                }
                in_buf_data.resize(bufSize);
                out_buf_data.resize(bufSize);
            }

            // Work on a copy of the input samples.
            in_buf_data.assign(inp->data.begin(), inp->data.end());

            liquid_float_complex *in_buf = &in_buf_data[0];
            liquid_float_complex *out_buf = &out_buf_data[0];
            liquid_float_complex *temp_buf;

            if (shiftFrequency != 0) {
                // Negative shift mixes up, positive shift mixes down.
                if (shiftFrequency < 0) {
                    nco_crcf_mix_block_up(freqShifter, in_buf, out_buf, bufSize);
                } else {
                    nco_crcf_mix_block_down(freqShifter, in_buf, out_buf, bufSize);
                }
                // Swap the pointers so that in_buf refers to the mixed samples.
                temp_buf = in_buf;
                in_buf = out_buf;
                out_buf = temp_buf;
            }

            DemodulatorThreadPostIQDataPtr resamp = buffers.getBuffer();

            // Output capacity: scaled input length plus 512 samples
            // (presumably headroom for the resampler - TODO confirm).
            size_t out_size = ceil((double) (bufSize) * iqResampleRatio) + 512;

            if (resampledData.size() != out_size) {
                if (resampledData.capacity() < out_size) {
                    resampledData.reserve(out_size);
                }
                resampledData.resize(out_size);
            }

            unsigned int numWritten;
            msresamp_crcf_execute(iqResampler, in_buf, bufSize, &resampledData[0], &numWritten);

            // Hand over only the samples the resampler actually produced.
            resamp->data.assign(resampledData.begin(), resampledData.begin() + numWritten);

            resamp->modemType = cModem->getType();
            resamp->modemName = cModem->getName();
            resamp->modem = cModem;
            resamp->modemKit = cModemKit;
            // The output block's sample rate is set to the current bandwidth.
            resamp->sampleRate = currentBandwidth;

            //VSO: blocking push
            iqOutputQueue->push(resamp);
        }

        DemodulatorWorkerThreadResult result;
        //process all worker results until none are left (or we are stopping)
        while (!stopping && workerResults->try_pop(result)) {

            switch (result.cmd) {
                case DemodulatorWorkerThreadResult::Type::DEMOD_WORKER_THREAD_RESULT_FILTERS:
                    // Each field of the result is adopted only when it is set.
                    if (result.iqResampler) {
                        // Replace the resampler, destroying the previous one.
                        if (iqResampler) {
                            msresamp_crcf_destroy(iqResampler);
                        }
                        iqResampler = result.iqResampler;
                        iqResampleRatio = result.iqResampleRatio;
                    }

                    if (result.modem != nullptr) {
                        cModem = result.modem;
#if ENABLE_DIGITAL_LAB
                        // Modems of type "digital" are given the parent's output.
                        if (cModem->getType() == "digital") {
                            auto *mDigi = (ModemDigital *)cModem;
                            mDigi->setOutput(parent->getOutput());
                        }
#endif
                    }

                    if (result.modemKit != nullptr) {
                        cModemKit = result.modemKit;
                        currentAudioSampleRate = cModemKit->audioSampleRate;
                    }

                    if (result.bandwidth) {
                        currentBandwidth = result.bandwidth;
                    }

                    if (result.sampleRate) {
                        currentSampleRate = result.sampleRate;
                    }

                    if (!result.modemName.empty()) {
                        demodType = result.modemName;
                        demodTypeChanged.store(false);
                    }

                    // NOTE(review): appears intended to make the shift check at the
                    // top of the loop recompute the NCO frequency on the next
                    // block - confirm.
                    shiftFrequency = inp->frequency-1;
                    initialized.store(cModem != nullptr);
                    break;
                default:
                    break;
            }
        } //end while

        // Apply settings buffered by writeModemSetting(s)() to the current modem.
        if ((cModem != nullptr) && modemSettingsChanged.load()) {
            cModem->writeSettings(modemSettingsBuffered);
            modemSettingsBuffered.clear();
            modemSettingsChanged.store(false);
        }
    } //end while stopping


    iqOutputQueue->flush();
    iqInputQueue->flush();
}
284 
setDemodType(std::string demodType_in)285 void DemodulatorPreThread::setDemodType(std::string demodType_in) {
286     newDemodType = demodType_in;
287     demodTypeChanged.store(true);
288 }
289 
getDemodType()290 std::string DemodulatorPreThread::getDemodType() {
291     if (demodTypeChanged.load()) {
292         return newDemodType;
293     }
294     return demodType;
295 }
296 
setFrequency(long long freq)297 void DemodulatorPreThread::setFrequency(long long freq) {
298     frequencyChanged.store(true);
299     newFrequency = freq;
300 }
301 
getFrequency()302 long long DemodulatorPreThread::getFrequency() {
303     if (frequencyChanged.load()) {
304         return newFrequency;
305     }
306     return currentFrequency;
307 }
308 
setSampleRate(long long sampleRate)309 void DemodulatorPreThread::setSampleRate(long long sampleRate) {
310     sampleRateChanged.store(true);
311     newSampleRate = sampleRate;
312 }
313 
getSampleRate()314 long long DemodulatorPreThread::getSampleRate() {
315     if (sampleRateChanged.load()) {
316         return newSampleRate;
317     }
318     return currentSampleRate;
319 }
320 
setBandwidth(int bandwidth)321 void DemodulatorPreThread::setBandwidth(int bandwidth) {
322     bandwidthChanged.store(true);
323     newBandwidth = bandwidth;
324 }
325 
getBandwidth()326 int DemodulatorPreThread::getBandwidth() {
327 //    if (bandwidthChanged.load()) {
328 //        return newBandwidth;
329 //    }
330 
331     return currentBandwidth;
332 }
333 
setAudioSampleRate(int rate)334 void DemodulatorPreThread::setAudioSampleRate(int rate) {
335     audioSampleRateChanged.store(true);
336     newAudioSampleRate = rate;
337 }
338 
getAudioSampleRate()339 int DemodulatorPreThread::getAudioSampleRate() {
340     if (audioSampleRateChanged.load()) {
341         return newAudioSampleRate;
342     }
343     return currentAudioSampleRate;
344 }
345 
terminate()346 void DemodulatorPreThread::terminate() {
347 
348     //make non-blocking calls to be sure threads are flagged for termination.
349     IOThread::terminate();
350     workerThread->terminate();
351 
352     //unblock the push()
353     iqOutputQueue->flush();
354     iqInputQueue->flush();
355 
356     //wait blocking for termination here, it could be long with lots of modems and we MUST terminate properly,
357     //else better kill the whole application...
358     workerThread->isTerminated(5000);
359 
360     t_Worker->join();
361     delete t_Worker;
362     t_Worker = nullptr;
363 
364     delete workerThread;
365     workerThread = nullptr;
366 }
367 
getModem()368 Modem *DemodulatorPreThread::getModem() {
369     return cModem;
370 }
371 
getModemKit()372 ModemKit *DemodulatorPreThread::getModemKit() {
373     return cModemKit;
374 }
375 
376 
readModemSetting(const std::string & setting)377 std::string DemodulatorPreThread::readModemSetting(const std::string& setting) {
378     if (cModem) {
379         return cModem->readSetting(setting);
380     } else if (modemSettingsBuffered.find(setting) != modemSettingsBuffered.end()) {
381         return modemSettingsBuffered[setting];
382     }
383     return "";
384 }
385 
writeModemSetting(const std::string & setting,std::string value)386 void DemodulatorPreThread::writeModemSetting(const std::string& setting, std::string value) {
387     modemSettingsBuffered[setting] = value;
388     modemSettingsChanged.store(true);
389 }
390 
readModemSettings()391 ModemSettings DemodulatorPreThread::readModemSettings() {
392     if (cModem) {
393         return cModem->readSettings();
394     } else {
395         return modemSettingsBuffered;
396     }
397 }
398 
writeModemSettings(ModemSettings settings)399 void DemodulatorPreThread::writeModemSettings(ModemSettings settings) {
400     modemSettingsBuffered = settings;
401     modemSettingsChanged.store(true);
402 }
403