1 /*
2  *    Copyright (C) 2020
3  *    Matthias P. Braendli (matthias.braendli@mpb.li)
4  *
5  *    This file is part of the welle.io.
6  *    Many of the ideas as implemented in welle.io are derived from
7  *    other work, made available through the GNU general Public License.
8  *    All copyrights of the original authors are recognized.
9  *
10  *    welle.io is free software; you can redistribute it and/or modify
11  *    it under the terms of the GNU General Public License as published by
12  *    the Free Software Foundation; either version 2 of the License, or
13  *    (at your option) any later version.
14  *
15  *    welle.io is distributed in the hope that it will be useful,
16  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *    GNU General Public License for more details.
19  *
20  *    You should have received a copy of the GNU General Public License
21  *    along with welle.io; if not, write to the Free Software
22  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23  *
24  */
25 
26 #include "welle-cli/webradiointerface.h"
27 #include <algorithm>
28 #include <cmath>
29 #include <complex>
30 #include <cstdio>
31 #include <cstdlib>
32 #include <cstring>
33 #include <ctime>
34 #include <errno.h>
35 #include <future>
36 #include <iomanip>
37 #include <iostream>
38 #include <regex>
39 #include <signal.h>
40 #include <stdexcept>
41 
42 #if defined(_WIN32)
43  #include <winsock2.h>
44  #include <ws2tcpip.h>
45  #include <windows.h>
46 #else
47  #include <sys/socket.h>
48 #endif
49 
50 #include <utility>
51 #include "Socket.h"
52 #include "channels.h"
53 #include "ofdm-decoder.h"
54 #include "radio-receiver.h"
55 #include "virtual_input.h"
56 #include "welle-cli/jsonconvert.h"
57 #include "welle-cli/webprogrammehandler.h"
58 
59 #ifdef __unix__
60 # include <unistd.h>
61 # if _POSIX_VERSION >= 200809L
62 #  define HAVE_SIGACTION 1
63 #  include <signal.h>
64 # else
65 #  define HAVE_SIGACTION 0
66 # endif
67 #endif
68 
69 #ifndef MSG_NOSIGNAL
70 #define MSG_NOSIGNAL 0
71 #endif
72 
73 #ifdef GITDESCRIBE
74 #define VERSION GITDESCRIBE
75 #else
76 #define VERSION "unknown"
77 #endif
78 
79 #define ASSERT_RX if (not rx) throw logic_error("rx does not exist")
80 
81 constexpr size_t MAX_PENDING_MESSAGES = 512;
82 
83 using namespace std;
84 
85 static const char* http_ok = "HTTP/1.0 200 OK\r\n";
86 static const char* http_400 = "HTTP/1.0 400 Bad Request\r\n";
87 static const char* http_404 = "HTTP/1.0 404 Not Found\r\n";
88 static const char* http_405 = "HTTP/1.0 405 Method Not Allowed\r\n";
89 static const char* http_500 = "HTTP/1.0 500 Internal Server Error\r\n";
90 static const char* http_503 = "HTTP/1.0 503 Service Unavailable\r\n";
91 static const char* http_contenttype_mp3 = "Content-Type: audio/mpeg\r\n";
92 static const char* http_contenttype_text = "Content-Type: text/plain\r\n";
93 static const char* http_contenttype_data =
94         "Content-Type: application/octet-stream\r\n";
95 
96 static const char* http_contenttype_json =
97         "Content-Type: application/json; charset=utf-8\r\n";
98 
99 static const char* http_contenttype_js =
100         "Content-Type: text/javascript; charset=utf-8\r\n";
101 
102 static const char* http_contenttype_html =
103         "Content-Type: text/html; charset=utf-8\r\n";
104 
105 static const char* http_nocache = "Cache-Control: no-cache\r\n";
106 
to_hex(uint32_t value,int width)107 static string to_hex(uint32_t value, int width)
108 {
109     std::stringstream sidstream;
110     sidstream << "0x" <<
111         std::setfill('0') << std::setw(width) <<
112         std::hex << value;
113     return sidstream.str();
114 }
115 
send_http_response(Socket & s,const string & statuscode,const string & data,const string & content_type=http_contenttype_text)116 static bool send_http_response(Socket& s, const string& statuscode,
117         const string& data, const string& content_type = http_contenttype_text) {
118     string headers = statuscode;
119     headers += content_type;
120     headers += http_nocache;
121     headers += "\r\n";
122     headers += data;
123     ssize_t ret = s.send(headers.data(), headers.size(), MSG_NOSIGNAL);
124     if (ret == -1) {
125         cerr << "Failed to send response " << statuscode << " " << data << endl;
126     }
127     return ret != -1;
128 }
129 
WebRadioInterface(CVirtualInput & in,int port,DecodeSettings ds,RadioReceiverOptions rro)130 WebRadioInterface::WebRadioInterface(CVirtualInput& in,
131         int port,
132         DecodeSettings ds,
133         RadioReceiverOptions rro) :
134     dabparams(1),
135     input(in),
136     spectrum_fft_handler(dabparams.T_u),
137     rro(rro),
138     decode_settings(ds)
139 {
140     {
141         // Ensure that rx always exists when rx_mut is free!
142         lock_guard<mutex> lock(rx_mut);
143 
144         bool success = serverSocket.bind(port);
145         if (success) {
146             success = serverSocket.listen();
147         }
148 
149         if (success) {
150             rx = make_unique<RadioReceiver>(*this, in, rro);
151         }
152 
153         if (not rx) {
154             throw runtime_error("Could not initialise WebRadioInterface");
155         }
156 
157         time_rx_created = chrono::system_clock::now();
158         rx->restart(false);
159     }
160 
161     programme_handler_thread = thread(&WebRadioInterface::handle_phs, this);
162 }
163 
~WebRadioInterface()164 WebRadioInterface::~WebRadioInterface()
165 {
166     running = false;
167     if (programme_handler_thread.joinable()) {
168         programme_handler_thread.join();
169     }
170 
171     {
172         lock_guard<mutex> lock(rx_mut);
173         rx.reset();
174     }
175 }
176 
177 class TuneFailed {};
178 
check_decoders_required()179 void WebRadioInterface::check_decoders_required()
180 {
181     lock_guard<mutex> lock(rx_mut);
182     ASSERT_RX;
183 
184     try {
185         for (auto& s : rx->getServiceList()) {
186             const auto sid = s.serviceId;
187 
188             try {
189                 const bool is_active = std::find_if(
190                         carousel_services_active.cbegin(),
191                         carousel_services_active.cend(),
192                         [&](const ActiveCarouselService& acs) {
193                             return acs.sid == sid;
194                         }) != carousel_services_active.cend();
195 
196                 const bool require =
197                     rx->serviceHasAudioComponent(s) and
198                     (decode_settings.strategy == DecodeStrategy::All or
199                      phs.at(sid).needsToBeDecoded() or
200                      is_active);
201                 const bool is_decoded = programmes_being_decoded[sid];
202 
203                 if (require and not is_decoded) {
204                     bool success = rx->addServiceToDecode(phs.at(sid), "", s);
205 
206                     if (success) {
207                         programmes_being_decoded[sid] = success;
208                     }
209                     else {
210                         throw TuneFailed();
211                     }
212                 }
213                 else if (is_decoded and not require) {
214                     bool success = rx->removeServiceToDecode(s);
215 
216                     if (success) {
217                         programmes_being_decoded[sid] = false;
218                     }
219                     else {
220                         cerr << "Stop playing 0x" << to_hex(s.serviceId, 4) <<
221                             " failed" << endl;
222                         throw TuneFailed();
223                     }
224                 }
225             }
226             catch (const out_of_range&) {
227                 cerr << "Cannot tune to 0x" << to_hex(s.serviceId, 4) <<
228                     " because no handler exists!" << endl;
229             }
230         }
231     }
232     catch (const TuneFailed&) {
233         rx->restart_decoder();
234         phs.clear();
235         programmes_being_decoded.clear();
236         carousel_services_available.clear();
237         carousel_services_active.clear();
238     }
239     phs_changed.notify_all();
240 }
241 
retune(const std::string & channel)242 void WebRadioInterface::retune(const std::string& channel)
243 {
244     // Ensure two closely occurring retune() calls don't get stuck
245     unique_lock<mutex> retune_lock(retune_mut);
246 
247     auto freq = channels.getFrequency(channel);
248     if (freq == 0) {
249         cerr << "RETUNE Invalid channel: " << channel << endl;
250         return;
251     }
252 
253     cerr << "RETUNE: Retune to " << freq << endl;
254 
255     running = false;
256     if (programme_handler_thread.joinable()) {
257         programme_handler_thread.join();
258     }
259 
260     cerr << "RETUNE Take ownership of RX" << endl;
261     {
262         unique_lock<mutex> lock(rx_mut);
263         // Even though it would be ok for rx to be inexistent here,
264         // we check to uncover errors.
265         ASSERT_RX;
266 
267         cerr << "RETUNE Destroy RX" << endl;
268         rx.reset();
269 
270         {
271             lock_guard<mutex> data_lock(data_mut);
272             last_dateTime = {};
273             last_snr = 0;
274             last_fine_correction = 0;
275             last_coarse_correction = 0;
276         }
277 
278         synced = false;
279 
280         {
281             lock_guard<mutex> fib_lock(fib_mut);
282             num_fic_crc_errors = 0;
283         }
284         tiis.clear();
285 
286         cerr << "RETUNE Set frequency" << endl;
287         input.setFrequency(freq);
288         input.reset(); // Clear buffer
289 
290         cerr << "RETUNE Restart RX" << endl;
291         rx = make_unique<RadioReceiver>(*this, input, rro);
292         if (not rx) {
293             throw runtime_error("Could not initialise RadioReceiver");
294         }
295 
296         time_rx_created = chrono::system_clock::now();
297         rx->restart(false);
298 
299         cerr << "RETUNE Start programme handler" << endl;
300         running = true;
301         programme_handler_thread = thread(&WebRadioInterface::handle_phs, this);
302     }
303 }
304 
recv_line(Socket & s)305 static string recv_line(Socket& s) {
306     string line;
307     bool cr_seen = false;
308 
309     while (true) {
310         char c = 0;
311         ssize_t ret = s.recv(&c, 1, 0);
312         if (ret == 0) {
313             return "";
314         }
315         else if (ret == -1) {
316             string errstr = strerror(errno);
317             cerr << "recv error " << errstr << endl;
318             return "";
319         }
320 
321         line += c;
322 
323         if (c == '\r') {
324             cr_seen = true;
325         }
326         else if (cr_seen and c == '\n') {
327             return line;
328         }
329     }
330 }
331 
recv_exactly(Socket & s,size_t num_bytes)332 static vector<char> recv_exactly(Socket& s, size_t num_bytes)
333 {
334     vector<char> buf(num_bytes);
335     size_t rx = 0;
336 
337     while (rx < num_bytes) {
338         const size_t remain = num_bytes - rx;
339         ssize_t ret = s.recv(buf.data() + rx, remain, 0);
340 
341         if (ret == 0) {
342             break;
343         }
344         else if (ret == -1) {
345             string errstr = strerror(errno);
346             cerr << "recv error " << errstr << endl;
347             return {};
348         }
349         else {
350             rx += ret;
351         }
352     }
353 
354     return buf;
355 }
356 
split(const string & str,char c=' ')357 static vector<string> split(const string& str, char c = ' ')
358 {
359     const char *s = str.data();
360     vector<string> result;
361     do {
362         const char *begin = s;
363         while (*s != c && *s)
364             s++;
365         result.push_back(string(begin, s));
366     } while (0 != *s++);
367     return result;
368 }
369 
370 struct http_request_t {
371     bool valid = false;
372 
373     bool is_get = false;
374     bool is_post = false;
375     string url;
376     map<string, string> headers;
377     string post_data;
378 };
379 
380 
parse_http_headers(Socket & s)381 static http_request_t parse_http_headers(Socket& s) {
382     http_request_t r;
383 
384     const auto first_line = recv_line(s);
385     const auto request_type = split(first_line);
386 
387     if (request_type.size() != 3) {
388         cerr << "Malformed request: " << first_line << endl;
389         return r;
390     }
391     else if (request_type[0] == "GET") {
392         r.is_get = true;
393     }
394     else if (request_type[0] == "POST") {
395         r.is_post = true;
396     }
397     else {
398         return r;
399     }
400 
401     r.url = request_type[1];
402 
403     while (true) {
404         string header_line = recv_line(s);
405 
406         if (header_line == "\r\n") {
407             break;
408         }
409 
410         const auto header = split(header_line, ':');
411 
412         if (header.size() == 2) {
413             r.headers.emplace(header[0], header[1]);
414         }
415     }
416 
417     if (r.is_post) {
418         constexpr auto CL = "Content-Length";
419         if (r.headers.count(CL) == 1) {
420             try {
421                 const int content_length = std::stoi(r.headers[CL]);
422                 if (content_length > 1024 * 1024) {
423                     cerr << "Unreasonable POST Content-Length: " << content_length << endl;
424                     return r;
425                 }
426 
427                 const auto buf = recv_exactly(s, content_length);
428                 r.post_data = string(buf.begin(), buf.end());
429             }
430             catch (const invalid_argument&) {
431                 cerr << "Cannot parse POST Content-Length: " << r.headers[CL] << endl;
432                 return r;
433             }
434             catch (const out_of_range&) {
435                 cerr << "Cannot represent POST Content-Length: " << r.headers[CL] << endl;
436                 return r;
437             }
438         }
439     }
440 
441     r.valid = true;
442     return r;
443 }
444 
dispatch_client(Socket && client)445 bool WebRadioInterface::dispatch_client(Socket&& client)
446 {
447     Socket s(move(client));
448 
449     bool success = false;
450 
451     if (not s.valid()) {
452         cerr << "socket in dispatcher not valid!" << endl;
453         return false;
454     }
455 
456     const auto req = parse_http_headers(s);
457 
458     if (not req.valid) {
459         return false;
460     }
461     else {
462         if (req.is_get) {
463             if (req.url == "/") {
464                 success = send_file(s, "index.html", http_contenttype_html);
465             }
466             else if (req.url == "/index.js") {
467                 success = send_file(s, "index.js", http_contenttype_js);
468             }
469             else if (req.url == "/mux.json") {
470                 success = send_mux_json(s);
471             }
472             else if (req.url == "/fic") {
473                 success = send_fic(s);
474             }
475             else if (req.url == "/impulseresponse") {
476                 success = send_impulseresponse(s);
477             }
478             else if (req.url == "/spectrum") {
479                 success = send_spectrum(s);
480             }
481             else if (req.url == "/constellation") {
482                 success = send_constellation(s);
483             }
484             else if (req.url == "/nullspectrum") {
485                 success = send_null_spectrum(s);
486             }
487             else if (req.url == "/channel") {
488                 success = send_channel(s);
489             }
490             else if (req.url == "/fftwindowplacement" or req.url == "/enablecoarsecorrector") {
491                 send_http_response(s, http_405,
492                         "405 Method Not Allowed\r\n" + req.url + " is POST-only");
493                 return false;
494             }
495             else {
496                 const regex regex_slide(R"(^[/]slide[/]([^ ]+))");
497                 std::smatch match_slide;
498 
499                 const regex regex_mp3(R"(^[/]mp3[/]([^ ]+))");
500                 std::smatch match_mp3;
501                 if (regex_search(req.url, match_mp3, regex_mp3)) {
502                     success = send_mp3(s, match_mp3[1]);
503                 }
504                 else if (regex_search(req.url, match_slide, regex_slide)) {
505                     success = send_slide(s, match_slide[1]);
506                 }
507                 else {
508                     cerr << "Could not understand GET request " << req.url << endl;
509                 }
510             }
511         }
512         else if (req.is_post) {
513             if (req.url == "/channel") {
514                 success = handle_channel_post(s, req.post_data);
515             }
516             else if (req.url == "/fftwindowplacement") {
517                 success = handle_fft_window_placement_post(s, req.post_data);
518             }
519             else if (req.url == "/enablecoarsecorrector") {
520                 success = handle_coarse_corrector_post(s, req.post_data);
521             }
522             else {
523                 cerr << "Could not understand POST request " << req.url << endl;
524             }
525         }
526         else {
527             throw logic_error("valid req is neither GET nor POST!");
528         }
529 
530         if (not success) {
531             send_http_response(s, http_404, "Could not understand request.\r\n");
532         }
533 
534         return success;
535     }
536 }
537 
send_file(Socket & s,const std::string & filename,const std::string & content_type)538 bool WebRadioInterface::send_file(Socket& s,
539         const std::string& filename,
540         const std::string& content_type)
541 {
542     FILE *fd = fopen(filename.c_str(), "r");
543     if (fd) {
544         if (not send_http_response(s, http_ok, "", content_type)) {
545             cerr << "Failed to send file headers" << endl;
546             fclose(fd);
547             return false;
548         }
549 
550         vector<char> data(1024);
551         ssize_t ret = 0;
552         do {
553             ret = fread(data.data(), 1, data.size(), fd);
554             ret = s.send(data.data(), ret, MSG_NOSIGNAL);
555             if (ret == -1) {
556                 cerr << "Failed to send file data" << endl;
557                 fclose(fd);
558                 return false;
559             }
560 
561         } while (ret > 0);
562 
563         fclose(fd);
564         return true;
565     }
566     else {
567         return send_http_response(s, http_500, "file '" + filename + "' is missing!");
568     }
569     return false;
570 }
571 
calculate_cir_peaks(const vector<float> & cir_linear)572 static vector<PeakJson> calculate_cir_peaks(const vector<float>& cir_linear)
573 {
574     constexpr size_t num_peaks = 6;
575 
576     vector<PeakJson> peaks;
577 
578     if (not cir_linear.empty()) {
579         vector<float> cir_lin(cir_linear);
580 
581         // Every time we find a peak, we attenuate it, including
582         // its surrounding values, and we go search the next peak.
583         for (size_t peak = 0; peak < num_peaks; peak++) {
584             PeakJson p;
585             for (size_t i = 1; i < cir_lin.size(); i++) {
586                 if (cir_lin[i] > p.value) {
587                     p.value = cir_lin[i];
588                     p.index = i;
589                 }
590             }
591 
592             const size_t windowsize = 25;
593             for (size_t j = 0; j < windowsize; j++) {
594                 const ssize_t i = p.index + j - windowsize/2;
595                 if (i >= 0 and i < (ssize_t)cir_lin.size()) {
596                     cir_lin[i] *= 0;
597                 }
598             }
599 
600             peaks.push_back(move(p));
601         }
602     }
603 
604     return peaks;
605 }
606 
send_mux_json(Socket & s)607 bool WebRadioInterface::send_mux_json(Socket& s)
608 {
609     MuxJson mux_json;
610 
611     mux_json.receiver.software.name = "welle.io";
612     mux_json.receiver.software.version = VERSION;
613     mux_json.receiver.software.fftwindowplacement = fftPlacementMethodToString(rro.fftPlacementMethod);
614     mux_json.receiver.software.coarsecorrectorenabled = not rro.disableCoarseCorrector;
615     mux_json.receiver.software.freqsyncmethod = freqSyncMethodToString(rro.freqsyncMethod);
616     mux_json.receiver.software.lastchannelchange = time_rx_created;
617     mux_json.receiver.hardware.name = input.getDescription();
618     mux_json.receiver.hardware.gain = input.getGain();
619 
620     {
621         lock_guard<mutex> lock(fib_mut);
622         mux_json.demodulator_fic_numcrcerrors = num_fic_crc_errors;
623     }
624 
625     {
626         lock_guard<mutex> lock(rx_mut);
627         ASSERT_RX;
628 
629         mux_json.ensemble.label = rx->getEnsembleLabel();
630 
631         mux_json.ensemble.id = to_hex(rx->getEnsembleId(), 4);
632         mux_json.ensemble.ecc = to_hex(rx->getEnsembleEcc(), 2);
633 
634         for (const auto& s : rx->getServiceList()) {
635             ServiceJson service;
636             service.sid = to_hex(s.serviceId, 4);
637             service.programType = s.programType;
638             service.ptystring = DABConstants::getProgramTypeName(s.programType);
639             service.language = s.language;
640             service.languagestring = DABConstants::getLanguageName(s.language);
641             service.label = s.serviceLabel;
642             service.url_mp3 = "";
643 
644             for (const auto& sc : rx->getComponents(s)) {
645                 ComponentJson component;
646                 component.componentnr = sc.componentNr;
647                 component.primary = (sc.PS_flag ? true : false);
648                 component.caflag = (sc.CAflag ? true : false);
649                 component.label = sc.componentLabel;
650 
651                 const auto sub = rx->getSubchannel(sc);
652 
653                 switch (sc.transportMode()) {
654                     case TransportMode::Audio:
655                         component.transportmode = "audio";
656                         component.ascty = make_unique<string>(
657                                 string{
658                                     (sc.audioType() == AudioServiceComponentType::DAB ? "DAB" :
659                                      sc.audioType() == AudioServiceComponentType::DABPlus ? "DAB+" :
660                                      "unknown")});
661                         if (sc.audioType() == AudioServiceComponentType::DAB or
662                             sc.audioType() == AudioServiceComponentType::DABPlus) {
663                             string urlmp3 = "/mp3/" + to_hex(s.serviceId, 4);
664                             service.url_mp3 = urlmp3;
665                         }
666                         break;
667                     case TransportMode::FIDC:
668                         component.transportmode = "fidc";
669                         component.dscty = make_unique<uint16_t>(sc.DSCTy);
670                         break;
671                     case TransportMode::PacketData:
672                         component.transportmode = "packetdata";
673                         component.scid = make_unique<uint16_t>(sc.SCId);
674                         break;
675                     case TransportMode::StreamData:
676                         component.transportmode = "streamdata";
677                         component.dscty = make_unique<uint16_t>(sc.DSCTy);
678                         break;
679                 }
680 
681                 component.subchannel = sub;
682 
683                 service.components.push_back(move(component));
684             }
685 
686             try {
687                 const auto& wph = phs.at(s.serviceId);
688                 const auto al = wph.getAudioLevels();
689                 service.audiolevel_present = true;
690                 service.audiolevel_time = chrono::system_clock::to_time_t(al.time);
691                 service.audiolevel_left = al.last_audioLevel_L;
692                 service.audiolevel_right = al.last_audioLevel_R;
693 
694                 service.channels = 2;
695                 service.samplerate = wph.rate;
696                 service.mode = wph.mode;
697 
698                 auto mot = wph.getMOT();
699                 service.mot_time = chrono::system_clock::to_time_t(mot.time);
700                 service.mot_lastchange = chrono::system_clock::to_time_t(mot.last_changed);
701 
702                 auto dls = wph.getDLS();
703                 service.dls_label = dls.label;
704                 service.dls_time = chrono::system_clock::to_time_t(dls.time);
705                 service.dls_lastchange = chrono::system_clock::to_time_t(mot.last_changed);
706 
707                 auto errorcounters = wph.getErrorCounters();
708                 service.errorcounters_frameerrors = errorcounters.num_frameErrors;
709                 service.errorcounters_rserrors = errorcounters.num_rsErrors;
710                 service.errorcounters_aacerrors = errorcounters.num_aacErrors;
711                 service.errorcounters_time = chrono::system_clock::to_time_t(dls.time);
712 
713                 auto xpad_err = wph.getXPADErrors();
714                 service.xpaderror_haserror = xpad_err.has_error;
715                 if (xpad_err.has_error) {
716                     service.xpaderror_announcedlen = xpad_err.announced_xpad_len;
717                     service.xpaderror_len = xpad_err.xpad_len;
718                     service.xpaderror_time = chrono::system_clock::to_time_t(xpad_err.time);
719                 }
720             }
721             catch (const out_of_range&) {
722                 service.audiolevel_present = false;
723                 service.channels = 0;
724                 service.samplerate = 0;
725                 service.mode = "invalid";
726             }
727 
728             mux_json.services.push_back(move(service));
729         }
730     }
731 
732     {
733         lock_guard<mutex> lock(data_mut);
734 
735         mux_json.utctime.year = last_dateTime.year;
736         mux_json.utctime.month = last_dateTime.month;
737         mux_json.utctime.day = last_dateTime.day;
738         mux_json.utctime.hour = last_dateTime.hour;
739         mux_json.utctime.minutes = last_dateTime.minutes;
740         mux_json.utctime.lto = last_dateTime.hourOffset + ((double)last_dateTime.minuteOffset / 30.0);
741 
742         for (const auto& m : pending_messages) {
743             using namespace chrono;
744 
745             stringstream ss;
746 
747             const auto ms = duration_cast<milliseconds>(
748                     m.timestamp.time_since_epoch());
749 
750             const auto s = duration_cast<seconds>(ms);
751             const std::time_t t = s.count();
752             const std::size_t fractional_seconds = ms.count() % 1000;
753 
754             ss << std::ctime(&t) << "." << fractional_seconds;
755 
756             switch (m.level) {
757                 case message_level_t::Information:
758                     ss << " INFO : ";
759                     break;
760                 case message_level_t::Error:
761                     ss << " ERROR: ";
762                     break;
763             }
764 
765             ss << m.text;
766             mux_json.messages.push_back(ss.str());
767         }
768 
769         pending_messages.clear();
770 
771         mux_json.demodulator_snr = last_snr;
772         mux_json.demodulator_frequencycorrection = last_fine_correction + last_coarse_correction;
773         mux_json.demodulator_timelastfct0frame = rx->getReceiverStats().timeLastFCT0Frame;
774 
775         mux_json.tii = getTiiStats();
776     }
777 
778     {
779         lock_guard<mutex> lock(plotdata_mut);
780         mux_json.cir_peaks = calculate_cir_peaks(last_CIR);
781     }
782 
783     if (not send_http_response(s, http_ok, "", http_contenttype_json)) {
784         return false;
785     }
786 
787     const auto json_str = build_mux_json(mux_json);
788 
789     ssize_t ret = s.send(json_str.c_str(), json_str.size(), MSG_NOSIGNAL);
790     if (ret == -1) {
791         cerr << "Failed to send mux.json data" << endl;
792         return false;
793     }
794     return true;
795 }
796 
send_mp3(Socket & s,const std::string & stream)797 bool WebRadioInterface::send_mp3(Socket& s, const std::string& stream)
798 {
799     unique_lock<mutex> lock(rx_mut);
800     ASSERT_RX;
801 
802     for (const auto& srv : rx->getServiceList()) {
803         if (rx->serviceHasAudioComponent(srv) and
804                 (to_hex(srv.serviceId, 4) == stream or
805                 (uint32_t)std::stoul(stream) == srv.serviceId)) {
806             try {
807                 auto& ph = phs.at(srv.serviceId);
808 
809                 lock.unlock();
810 
811                 if (not send_http_response(s, http_ok, "", http_contenttype_mp3)) {
812                     cerr << "Failed to send mp3 headers" << endl;
813                     return false;
814                 }
815 
816                 ProgrammeSender sender(move(s));
817 
818                 cerr << "Registering mp3 sender" << endl;
819                 ph.registerSender(&sender);
820                 check_decoders_required();
821                 sender.wait_for_termination();
822 
823                 cerr << "Removing mp3 sender" << endl;
824                 ph.removeSender(&sender);
825                 check_decoders_required();
826 
827                 return true;
828             }
829             catch (const out_of_range& e) {
830                 cerr << "Could not setup mp3 sender for " <<
831                     srv.serviceId << ": " << e.what() << endl;
832 
833                 send_http_response(s, http_503, e.what());
834                 return false;
835             }
836         }
837     }
838     return false;
839 }
840 
send_slide(Socket & s,const std::string & stream)841 bool WebRadioInterface::send_slide(Socket& s, const std::string& stream)
842 {
843     for (const auto& wph : phs) {
844         if (to_hex(wph.first, 4) == stream or
845                 (uint32_t)std::stoul(stream) == wph.first) {
846             const auto mot = wph.second.getMOT();
847 
848             if (mot.data.empty()) {
849                 send_http_response(s, http_404, "404 Not Found\r\nSlide not available.\r\n");
850                 return true;
851             }
852 
853             stringstream headers;
854             headers << http_ok;
855 
856             headers << "Content-Type: ";
857             switch (mot.subtype) {
858                 case MOTType::Unknown:
859                     headers << "application/octet-stream";
860                     break;
861                 case MOTType::JPEG:
862                     headers << "image/jpeg";
863                     break;
864                 case MOTType::PNG:
865                     headers << "image/png";
866                     break;
867             }
868             headers << "\r\n";
869 
870             headers << http_nocache;
871 
872             headers << "Last-Modified: ";
873             std::time_t t = chrono::system_clock::to_time_t(mot.time);
874             headers << put_time(std::gmtime(&t), "%a, %d %b %Y %T GMT");
875             headers << "\r\n";
876 
877             headers << "\r\n";
878             const auto headers_str = headers.str();
879             int ret = s.send(headers_str.data(), headers_str.size(), MSG_NOSIGNAL);
880             if (ret == (ssize_t)headers_str.size()) {
881                 ret = s.send(mot.data.data(), mot.data.size(), MSG_NOSIGNAL);
882             }
883 
884             if (ret == -1) {
885                 cerr << "Failed to send slide" << endl;
886             }
887 
888             return true;
889         }
890     }
891     return false;
892 }
893 
send_fic(Socket & s)894 bool WebRadioInterface::send_fic(Socket& s)
895 {
896     if (not send_http_response(s, http_ok, "", http_contenttype_data)) {
897         cerr << "Failed to send FIC headers" << endl;
898         return false;
899     }
900 
901     while (true) {
902         unique_lock<mutex> lock(fib_mut);
903         while (fib_blocks.empty()) {
904             new_fib_block_available.wait_for(lock, chrono::seconds(1));
905         }
906         ssize_t ret = s.send(fib_blocks.front().data(),
907                 fib_blocks.front().size(), MSG_NOSIGNAL);
908         if (ret == -1) {
909             cerr << "Failed to send FIC data" << endl;
910             return false;
911         }
912 
913         fib_blocks.pop_front();
914     }
915     return true;
916 }
917 
send_impulseresponse(Socket & s)918 bool WebRadioInterface::send_impulseresponse(Socket& s)
919 {
920     if (not send_http_response(s, http_ok, "", http_contenttype_data)) {
921         cerr << "Failed to send CIR headers" << endl;
922         return false;
923     }
924 
925     lock_guard<mutex> lock(plotdata_mut);
926     vector<float> cir_db(last_CIR.size());
927     std::transform(last_CIR.begin(), last_CIR.end(), cir_db.begin(),
928             [](float y) { return 10.0f * log10(y); });
929 
930     size_t lengthBytes = cir_db.size() * sizeof(float);
931     ssize_t ret = s.send(cir_db.data(), lengthBytes, MSG_NOSIGNAL);
932     if (ret == -1) {
933         cerr << "Failed to send CIR data" << endl;
934         return false;
935     }
936 
937     return true;
938 }
939 
send_fft_data(Socket & s,DSPCOMPLEX * spectrumBuffer,size_t T_u)940 static bool send_fft_data(Socket& s, DSPCOMPLEX *spectrumBuffer, size_t T_u)
941 {
942     vector<float> spectrum(T_u);
943 
944     // Shift FFT samples
945     const size_t half_Tu = T_u / 2;
946     for (size_t i = 0; i < half_Tu; i++) {
947         spectrum[i] = abs(spectrumBuffer[i + half_Tu]);
948     }
949     for (size_t i = half_Tu; i < T_u; i++) {
950         spectrum[i] = abs(spectrumBuffer[i - half_Tu]);
951     }
952 
953     if (not send_http_response(s, http_ok, "", http_contenttype_data)) {
954         cerr << "Failed to send spectrum headers" << endl;
955         return false;
956     }
957 
958     size_t lengthBytes = spectrum.size() * sizeof(float);
959     ssize_t ret = s.send(spectrum.data(), lengthBytes, MSG_NOSIGNAL);
960     if (ret == -1) {
961         cerr << "Failed to send spectrum data" << endl;
962         return false;
963     }
964 
965     return true;
966 }
967 
send_spectrum(Socket & s)968 bool WebRadioInterface::send_spectrum(Socket& s)
969 {
970     // Get FFT buffer
971     DSPCOMPLEX* spectrumBuffer = spectrum_fft_handler.getVector();
972     auto samples = input.getSpectrumSamples(dabparams.T_u);
973 
974     // Continue only if we got data
975     if (samples.size() != (size_t)dabparams.T_u)
976         return false;
977 
978     std::copy(samples.begin(), samples.end(), spectrumBuffer);
979 
980     // Do FFT to get the spectrum
981     spectrum_fft_handler.do_FFT();
982 
983     return send_fft_data(s, spectrumBuffer, dabparams.T_u);
984 }
985 
send_null_spectrum(Socket & s)986 bool WebRadioInterface::send_null_spectrum(Socket& s)
987 {
988     // Get FFT buffer
989     DSPCOMPLEX* spectrumBuffer = spectrum_fft_handler.getVector();
990 
991     lock_guard<mutex> lock(plotdata_mut);
992     if (last_NULL.empty()) {
993         return false;
994     }
995     else if (last_NULL.size() != (size_t)dabparams.T_null) {
996         cerr << "Invalid NULL size " << last_NULL.size() << endl;
997         return false;
998     }
999 
1000     copy(last_NULL.begin(), last_NULL.begin() + dabparams.T_u, spectrumBuffer);
1001 
1002     // Do FFT to get the spectrum
1003     spectrum_fft_handler.do_FFT();
1004 
1005     return send_fft_data(s, spectrumBuffer, dabparams.T_u);
1006 }
1007 
send_constellation(Socket & s)1008 bool WebRadioInterface::send_constellation(Socket& s)
1009 {
1010     const size_t decim = OfdmDecoder::constellationDecimation;
1011     const size_t num_iqpoints = (dabparams.L-1) * dabparams.K / decim;
1012     std::vector<float> phases(num_iqpoints);
1013 
1014     lock_guard<mutex> lock(plotdata_mut);
1015     if (last_constellation.size() == num_iqpoints) {
1016         phases.resize(num_iqpoints);
1017         for (size_t i = 0; i < num_iqpoints; i++) {
1018             const float y = 180.0f / (float)M_PI * std::arg(last_constellation[i]);
1019             phases[i] = y;
1020         }
1021 
1022         if (not send_http_response(s, http_ok, "", http_contenttype_data)) {
1023             cerr << "Failed to send constellation headers" << endl;
1024             return false;
1025         }
1026 
1027         size_t lengthBytes = phases.size() * sizeof(float);
1028         ssize_t ret = s.send(phases.data(), lengthBytes, MSG_NOSIGNAL);
1029         if (ret == -1) {
1030             cerr << "Failed to send constellation data" << endl;
1031             return false;
1032         }
1033 
1034         return true;
1035     }
1036 
1037     return false;
1038 }
1039 
send_channel(Socket & s)1040 bool WebRadioInterface::send_channel(Socket& s)
1041 {
1042     const auto freq = input.getFrequency();
1043 
1044     try {
1045         const auto chan = channels.getChannelForFrequency(freq);
1046 
1047         string response = http_ok;
1048         response += http_contenttype_text;
1049         response += http_nocache;
1050         response += "\r\n";
1051         response += chan;
1052         ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1053         if (ret == -1) {
1054             cerr << "Failed to send frequency" << endl;
1055             return false;
1056         }
1057     }
1058     catch (const out_of_range& e) {
1059         string response = http_500;
1060         response += http_contenttype_text;
1061         response += http_nocache;
1062         response += "\r\n";
1063         response += "Error: ";
1064         response += e.what();
1065         ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1066         if (ret == -1) {
1067             cerr << "Failed to send frequency 500" << endl;
1068             return false;
1069         }
1070     }
1071     return true;
1072 }
1073 
handle_fft_window_placement_post(Socket & s,const std::string & fft_window_placement)1074 bool WebRadioInterface::handle_fft_window_placement_post(Socket& s, const std::string& fft_window_placement)
1075 {
1076     cerr << "POST fft window: " << fft_window_placement << endl;
1077 
1078     if (fft_window_placement == "EarliestPeakWithBinning") {
1079         rro.fftPlacementMethod = FFTPlacementMethod::EarliestPeakWithBinning;
1080     }
1081     else if (fft_window_placement == "StrongestPeak") {
1082         rro.fftPlacementMethod = FFTPlacementMethod::StrongestPeak;
1083     }
1084     else if (fft_window_placement == "ThresholdBeforePeak") {
1085         rro.fftPlacementMethod = FFTPlacementMethod::ThresholdBeforePeak;
1086     }
1087     else {
1088         string response = http_400;
1089         response += http_contenttype_text;
1090         response += http_nocache;
1091         response += "\r\n";
1092         response += "Invalid FFT Window Placement requested.";
1093         ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1094         if (ret == -1) {
1095             cerr << "Failed to send frequency" << endl;
1096             return false;
1097         }
1098         return true;
1099     }
1100 
1101     {
1102         lock_guard<mutex> lock(rx_mut);
1103         ASSERT_RX;
1104         rx->setReceiverOptions(rro);
1105     }
1106 
1107     string response = http_ok;
1108     response += http_contenttype_text;
1109     response += http_nocache;
1110     response += "\r\n";
1111     response += "Switched FFT Window Placement.";
1112     ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1113     if (ret == -1) {
1114         cerr << "Failed to send frequency" << endl;
1115         return false;
1116     }
1117     return true;
1118 }
1119 
handle_coarse_corrector_post(Socket & s,const std::string & coarseCorrector)1120 bool WebRadioInterface::handle_coarse_corrector_post(Socket& s, const std::string& coarseCorrector)
1121 {
1122     cerr << "POST coarse : " << coarseCorrector << endl;
1123 
1124     if (coarseCorrector == "0") {
1125         rro.disableCoarseCorrector = true;
1126     }
1127     else if (coarseCorrector == "1") {
1128         rro.disableCoarseCorrector = false;
1129     }
1130     else {
1131         string response = http_400;
1132         response += http_contenttype_text;
1133         response += http_nocache;
1134         response += "\r\n";
1135         response += "Invalid coarse corrector selected";
1136         ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1137         if (ret == -1) {
1138             cerr << "Failed to set response" << endl;
1139             return false;
1140         }
1141         return true;
1142     }
1143 
1144     {
1145         lock_guard<mutex> lock(rx_mut);
1146         ASSERT_RX;
1147         rx->setReceiverOptions(rro);
1148     }
1149 
1150     string response = http_ok;
1151     response += http_contenttype_text;
1152     response += http_nocache;
1153     response += "\r\n";
1154     response += "Switched Coarse corrector.";
1155     ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1156     if (ret == -1) {
1157         cerr << "Failed to send coarse switch confirmation" << endl;
1158         return false;
1159     }
1160     return true;
1161 }
1162 
handle_channel_post(Socket & s,const std::string & channel)1163 bool WebRadioInterface::handle_channel_post(Socket& s, const std::string& channel)
1164 {
1165     cerr << "POST channel: " << channel << endl;
1166 
1167     retune(channel);
1168 
1169     string response = http_ok;
1170     response += http_contenttype_text;
1171     response += http_nocache;
1172     response += "\r\n";
1173     response += "Retuning...";
1174     ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1175     if (ret == -1) {
1176         cerr << "Failed to send frequency" << endl;
1177         return false;
1178     }
1179     return true;
1180 }
1181 
handle_phs()1182 void WebRadioInterface::handle_phs()
1183 {
1184     while (running) {
1185         this_thread::sleep_for(chrono::seconds(2));
1186 
1187         unique_lock<mutex> lock(rx_mut);
1188         ASSERT_RX;
1189 
1190         auto serviceList = rx->getServiceList();
1191         for (auto& s : serviceList) {
1192             auto scs = rx->getComponents(s);
1193 
1194             if (std::find(
1195                         carousel_services_available.cbegin(),
1196                         carousel_services_available.cend(),
1197                         s.serviceId) == carousel_services_available.cend()) {
1198                 for (auto& sc : scs) {
1199                     if (sc.transportMode() == TransportMode::Audio) {
1200                         carousel_services_available.push_back(s.serviceId);
1201                     }
1202                 }
1203             }
1204 
1205             if (phs.count(s.serviceId) == 0) {
1206                 WebProgrammeHandler ph(s.serviceId);
1207                 phs.emplace(std::make_pair(s.serviceId, move(ph)));
1208             }
1209         }
1210 
1211         using namespace chrono;
1212         size_t max_services_in_carousel = std::min(
1213                 carousel_services_available.size(),
1214                 (size_t)decode_settings.num_decoders_in_carousel);
1215 
1216         if (decode_settings.strategy == DecodeStrategy::Carousel10) {
1217             while (carousel_services_active.size() < max_services_in_carousel) {
1218                 carousel_services_active.emplace_back(
1219                         carousel_services_available.front());
1220                 carousel_services_available.pop_front();
1221             }
1222 
1223             for (auto& acs : carousel_services_active) {
1224                 if (acs.time_change + chrono::seconds(10) <
1225                         chrono::steady_clock::now()) {
1226                     acs.sid = 0;
1227 
1228                     if (not carousel_services_available.empty()) {
1229                         carousel_services_active.emplace_back(
1230                                 carousel_services_available.front());
1231                         carousel_services_available.pop_front();
1232                     }
1233                 }
1234             }
1235         }
1236         else if (decode_settings.strategy == DecodeStrategy::CarouselPAD) {
1237             while (carousel_services_active.size() < max_services_in_carousel) {
1238                 if (not serviceList.empty()) {
1239                     carousel_services_active.emplace_back(
1240                             carousel_services_available.front());
1241                     carousel_services_available.pop_front();
1242                 }
1243             }
1244 
1245             for (auto& acs : carousel_services_active) {
1246                 if (acs.time_change + chrono::seconds(5) <
1247                         chrono::steady_clock::now()) {
1248                     auto current_it = phs.find(acs.sid);
1249                     if (current_it == phs.end()) {
1250                         cerr << "Reset service decoder carousel!"
1251                             "Cannot find service "
1252                             << acs.sid << endl;
1253                         acs.sid = 0;
1254                     }
1255                     else {
1256                         // Switch to next programme once both DLS and Slideshow
1257                         // got decoded, but at most after 80 seconds
1258                         const auto now = system_clock::now();
1259                         const auto mot = current_it->second.getMOT();
1260                         const auto dls = current_it->second.getDLS();
1261                         // Slide and DLS received in the last 60 seconds?
1262                         const bool switchBecausePAD = (
1263                                 now - mot.time < seconds(60) and
1264                                 now - dls.time < seconds(60));
1265 
1266                         const bool switchBecauseLate =
1267                             acs.time_change + seconds(80) < steady_clock::now();
1268 
1269                         if (switchBecausePAD or switchBecauseLate) {
1270                             acs.sid = 0;
1271 
1272                             if (not carousel_services_available.empty()) {
1273                                 carousel_services_active.emplace_back(
1274                                         carousel_services_available.front());
1275                                 carousel_services_available.pop_front();
1276                             }
1277                         }
1278                     }
1279                 }
1280             }
1281         }
1282 
1283         carousel_services_active.erase(
1284                 remove_if(
1285                     carousel_services_active.begin(),
1286                     carousel_services_active.end(),
1287                     [](const ActiveCarouselService& acs){
1288                         return acs.sid == 0;
1289                     }), carousel_services_active.end());
1290         lock.unlock();
1291         check_decoders_required();
1292     }
1293 
1294     cerr << "TEARDOWN Cancel all PHs and remove services" << endl;
1295     {
1296         lock_guard<mutex> lock(rx_mut);
1297         for (auto& ph : phs) {
1298             ph.second.cancelAll();
1299 
1300             const auto srv = rx->getService(ph.first);
1301             if (srv.serviceId != 0) {
1302                 (void)rx->removeServiceToDecode(srv);
1303             }
1304         }
1305     }
1306 
1307     cerr << "TEARDOWN Stop rx" << endl;
1308     {
1309         unique_lock<mutex> lock(rx_mut);
1310         rx->stop();
1311     }
1312 }
1313 
1314 #if HAVE_SIGACTION
1315 static volatile sig_atomic_t sig_caught = 0;
handler(int)1316 static void handler(int /*signum*/)
1317 {
1318     sig_caught = 1;
1319 }
1320 #else
1321 const int sig_caught = 0;
1322 #endif
1323 
serve()1324 void WebRadioInterface::serve()
1325 {
1326     deque<future<bool> > running_connections;
1327 
1328 #if HAVE_SIGACTION
1329     struct sigaction sa = {};
1330     sa.sa_handler = handler;
1331     sigemptyset(&sa.sa_mask);
1332     if (sigaction(SIGINT, &sa, NULL) == -1) {
1333         cerr << "Failed to set up signal handler" << endl;
1334     }
1335 #endif
1336 
1337     while (sig_caught == 0) {
1338         auto client = serverSocket.accept();
1339 
1340         running_connections.push_back(async(launch::async,
1341                     &WebRadioInterface::dispatch_client, this, move(client)));
1342 
1343         deque<future<bool> > still_running_connections;
1344         for (auto& fut : running_connections) {
1345             if (fut.valid()) {
1346                 switch (fut.wait_for(chrono::milliseconds(1))) {
1347                     case future_status::deferred:
1348                     case future_status::timeout:
1349                         still_running_connections.push_back(move(fut));
1350                         break;
1351                     case future_status::ready:
1352                         fut.get();
1353                         break;
1354                 }
1355             }
1356         }
1357         running_connections = move(still_running_connections);
1358     }
1359 
1360     cerr << "SERVE No more connections running" << endl;
1361 
1362     running = false;
1363     if (programme_handler_thread.joinable()) {
1364         programme_handler_thread.join();
1365     }
1366 
1367     cerr << "SERVE Wait for all futures to clear" << endl;
1368     while (running_connections.size() > 0) {
1369         deque<future<bool> > still_running_connections;
1370         for (auto& fut : running_connections) {
1371             if (fut.valid()) {
1372                 switch (fut.wait_for(chrono::milliseconds(1))) {
1373                     case future_status::deferred:
1374                     case future_status::timeout:
1375                         still_running_connections.push_back(move(fut));
1376                         break;
1377                     case future_status::ready:
1378                         fut.get();
1379                         break;
1380                 }
1381             }
1382         }
1383         running_connections = move(still_running_connections);
1384     }
1385 
1386     cerr << "SERVE clear remaining data structures" << endl;
1387     phs.clear();
1388     programmes_being_decoded.clear();
1389     carousel_services_available.clear();
1390     carousel_services_active.clear();
1391 }
1392 
onSNR(float snr)1393 void WebRadioInterface::onSNR(float snr)
1394 {
1395     lock_guard<mutex> lock(data_mut);
1396     last_snr = snr;
1397 }
1398 
onFrequencyCorrectorChange(int fine,int coarse)1399 void WebRadioInterface::onFrequencyCorrectorChange(int fine, int coarse)
1400 {
1401     lock_guard<mutex> lock(data_mut);
1402     last_fine_correction = fine;
1403     last_coarse_correction = coarse;
1404 }
1405 
onSyncChange(char isSync)1406 void WebRadioInterface::onSyncChange(char isSync)
1407 {
1408     synced = isSync;
1409 }
1410 
onSignalPresence(bool)1411 void WebRadioInterface::onSignalPresence(bool /*isSignal*/) { }
onServiceDetected(uint32_t)1412 void WebRadioInterface::onServiceDetected(uint32_t /*sId*/) { }
onNewEnsemble(uint16_t)1413 void WebRadioInterface::onNewEnsemble(uint16_t /*eId*/) { }
onSetEnsembleLabel(DabLabel &)1414 void WebRadioInterface::onSetEnsembleLabel(DabLabel& /*label*/) { }
1415 
onDateTimeUpdate(const dab_date_time_t & dateTime)1416 void WebRadioInterface::onDateTimeUpdate(const dab_date_time_t& dateTime)
1417 {
1418     lock_guard<mutex> lock(data_mut);
1419     last_dateTime = dateTime;
1420 }
1421 
onFIBDecodeSuccess(bool crcCheckOk,const uint8_t * fib)1422 void WebRadioInterface::onFIBDecodeSuccess(bool crcCheckOk, const uint8_t* fib)
1423 {
1424     if (not crcCheckOk) {
1425         lock_guard<mutex> lock(fib_mut);
1426         num_fic_crc_errors++;
1427         return;
1428     }
1429 
1430     // Convert the fib bitvector to bytes
1431     vector<uint8_t> buf(32);
1432     for (size_t i = 0; i < buf.size(); i++) {
1433         uint8_t v = 0;
1434         for (int j = 0; j < 8; j++) {
1435             if (fib[8*i+j]) {
1436                 v |= 1 << (7-j);
1437             }
1438         }
1439         buf[i] = v;
1440     }
1441 
1442     {
1443         lock_guard<mutex> lock(fib_mut);
1444         fib_blocks.push_back(move(buf));
1445 
1446         if (fib_blocks.size() > 3*250) { // six seconds
1447             fib_blocks.pop_front();
1448         }
1449     }
1450 
1451     new_fib_block_available.notify_one();
1452 }
1453 
onNewImpulseResponse(std::vector<float> && data)1454 void WebRadioInterface::onNewImpulseResponse(std::vector<float>&& data)
1455 {
1456     lock_guard<mutex> lock(plotdata_mut);
1457     last_CIR = move(data);
1458 }
1459 
onNewNullSymbol(std::vector<DSPCOMPLEX> && data)1460 void WebRadioInterface::onNewNullSymbol(std::vector<DSPCOMPLEX>&& data)
1461 {
1462     lock_guard<mutex> lock(plotdata_mut);
1463     last_NULL = move(data);
1464 }
1465 
onConstellationPoints(std::vector<DSPCOMPLEX> && data)1466 void WebRadioInterface::onConstellationPoints(std::vector<DSPCOMPLEX>&& data)
1467 {
1468     lock_guard<mutex> lock(plotdata_mut);
1469     last_constellation = move(data);
1470 }
1471 
onMessage(message_level_t level,const std::string & text,const std::string & text2)1472 void WebRadioInterface::onMessage(message_level_t level, const std::string& text, const std::string& text2)
1473 {
1474     std::string fullText;
1475     if (text2.empty())
1476         fullText = text;
1477     else
1478         fullText = text + text2;
1479 
1480     lock_guard<mutex> lock(data_mut);
1481     const auto now = std::chrono::system_clock::now();
1482     pending_message_t m = { .level = level, .text = fullText, .timestamp = now};
1483     pending_messages.emplace_back(move(m));
1484 
1485     if (pending_messages.size() > MAX_PENDING_MESSAGES) {
1486         pending_messages.pop_front();
1487     }
1488 }
1489 
onTIIMeasurement(tii_measurement_t && m)1490 void WebRadioInterface::onTIIMeasurement(tii_measurement_t&& m)
1491 {
1492     lock_guard<mutex> lock(data_mut);
1493     auto& l = tiis[make_pair(m.comb, m.pattern)];
1494     l.push_back(move(m));
1495 
1496     if (l.size() > 20) {
1497         l.pop_front();
1498     }
1499 }
1500 
onInputFailure()1501 void WebRadioInterface::onInputFailure()
1502 {
1503     std::exit(1);
1504 }
1505 
getTiiStats()1506 list<tii_measurement_t> WebRadioInterface::getTiiStats()
1507 {
1508     list<tii_measurement_t> l;
1509 
1510     for (const auto& cp_list : tiis) {
1511         const auto comb = cp_list.first.first;
1512         const auto pattern = cp_list.first.second;
1513 
1514         if (cp_list.second.size() < 5) {
1515             continue;
1516         }
1517 
1518         tii_measurement_t avg;
1519         avg.comb = comb;
1520         avg.pattern = pattern;
1521         vector<int> delays;
1522         double error = 0.0;
1523         size_t len = 0;
1524         for (const auto& meas : cp_list.second) {
1525             delays.push_back(meas.delay_samples);
1526             error += meas.error;
1527             len++;
1528         }
1529 
1530         if (len > 0) {
1531             avg.error = error / len;
1532 
1533             // Calculate the median
1534             std::nth_element(delays.begin(), delays.begin() + len/2, delays.end());
1535             avg.delay_samples = delays[len/2];
1536         }
1537         else {
1538             // To quiet static analysis check
1539             avg.error = 0.0;
1540             avg.delay_samples = 0;
1541         }
1542         l.push_back(move(avg));
1543     }
1544 
1545     using namespace std::chrono;
1546     const auto now = steady_clock::now();
1547     // Remove a single entry every second to make the flukes
1548     // disappear
1549     if (time_last_tiis_clean + seconds(1) > now) {
1550         for (auto& cp_list : tiis) {
1551             cp_list.second.pop_front();
1552         }
1553 
1554         time_last_tiis_clean = now;
1555     }
1556 
1557     return l;
1558 }
1559