1 // Copyright (c) Charles J. Cliffe
2 // SPDX-License-Identifier: GPL-2.0+
3
4 #include <vector>
5
6 #ifdef __APPLE__
7 #include <pthread.h>
8 #endif
9
10 #include "DemodulatorPreThread.h"
11 #include "CubicSDR.h"
12 #include "DemodulatorInstance.h"
13
// Timeout used when polling the IQ input queue in run(): 50 ms, expressed in microseconds.
15 #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
16
DemodulatorPreThread(DemodulatorInstance * parent)17 DemodulatorPreThread::DemodulatorPreThread(DemodulatorInstance* parent) : IOThread(), iqResampler(nullptr), iqResampleRatio(1), cModem(nullptr), cModemKit(nullptr)
18 {
19 initialized.store(false);
20 this->parent = parent;
21
22 freqShifter = nco_crcf_create(LIQUID_VCO);
23 shiftFrequency = 0;
24
25 workerQueue = std::make_shared<DemodulatorThreadWorkerCommandQueue>();
26 workerQueue->set_max_num_items(2);
27
28 workerResults = std::make_shared<DemodulatorThreadWorkerResultQueue>();
29 workerResults->set_max_num_items(100);
30
31 workerThread = new DemodulatorWorkerThread();
32 workerThread->setInputQueue("WorkerCommandQueue",workerQueue);
33 workerThread->setOutputQueue("WorkerResultQueue",workerResults);
34
35 newSampleRate = currentSampleRate = 0;
36 newBandwidth = currentBandwidth = 0;
37 newAudioSampleRate = currentAudioSampleRate = 0;
38 newFrequency = currentFrequency = 0;
39
40 sampleRateChanged.store(false);
41 frequencyChanged.store(false);
42 bandwidthChanged.store(false);
43 audioSampleRateChanged.store(false);
44 modemSettingsChanged.store(false);
45 demodTypeChanged.store(false);
46 }
47
isInitialized()48 bool DemodulatorPreThread::isInitialized() {
49 return initialized.load();
50 }
51
52 DemodulatorPreThread::~DemodulatorPreThread() = default;
53
run()54 void DemodulatorPreThread::run() {
55 #ifdef __APPLE__
56 pthread_t tID = pthread_self(); // ID of this thread
57 int priority = sched_get_priority_max( SCHED_FIFO) - 1;
58 sched_param prio = {priority}; // scheduling priority of thread
59 pthread_setschedparam(tID, SCHED_FIFO, &prio);
60 #endif
61
62 // std::cout << "Demodulator preprocessor thread started.." << std::endl;
63
64 ReBuffer<DemodulatorThreadPostIQData> buffers("DemodulatorPreThreadBuffers");
65
66 iqInputQueue = std::static_pointer_cast<DemodulatorThreadInputQueue>(getInputQueue("IQDataInput"));
67 iqOutputQueue = std::static_pointer_cast<DemodulatorThreadPostInputQueue>(getOutputQueue("IQDataOutput"));
68
69 std::vector<liquid_float_complex> in_buf_data;
70 std::vector<liquid_float_complex> out_buf_data;
71
72 t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain, workerThread);
73
74 while (!stopping) {
75 DemodulatorThreadIQDataPtr inp;
76
77 if (!iqInputQueue->pop(inp, HEARTBEAT_CHECK_PERIOD_MICROS)) {
78 continue;
79 }
80
81 if (frequencyChanged.load()) {
82 currentFrequency.store(newFrequency);
83 frequencyChanged.store(false);
84 }
85
86 if (inp->sampleRate != currentSampleRate) {
87 newSampleRate = inp->sampleRate;
88 if (newSampleRate) {
89 sampleRateChanged.store(true);
90 }
91 }
92
93 if (!newAudioSampleRate) {
94 newAudioSampleRate = parent->getAudioSampleRate();
95 if (newAudioSampleRate) {
96 audioSampleRateChanged.store(true);
97 }
98 } else if (parent->getAudioSampleRate() != newAudioSampleRate) {
99 if (parent->getAudioSampleRate()) {
100 newAudioSampleRate = parent->getAudioSampleRate();
101 audioSampleRateChanged.store(true);
102 }
103 }
104
105 if (demodTypeChanged.load() && (newSampleRate && newAudioSampleRate && newBandwidth)) {
106 DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::Type::DEMOD_WORKER_THREAD_CMD_MAKE_DEMOD);
107 command.frequency = newFrequency;
108 command.sampleRate = newSampleRate;
109 command.demodType = newDemodType;
110 command.bandwidth = newBandwidth;
111 command.audioSampleRate = newAudioSampleRate;
112 demodType = newDemodType;
113 sampleRateChanged.store(false);
114 audioSampleRateChanged.store(false);
115 ModemSettings lastSettings = parent->getLastModemSettings(newDemodType);
116 if (!lastSettings.empty()) {
117 command.settings = lastSettings;
118 if (!modemSettingsBuffered.empty()) {
119 for (ModemSettings::const_iterator msi = modemSettingsBuffered.begin(); msi != modemSettingsBuffered.end(); msi++) {
120 command.settings[msi->first] = msi->second;
121 }
122 }
123 } else {
124 command.settings = modemSettingsBuffered;
125 }
126 modemSettingsBuffered.clear();
127 modemSettingsChanged.store(false);
128 //VSO: blocking push
129 workerQueue->push(command);
130 cModem = nullptr;
131 cModemKit = nullptr;
132 demodTypeChanged.store(false);
133 initialized.store(false);
134 }
135 else if (
136 cModemKit && cModem &&
137 (bandwidthChanged.load() || sampleRateChanged.load() || audioSampleRateChanged.load() || cModem->shouldRebuildKit()) &&
138 (newSampleRate && newAudioSampleRate && newBandwidth)
139 ) {
140 DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::Type::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS);
141 command.frequency = newFrequency;
142 command.sampleRate = newSampleRate;
143 command.bandwidth = newBandwidth;
144 command.audioSampleRate = newAudioSampleRate;
145 bandwidthChanged.store(false);
146 sampleRateChanged.store(false);
147 audioSampleRateChanged.store(false);
148 modemSettingsBuffered.clear();
149 //VSO: blocking
150 workerQueue->push(command);
151 }
152
153 // Requested frequency is not center, shift it into the center!
154 if ((currentFrequency - inp->frequency) != shiftFrequency) {
155 shiftFrequency = currentFrequency - inp->frequency;
156 if (abs(shiftFrequency) <= (int) ((double) (inp->sampleRate / 2) * 1.5)) {
157 nco_crcf_set_frequency(freqShifter, (2.0 * M_PI) * (((double) abs(shiftFrequency)) / ((double) inp->sampleRate)));
158 }
159 }
160
161 if (cModem && cModemKit && abs(shiftFrequency) > (int) ((double) (inp->sampleRate / 2) * 1.5)) {
162
163 continue;
164 }
165
166 // std::lock_guard < std::mutex > lock(inp->m_mutex);
167 std::vector<liquid_float_complex> *data = &inp->data;
168 if (!data->empty() && (inp->sampleRate == currentSampleRate) && cModem && cModemKit) {
169 size_t bufSize = data->size();
170
171 if (in_buf_data.size() != bufSize) {
172 if (in_buf_data.capacity() < bufSize) {
173 in_buf_data.reserve(bufSize);
174 out_buf_data.reserve(bufSize);
175 }
176 in_buf_data.resize(bufSize);
177 out_buf_data.resize(bufSize);
178 }
179
180 in_buf_data.assign(inp->data.begin(), inp->data.end());
181
182 liquid_float_complex *in_buf = &in_buf_data[0];
183 liquid_float_complex *out_buf = &out_buf_data[0];
184 liquid_float_complex *temp_buf;
185
186 if (shiftFrequency != 0) {
187 if (shiftFrequency < 0) {
188 nco_crcf_mix_block_up(freqShifter, in_buf, out_buf, bufSize);
189 } else {
190 nco_crcf_mix_block_down(freqShifter, in_buf, out_buf, bufSize);
191 }
192 temp_buf = in_buf;
193 in_buf = out_buf;
194 out_buf = temp_buf;
195 }
196
197 DemodulatorThreadPostIQDataPtr resamp = buffers.getBuffer();
198
199 size_t out_size = ceil((double) (bufSize) * iqResampleRatio) + 512;
200
201 if (resampledData.size() != out_size) {
202 if (resampledData.capacity() < out_size) {
203 resampledData.reserve(out_size);
204 }
205 resampledData.resize(out_size);
206 }
207
208 unsigned int numWritten;
209 msresamp_crcf_execute(iqResampler, in_buf, bufSize, &resampledData[0], &numWritten);
210
211 resamp->data.assign(resampledData.begin(), resampledData.begin() + numWritten);
212
213 resamp->modemType = cModem->getType();
214 resamp->modemName = cModem->getName();
215 resamp->modem = cModem;
216 resamp->modemKit = cModemKit;
217 resamp->sampleRate = currentBandwidth;
218
219 //VSO: blocking push
220 iqOutputQueue->push(resamp);
221 }
222
223 DemodulatorWorkerThreadResult result;
224 //process all worker results until
225 while (!stopping && workerResults->try_pop(result)) {
226
227 switch (result.cmd) {
228 case DemodulatorWorkerThreadResult::Type::DEMOD_WORKER_THREAD_RESULT_FILTERS:
229 if (result.iqResampler) {
230 if (iqResampler) {
231 msresamp_crcf_destroy(iqResampler);
232 }
233 iqResampler = result.iqResampler;
234 iqResampleRatio = result.iqResampleRatio;
235 }
236
237 if (result.modem != nullptr) {
238 cModem = result.modem;
239 #if ENABLE_DIGITAL_LAB
240 if (cModem->getType() == "digital") {
241 auto *mDigi = (ModemDigital *)cModem;
242 mDigi->setOutput(parent->getOutput());
243 }
244 #endif
245 }
246
247 if (result.modemKit != nullptr) {
248 cModemKit = result.modemKit;
249 currentAudioSampleRate = cModemKit->audioSampleRate;
250 }
251
252 if (result.bandwidth) {
253 currentBandwidth = result.bandwidth;
254 }
255
256 if (result.sampleRate) {
257 currentSampleRate = result.sampleRate;
258 }
259
260 if (!result.modemName.empty()) {
261 demodType = result.modemName;
262 demodTypeChanged.store(false);
263 }
264
265 shiftFrequency = inp->frequency-1;
266 initialized.store(cModem != nullptr);
267 break;
268 default:
269 break;
270 }
271 } //end while
272
273 if ((cModem != nullptr) && modemSettingsChanged.load()) {
274 cModem->writeSettings(modemSettingsBuffered);
275 modemSettingsBuffered.clear();
276 modemSettingsChanged.store(false);
277 }
278 } //end while stopping
279
280
281 iqOutputQueue->flush();
282 iqInputQueue->flush();
283 }
284
setDemodType(std::string demodType_in)285 void DemodulatorPreThread::setDemodType(std::string demodType_in) {
286 newDemodType = demodType_in;
287 demodTypeChanged.store(true);
288 }
289
getDemodType()290 std::string DemodulatorPreThread::getDemodType() {
291 if (demodTypeChanged.load()) {
292 return newDemodType;
293 }
294 return demodType;
295 }
296
setFrequency(long long freq)297 void DemodulatorPreThread::setFrequency(long long freq) {
298 frequencyChanged.store(true);
299 newFrequency = freq;
300 }
301
getFrequency()302 long long DemodulatorPreThread::getFrequency() {
303 if (frequencyChanged.load()) {
304 return newFrequency;
305 }
306 return currentFrequency;
307 }
308
setSampleRate(long long sampleRate)309 void DemodulatorPreThread::setSampleRate(long long sampleRate) {
310 sampleRateChanged.store(true);
311 newSampleRate = sampleRate;
312 }
313
getSampleRate()314 long long DemodulatorPreThread::getSampleRate() {
315 if (sampleRateChanged.load()) {
316 return newSampleRate;
317 }
318 return currentSampleRate;
319 }
320
setBandwidth(int bandwidth)321 void DemodulatorPreThread::setBandwidth(int bandwidth) {
322 bandwidthChanged.store(true);
323 newBandwidth = bandwidth;
324 }
325
getBandwidth()326 int DemodulatorPreThread::getBandwidth() {
327 // if (bandwidthChanged.load()) {
328 // return newBandwidth;
329 // }
330
331 return currentBandwidth;
332 }
333
setAudioSampleRate(int rate)334 void DemodulatorPreThread::setAudioSampleRate(int rate) {
335 audioSampleRateChanged.store(true);
336 newAudioSampleRate = rate;
337 }
338
getAudioSampleRate()339 int DemodulatorPreThread::getAudioSampleRate() {
340 if (audioSampleRateChanged.load()) {
341 return newAudioSampleRate;
342 }
343 return currentAudioSampleRate;
344 }
345
terminate()346 void DemodulatorPreThread::terminate() {
347
348 //make non-blocking calls to be sure threads are flagged for termination.
349 IOThread::terminate();
350 workerThread->terminate();
351
352 //unblock the push()
353 iqOutputQueue->flush();
354 iqInputQueue->flush();
355
356 //wait blocking for termination here, it could be long with lots of modems and we MUST terminate properly,
357 //else better kill the whole application...
358 workerThread->isTerminated(5000);
359
360 t_Worker->join();
361 delete t_Worker;
362 t_Worker = nullptr;
363
364 delete workerThread;
365 workerThread = nullptr;
366 }
367
getModem()368 Modem *DemodulatorPreThread::getModem() {
369 return cModem;
370 }
371
getModemKit()372 ModemKit *DemodulatorPreThread::getModemKit() {
373 return cModemKit;
374 }
375
376
readModemSetting(const std::string & setting)377 std::string DemodulatorPreThread::readModemSetting(const std::string& setting) {
378 if (cModem) {
379 return cModem->readSetting(setting);
380 } else if (modemSettingsBuffered.find(setting) != modemSettingsBuffered.end()) {
381 return modemSettingsBuffered[setting];
382 }
383 return "";
384 }
385
writeModemSetting(const std::string & setting,std::string value)386 void DemodulatorPreThread::writeModemSetting(const std::string& setting, std::string value) {
387 modemSettingsBuffered[setting] = value;
388 modemSettingsChanged.store(true);
389 }
390
readModemSettings()391 ModemSettings DemodulatorPreThread::readModemSettings() {
392 if (cModem) {
393 return cModem->readSettings();
394 } else {
395 return modemSettingsBuffered;
396 }
397 }
398
writeModemSettings(ModemSettings settings)399 void DemodulatorPreThread::writeModemSettings(ModemSettings settings) {
400 modemSettingsBuffered = settings;
401 modemSettingsChanged.store(true);
402 }
403