1 /*
2 * Copyright (C) 2020
3 * Matthias P. Braendli (matthias.braendli@mpb.li)
4 *
5 * This file is part of the welle.io.
6 * Many of the ideas as implemented in welle.io are derived from
7 * other work, made available through the GNU general Public License.
8 * All copyrights of the original authors are recognized.
9 *
10 * welle.io is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
14 *
15 * welle.io is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public License
21 * along with welle.io; if not, write to the Free Software
22 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 *
24 */
25
26 #include "welle-cli/webradiointerface.h"
27 #include <algorithm>
28 #include <cmath>
29 #include <complex>
30 #include <cstdio>
31 #include <cstdlib>
32 #include <cstring>
33 #include <ctime>
34 #include <errno.h>
35 #include <future>
36 #include <iomanip>
37 #include <iostream>
38 #include <regex>
39 #include <signal.h>
40 #include <stdexcept>
41
42 #if defined(_WIN32)
43 #include <winsock2.h>
44 #include <ws2tcpip.h>
45 #include <windows.h>
46 #else
47 #include <sys/socket.h>
48 #endif
49
50 #include <utility>
51 #include "Socket.h"
52 #include "channels.h"
53 #include "ofdm-decoder.h"
54 #include "radio-receiver.h"
55 #include "virtual_input.h"
56 #include "welle-cli/jsonconvert.h"
57 #include "welle-cli/webprogrammehandler.h"
58
59 #ifdef __unix__
60 # include <unistd.h>
61 # if _POSIX_VERSION >= 200809L
62 # define HAVE_SIGACTION 1
63 # include <signal.h>
64 # else
65 # define HAVE_SIGACTION 0
66 # endif
67 #endif
68
69 #ifndef MSG_NOSIGNAL
70 #define MSG_NOSIGNAL 0
71 #endif
72
73 #ifdef GITDESCRIBE
74 #define VERSION GITDESCRIBE
75 #else
76 #define VERSION "unknown"
77 #endif
78
// Guard placed at the start of code paths that dereference rx: throws
// std::logic_error when the rx pointer is empty.
#define ASSERT_RX if (not rx) throw logic_error("rx does not exist")

// NOTE(review): presumably the upper bound on the number of entries kept in
// pending_messages; the code enforcing it is not visible in this section.
constexpr size_t MAX_PENDING_MESSAGES = 512;
82
83 using namespace std;
84
// Pre-formatted HTTP/1.0 status lines, each terminated by CRLF so they can
// be concatenated directly with the header lines below.
static const char* http_ok = "HTTP/1.0 200 OK\r\n";
static const char* http_400 = "HTTP/1.0 400 Bad Request\r\n";
static const char* http_404 = "HTTP/1.0 404 Not Found\r\n";
static const char* http_405 = "HTTP/1.0 405 Method Not Allowed\r\n";
static const char* http_500 = "HTTP/1.0 500 Internal Server Error\r\n";
static const char* http_503 = "HTTP/1.0 503 Service Unavailable\r\n";

// Pre-formatted Content-Type header lines, each terminated by CRLF.
static const char* http_contenttype_mp3 = "Content-Type: audio/mpeg\r\n";
static const char* http_contenttype_text = "Content-Type: text/plain\r\n";
static const char* http_contenttype_data =
    "Content-Type: application/octet-stream\r\n";

static const char* http_contenttype_json =
    "Content-Type: application/json; charset=utf-8\r\n";

static const char* http_contenttype_js =
    "Content-Type: text/javascript; charset=utf-8\r\n";

static const char* http_contenttype_html =
    "Content-Type: text/html; charset=utf-8\r\n";

// Header line appended to responses to ask clients not to cache them.
static const char* http_nocache = "Cache-Control: no-cache\r\n";
106
// Format value as lower-case hexadecimal with a leading "0x", left-padding
// the digits with '0' up to width characters (the prefix is not counted).
static string to_hex(uint32_t value, int width)
{
    std::ostringstream out;
    out << "0x";
    // setw only affects the next insertion, i.e. the number itself.
    out << std::hex << std::setw(width) << std::setfill('0') << value;
    return out.str();
}
115
send_http_response(Socket & s,const string & statuscode,const string & data,const string & content_type=http_contenttype_text)116 static bool send_http_response(Socket& s, const string& statuscode,
117 const string& data, const string& content_type = http_contenttype_text) {
118 string headers = statuscode;
119 headers += content_type;
120 headers += http_nocache;
121 headers += "\r\n";
122 headers += data;
123 ssize_t ret = s.send(headers.data(), headers.size(), MSG_NOSIGNAL);
124 if (ret == -1) {
125 cerr << "Failed to send response " << statuscode << " " << data << endl;
126 }
127 return ret != -1;
128 }
129
WebRadioInterface(CVirtualInput & in,int port,DecodeSettings ds,RadioReceiverOptions rro)130 WebRadioInterface::WebRadioInterface(CVirtualInput& in,
131 int port,
132 DecodeSettings ds,
133 RadioReceiverOptions rro) :
134 dabparams(1),
135 input(in),
136 spectrum_fft_handler(dabparams.T_u),
137 rro(rro),
138 decode_settings(ds)
139 {
140 {
141 // Ensure that rx always exists when rx_mut is free!
142 lock_guard<mutex> lock(rx_mut);
143
144 bool success = serverSocket.bind(port);
145 if (success) {
146 success = serverSocket.listen();
147 }
148
149 if (success) {
150 rx = make_unique<RadioReceiver>(*this, in, rro);
151 }
152
153 if (not rx) {
154 throw runtime_error("Could not initialise WebRadioInterface");
155 }
156
157 time_rx_created = chrono::system_clock::now();
158 rx->restart(false);
159 }
160
161 programme_handler_thread = thread(&WebRadioInterface::handle_phs, this);
162 }
163
~WebRadioInterface()164 WebRadioInterface::~WebRadioInterface()
165 {
166 running = false;
167 if (programme_handler_thread.joinable()) {
168 programme_handler_thread.join();
169 }
170
171 {
172 lock_guard<mutex> lock(rx_mut);
173 rx.reset();
174 }
175 }
176
// Empty tag type thrown and caught inside check_decoders_required() when
// adding or removing a service on the receiver fails.
class TuneFailed {};
178
check_decoders_required()179 void WebRadioInterface::check_decoders_required()
180 {
181 lock_guard<mutex> lock(rx_mut);
182 ASSERT_RX;
183
184 try {
185 for (auto& s : rx->getServiceList()) {
186 const auto sid = s.serviceId;
187
188 try {
189 const bool is_active = std::find_if(
190 carousel_services_active.cbegin(),
191 carousel_services_active.cend(),
192 [&](const ActiveCarouselService& acs) {
193 return acs.sid == sid;
194 }) != carousel_services_active.cend();
195
196 const bool require =
197 rx->serviceHasAudioComponent(s) and
198 (decode_settings.strategy == DecodeStrategy::All or
199 phs.at(sid).needsToBeDecoded() or
200 is_active);
201 const bool is_decoded = programmes_being_decoded[sid];
202
203 if (require and not is_decoded) {
204 bool success = rx->addServiceToDecode(phs.at(sid), "", s);
205
206 if (success) {
207 programmes_being_decoded[sid] = success;
208 }
209 else {
210 throw TuneFailed();
211 }
212 }
213 else if (is_decoded and not require) {
214 bool success = rx->removeServiceToDecode(s);
215
216 if (success) {
217 programmes_being_decoded[sid] = false;
218 }
219 else {
220 cerr << "Stop playing 0x" << to_hex(s.serviceId, 4) <<
221 " failed" << endl;
222 throw TuneFailed();
223 }
224 }
225 }
226 catch (const out_of_range&) {
227 cerr << "Cannot tune to 0x" << to_hex(s.serviceId, 4) <<
228 " because no handler exists!" << endl;
229 }
230 }
231 }
232 catch (const TuneFailed&) {
233 rx->restart_decoder();
234 phs.clear();
235 programmes_being_decoded.clear();
236 carousel_services_available.clear();
237 carousel_services_active.clear();
238 }
239 phs_changed.notify_all();
240 }
241
retune(const std::string & channel)242 void WebRadioInterface::retune(const std::string& channel)
243 {
244 // Ensure two closely occurring retune() calls don't get stuck
245 unique_lock<mutex> retune_lock(retune_mut);
246
247 auto freq = channels.getFrequency(channel);
248 if (freq == 0) {
249 cerr << "RETUNE Invalid channel: " << channel << endl;
250 return;
251 }
252
253 cerr << "RETUNE: Retune to " << freq << endl;
254
255 running = false;
256 if (programme_handler_thread.joinable()) {
257 programme_handler_thread.join();
258 }
259
260 cerr << "RETUNE Take ownership of RX" << endl;
261 {
262 unique_lock<mutex> lock(rx_mut);
263 // Even though it would be ok for rx to be inexistent here,
264 // we check to uncover errors.
265 ASSERT_RX;
266
267 cerr << "RETUNE Destroy RX" << endl;
268 rx.reset();
269
270 {
271 lock_guard<mutex> data_lock(data_mut);
272 last_dateTime = {};
273 last_snr = 0;
274 last_fine_correction = 0;
275 last_coarse_correction = 0;
276 }
277
278 synced = false;
279
280 {
281 lock_guard<mutex> fib_lock(fib_mut);
282 num_fic_crc_errors = 0;
283 }
284 tiis.clear();
285
286 cerr << "RETUNE Set frequency" << endl;
287 input.setFrequency(freq);
288 input.reset(); // Clear buffer
289
290 cerr << "RETUNE Restart RX" << endl;
291 rx = make_unique<RadioReceiver>(*this, input, rro);
292 if (not rx) {
293 throw runtime_error("Could not initialise RadioReceiver");
294 }
295
296 time_rx_created = chrono::system_clock::now();
297 rx->restart(false);
298
299 cerr << "RETUNE Start programme handler" << endl;
300 running = true;
301 programme_handler_thread = thread(&WebRadioInterface::handle_phs, this);
302 }
303 }
304
recv_line(Socket & s)305 static string recv_line(Socket& s) {
306 string line;
307 bool cr_seen = false;
308
309 while (true) {
310 char c = 0;
311 ssize_t ret = s.recv(&c, 1, 0);
312 if (ret == 0) {
313 return "";
314 }
315 else if (ret == -1) {
316 string errstr = strerror(errno);
317 cerr << "recv error " << errstr << endl;
318 return "";
319 }
320
321 line += c;
322
323 if (c == '\r') {
324 cr_seen = true;
325 }
326 else if (cr_seen and c == '\n') {
327 return line;
328 }
329 }
330 }
331
recv_exactly(Socket & s,size_t num_bytes)332 static vector<char> recv_exactly(Socket& s, size_t num_bytes)
333 {
334 vector<char> buf(num_bytes);
335 size_t rx = 0;
336
337 while (rx < num_bytes) {
338 const size_t remain = num_bytes - rx;
339 ssize_t ret = s.recv(buf.data() + rx, remain, 0);
340
341 if (ret == 0) {
342 break;
343 }
344 else if (ret == -1) {
345 string errstr = strerror(errno);
346 cerr << "recv error " << errstr << endl;
347 return {};
348 }
349 else {
350 rx += ret;
351 }
352 }
353
354 return buf;
355 }
356
// Split str at every occurrence of the separator c.
//
// The result always has (number of separators + 1) elements; adjacent
// separators and a leading/trailing separator yield empty elements, and an
// empty input yields a single empty element. The whole std::string is
// processed, including any embedded NUL bytes.
static vector<string> split(const string& str, char c = ' ')
{
    vector<string> result;
    size_t begin = 0;

    while (true) {
        const size_t end = str.find(c, begin);
        if (end == string::npos) {
            // Last (or only) element: everything after the final separator.
            result.push_back(str.substr(begin));
            break;
        }
        result.push_back(str.substr(begin, end - begin));
        begin = end + 1;
    }

    return result;
}
369
// Result of parsing one HTTP request read from a client socket
// (filled in by parse_http_headers()).
struct http_request_t {
    // Stays false unless parse_http_headers() processed the request line,
    // the headers and, for POST, the body without giving up.
    bool valid = false;

    // Set according to the first token of the request line.
    bool is_get = false;
    bool is_post = false;
    // Second token of the request line.
    string url;
    // Header lines that split into exactly two parts at ':'. The key is the
    // part before the colon, the value the untrimmed remainder.
    map<string, string> headers;
    // Bytes read after the headers for a POST that carries a
    // Content-Length header; empty otherwise.
    string post_data;
};
379
380
parse_http_headers(Socket & s)381 static http_request_t parse_http_headers(Socket& s) {
382 http_request_t r;
383
384 const auto first_line = recv_line(s);
385 const auto request_type = split(first_line);
386
387 if (request_type.size() != 3) {
388 cerr << "Malformed request: " << first_line << endl;
389 return r;
390 }
391 else if (request_type[0] == "GET") {
392 r.is_get = true;
393 }
394 else if (request_type[0] == "POST") {
395 r.is_post = true;
396 }
397 else {
398 return r;
399 }
400
401 r.url = request_type[1];
402
403 while (true) {
404 string header_line = recv_line(s);
405
406 if (header_line == "\r\n") {
407 break;
408 }
409
410 const auto header = split(header_line, ':');
411
412 if (header.size() == 2) {
413 r.headers.emplace(header[0], header[1]);
414 }
415 }
416
417 if (r.is_post) {
418 constexpr auto CL = "Content-Length";
419 if (r.headers.count(CL) == 1) {
420 try {
421 const int content_length = std::stoi(r.headers[CL]);
422 if (content_length > 1024 * 1024) {
423 cerr << "Unreasonable POST Content-Length: " << content_length << endl;
424 return r;
425 }
426
427 const auto buf = recv_exactly(s, content_length);
428 r.post_data = string(buf.begin(), buf.end());
429 }
430 catch (const invalid_argument&) {
431 cerr << "Cannot parse POST Content-Length: " << r.headers[CL] << endl;
432 return r;
433 }
434 catch (const out_of_range&) {
435 cerr << "Cannot represent POST Content-Length: " << r.headers[CL] << endl;
436 return r;
437 }
438 }
439 }
440
441 r.valid = true;
442 return r;
443 }
444
dispatch_client(Socket && client)445 bool WebRadioInterface::dispatch_client(Socket&& client)
446 {
447 Socket s(move(client));
448
449 bool success = false;
450
451 if (not s.valid()) {
452 cerr << "socket in dispatcher not valid!" << endl;
453 return false;
454 }
455
456 const auto req = parse_http_headers(s);
457
458 if (not req.valid) {
459 return false;
460 }
461 else {
462 if (req.is_get) {
463 if (req.url == "/") {
464 success = send_file(s, "index.html", http_contenttype_html);
465 }
466 else if (req.url == "/index.js") {
467 success = send_file(s, "index.js", http_contenttype_js);
468 }
469 else if (req.url == "/mux.json") {
470 success = send_mux_json(s);
471 }
472 else if (req.url == "/fic") {
473 success = send_fic(s);
474 }
475 else if (req.url == "/impulseresponse") {
476 success = send_impulseresponse(s);
477 }
478 else if (req.url == "/spectrum") {
479 success = send_spectrum(s);
480 }
481 else if (req.url == "/constellation") {
482 success = send_constellation(s);
483 }
484 else if (req.url == "/nullspectrum") {
485 success = send_null_spectrum(s);
486 }
487 else if (req.url == "/channel") {
488 success = send_channel(s);
489 }
490 else if (req.url == "/fftwindowplacement" or req.url == "/enablecoarsecorrector") {
491 send_http_response(s, http_405,
492 "405 Method Not Allowed\r\n" + req.url + " is POST-only");
493 return false;
494 }
495 else {
496 const regex regex_slide(R"(^[/]slide[/]([^ ]+))");
497 std::smatch match_slide;
498
499 const regex regex_mp3(R"(^[/]mp3[/]([^ ]+))");
500 std::smatch match_mp3;
501 if (regex_search(req.url, match_mp3, regex_mp3)) {
502 success = send_mp3(s, match_mp3[1]);
503 }
504 else if (regex_search(req.url, match_slide, regex_slide)) {
505 success = send_slide(s, match_slide[1]);
506 }
507 else {
508 cerr << "Could not understand GET request " << req.url << endl;
509 }
510 }
511 }
512 else if (req.is_post) {
513 if (req.url == "/channel") {
514 success = handle_channel_post(s, req.post_data);
515 }
516 else if (req.url == "/fftwindowplacement") {
517 success = handle_fft_window_placement_post(s, req.post_data);
518 }
519 else if (req.url == "/enablecoarsecorrector") {
520 success = handle_coarse_corrector_post(s, req.post_data);
521 }
522 else {
523 cerr << "Could not understand POST request " << req.url << endl;
524 }
525 }
526 else {
527 throw logic_error("valid req is neither GET nor POST!");
528 }
529
530 if (not success) {
531 send_http_response(s, http_404, "Could not understand request.\r\n");
532 }
533
534 return success;
535 }
536 }
537
send_file(Socket & s,const std::string & filename,const std::string & content_type)538 bool WebRadioInterface::send_file(Socket& s,
539 const std::string& filename,
540 const std::string& content_type)
541 {
542 FILE *fd = fopen(filename.c_str(), "r");
543 if (fd) {
544 if (not send_http_response(s, http_ok, "", content_type)) {
545 cerr << "Failed to send file headers" << endl;
546 fclose(fd);
547 return false;
548 }
549
550 vector<char> data(1024);
551 ssize_t ret = 0;
552 do {
553 ret = fread(data.data(), 1, data.size(), fd);
554 ret = s.send(data.data(), ret, MSG_NOSIGNAL);
555 if (ret == -1) {
556 cerr << "Failed to send file data" << endl;
557 fclose(fd);
558 return false;
559 }
560
561 } while (ret > 0);
562
563 fclose(fd);
564 return true;
565 }
566 else {
567 return send_http_response(s, http_500, "file '" + filename + "' is missing!");
568 }
569 return false;
570 }
571
calculate_cir_peaks(const vector<float> & cir_linear)572 static vector<PeakJson> calculate_cir_peaks(const vector<float>& cir_linear)
573 {
574 constexpr size_t num_peaks = 6;
575
576 vector<PeakJson> peaks;
577
578 if (not cir_linear.empty()) {
579 vector<float> cir_lin(cir_linear);
580
581 // Every time we find a peak, we attenuate it, including
582 // its surrounding values, and we go search the next peak.
583 for (size_t peak = 0; peak < num_peaks; peak++) {
584 PeakJson p;
585 for (size_t i = 1; i < cir_lin.size(); i++) {
586 if (cir_lin[i] > p.value) {
587 p.value = cir_lin[i];
588 p.index = i;
589 }
590 }
591
592 const size_t windowsize = 25;
593 for (size_t j = 0; j < windowsize; j++) {
594 const ssize_t i = p.index + j - windowsize/2;
595 if (i >= 0 and i < (ssize_t)cir_lin.size()) {
596 cir_lin[i] *= 0;
597 }
598 }
599
600 peaks.push_back(move(p));
601 }
602 }
603
604 return peaks;
605 }
606
send_mux_json(Socket & s)607 bool WebRadioInterface::send_mux_json(Socket& s)
608 {
609 MuxJson mux_json;
610
611 mux_json.receiver.software.name = "welle.io";
612 mux_json.receiver.software.version = VERSION;
613 mux_json.receiver.software.fftwindowplacement = fftPlacementMethodToString(rro.fftPlacementMethod);
614 mux_json.receiver.software.coarsecorrectorenabled = not rro.disableCoarseCorrector;
615 mux_json.receiver.software.freqsyncmethod = freqSyncMethodToString(rro.freqsyncMethod);
616 mux_json.receiver.software.lastchannelchange = time_rx_created;
617 mux_json.receiver.hardware.name = input.getDescription();
618 mux_json.receiver.hardware.gain = input.getGain();
619
620 {
621 lock_guard<mutex> lock(fib_mut);
622 mux_json.demodulator_fic_numcrcerrors = num_fic_crc_errors;
623 }
624
625 {
626 lock_guard<mutex> lock(rx_mut);
627 ASSERT_RX;
628
629 mux_json.ensemble.label = rx->getEnsembleLabel();
630
631 mux_json.ensemble.id = to_hex(rx->getEnsembleId(), 4);
632 mux_json.ensemble.ecc = to_hex(rx->getEnsembleEcc(), 2);
633
634 for (const auto& s : rx->getServiceList()) {
635 ServiceJson service;
636 service.sid = to_hex(s.serviceId, 4);
637 service.programType = s.programType;
638 service.ptystring = DABConstants::getProgramTypeName(s.programType);
639 service.language = s.language;
640 service.languagestring = DABConstants::getLanguageName(s.language);
641 service.label = s.serviceLabel;
642 service.url_mp3 = "";
643
644 for (const auto& sc : rx->getComponents(s)) {
645 ComponentJson component;
646 component.componentnr = sc.componentNr;
647 component.primary = (sc.PS_flag ? true : false);
648 component.caflag = (sc.CAflag ? true : false);
649 component.label = sc.componentLabel;
650
651 const auto sub = rx->getSubchannel(sc);
652
653 switch (sc.transportMode()) {
654 case TransportMode::Audio:
655 component.transportmode = "audio";
656 component.ascty = make_unique<string>(
657 string{
658 (sc.audioType() == AudioServiceComponentType::DAB ? "DAB" :
659 sc.audioType() == AudioServiceComponentType::DABPlus ? "DAB+" :
660 "unknown")});
661 if (sc.audioType() == AudioServiceComponentType::DAB or
662 sc.audioType() == AudioServiceComponentType::DABPlus) {
663 string urlmp3 = "/mp3/" + to_hex(s.serviceId, 4);
664 service.url_mp3 = urlmp3;
665 }
666 break;
667 case TransportMode::FIDC:
668 component.transportmode = "fidc";
669 component.dscty = make_unique<uint16_t>(sc.DSCTy);
670 break;
671 case TransportMode::PacketData:
672 component.transportmode = "packetdata";
673 component.scid = make_unique<uint16_t>(sc.SCId);
674 break;
675 case TransportMode::StreamData:
676 component.transportmode = "streamdata";
677 component.dscty = make_unique<uint16_t>(sc.DSCTy);
678 break;
679 }
680
681 component.subchannel = sub;
682
683 service.components.push_back(move(component));
684 }
685
686 try {
687 const auto& wph = phs.at(s.serviceId);
688 const auto al = wph.getAudioLevels();
689 service.audiolevel_present = true;
690 service.audiolevel_time = chrono::system_clock::to_time_t(al.time);
691 service.audiolevel_left = al.last_audioLevel_L;
692 service.audiolevel_right = al.last_audioLevel_R;
693
694 service.channels = 2;
695 service.samplerate = wph.rate;
696 service.mode = wph.mode;
697
698 auto mot = wph.getMOT();
699 service.mot_time = chrono::system_clock::to_time_t(mot.time);
700 service.mot_lastchange = chrono::system_clock::to_time_t(mot.last_changed);
701
702 auto dls = wph.getDLS();
703 service.dls_label = dls.label;
704 service.dls_time = chrono::system_clock::to_time_t(dls.time);
705 service.dls_lastchange = chrono::system_clock::to_time_t(mot.last_changed);
706
707 auto errorcounters = wph.getErrorCounters();
708 service.errorcounters_frameerrors = errorcounters.num_frameErrors;
709 service.errorcounters_rserrors = errorcounters.num_rsErrors;
710 service.errorcounters_aacerrors = errorcounters.num_aacErrors;
711 service.errorcounters_time = chrono::system_clock::to_time_t(dls.time);
712
713 auto xpad_err = wph.getXPADErrors();
714 service.xpaderror_haserror = xpad_err.has_error;
715 if (xpad_err.has_error) {
716 service.xpaderror_announcedlen = xpad_err.announced_xpad_len;
717 service.xpaderror_len = xpad_err.xpad_len;
718 service.xpaderror_time = chrono::system_clock::to_time_t(xpad_err.time);
719 }
720 }
721 catch (const out_of_range&) {
722 service.audiolevel_present = false;
723 service.channels = 0;
724 service.samplerate = 0;
725 service.mode = "invalid";
726 }
727
728 mux_json.services.push_back(move(service));
729 }
730 }
731
732 {
733 lock_guard<mutex> lock(data_mut);
734
735 mux_json.utctime.year = last_dateTime.year;
736 mux_json.utctime.month = last_dateTime.month;
737 mux_json.utctime.day = last_dateTime.day;
738 mux_json.utctime.hour = last_dateTime.hour;
739 mux_json.utctime.minutes = last_dateTime.minutes;
740 mux_json.utctime.lto = last_dateTime.hourOffset + ((double)last_dateTime.minuteOffset / 30.0);
741
742 for (const auto& m : pending_messages) {
743 using namespace chrono;
744
745 stringstream ss;
746
747 const auto ms = duration_cast<milliseconds>(
748 m.timestamp.time_since_epoch());
749
750 const auto s = duration_cast<seconds>(ms);
751 const std::time_t t = s.count();
752 const std::size_t fractional_seconds = ms.count() % 1000;
753
754 ss << std::ctime(&t) << "." << fractional_seconds;
755
756 switch (m.level) {
757 case message_level_t::Information:
758 ss << " INFO : ";
759 break;
760 case message_level_t::Error:
761 ss << " ERROR: ";
762 break;
763 }
764
765 ss << m.text;
766 mux_json.messages.push_back(ss.str());
767 }
768
769 pending_messages.clear();
770
771 mux_json.demodulator_snr = last_snr;
772 mux_json.demodulator_frequencycorrection = last_fine_correction + last_coarse_correction;
773 mux_json.demodulator_timelastfct0frame = rx->getReceiverStats().timeLastFCT0Frame;
774
775 mux_json.tii = getTiiStats();
776 }
777
778 {
779 lock_guard<mutex> lock(plotdata_mut);
780 mux_json.cir_peaks = calculate_cir_peaks(last_CIR);
781 }
782
783 if (not send_http_response(s, http_ok, "", http_contenttype_json)) {
784 return false;
785 }
786
787 const auto json_str = build_mux_json(mux_json);
788
789 ssize_t ret = s.send(json_str.c_str(), json_str.size(), MSG_NOSIGNAL);
790 if (ret == -1) {
791 cerr << "Failed to send mux.json data" << endl;
792 return false;
793 }
794 return true;
795 }
796
send_mp3(Socket & s,const std::string & stream)797 bool WebRadioInterface::send_mp3(Socket& s, const std::string& stream)
798 {
799 unique_lock<mutex> lock(rx_mut);
800 ASSERT_RX;
801
802 for (const auto& srv : rx->getServiceList()) {
803 if (rx->serviceHasAudioComponent(srv) and
804 (to_hex(srv.serviceId, 4) == stream or
805 (uint32_t)std::stoul(stream) == srv.serviceId)) {
806 try {
807 auto& ph = phs.at(srv.serviceId);
808
809 lock.unlock();
810
811 if (not send_http_response(s, http_ok, "", http_contenttype_mp3)) {
812 cerr << "Failed to send mp3 headers" << endl;
813 return false;
814 }
815
816 ProgrammeSender sender(move(s));
817
818 cerr << "Registering mp3 sender" << endl;
819 ph.registerSender(&sender);
820 check_decoders_required();
821 sender.wait_for_termination();
822
823 cerr << "Removing mp3 sender" << endl;
824 ph.removeSender(&sender);
825 check_decoders_required();
826
827 return true;
828 }
829 catch (const out_of_range& e) {
830 cerr << "Could not setup mp3 sender for " <<
831 srv.serviceId << ": " << e.what() << endl;
832
833 send_http_response(s, http_503, e.what());
834 return false;
835 }
836 }
837 }
838 return false;
839 }
840
send_slide(Socket & s,const std::string & stream)841 bool WebRadioInterface::send_slide(Socket& s, const std::string& stream)
842 {
843 for (const auto& wph : phs) {
844 if (to_hex(wph.first, 4) == stream or
845 (uint32_t)std::stoul(stream) == wph.first) {
846 const auto mot = wph.second.getMOT();
847
848 if (mot.data.empty()) {
849 send_http_response(s, http_404, "404 Not Found\r\nSlide not available.\r\n");
850 return true;
851 }
852
853 stringstream headers;
854 headers << http_ok;
855
856 headers << "Content-Type: ";
857 switch (mot.subtype) {
858 case MOTType::Unknown:
859 headers << "application/octet-stream";
860 break;
861 case MOTType::JPEG:
862 headers << "image/jpeg";
863 break;
864 case MOTType::PNG:
865 headers << "image/png";
866 break;
867 }
868 headers << "\r\n";
869
870 headers << http_nocache;
871
872 headers << "Last-Modified: ";
873 std::time_t t = chrono::system_clock::to_time_t(mot.time);
874 headers << put_time(std::gmtime(&t), "%a, %d %b %Y %T GMT");
875 headers << "\r\n";
876
877 headers << "\r\n";
878 const auto headers_str = headers.str();
879 int ret = s.send(headers_str.data(), headers_str.size(), MSG_NOSIGNAL);
880 if (ret == (ssize_t)headers_str.size()) {
881 ret = s.send(mot.data.data(), mot.data.size(), MSG_NOSIGNAL);
882 }
883
884 if (ret == -1) {
885 cerr << "Failed to send slide" << endl;
886 }
887
888 return true;
889 }
890 }
891 return false;
892 }
893
send_fic(Socket & s)894 bool WebRadioInterface::send_fic(Socket& s)
895 {
896 if (not send_http_response(s, http_ok, "", http_contenttype_data)) {
897 cerr << "Failed to send FIC headers" << endl;
898 return false;
899 }
900
901 while (true) {
902 unique_lock<mutex> lock(fib_mut);
903 while (fib_blocks.empty()) {
904 new_fib_block_available.wait_for(lock, chrono::seconds(1));
905 }
906 ssize_t ret = s.send(fib_blocks.front().data(),
907 fib_blocks.front().size(), MSG_NOSIGNAL);
908 if (ret == -1) {
909 cerr << "Failed to send FIC data" << endl;
910 return false;
911 }
912
913 fib_blocks.pop_front();
914 }
915 return true;
916 }
917
send_impulseresponse(Socket & s)918 bool WebRadioInterface::send_impulseresponse(Socket& s)
919 {
920 if (not send_http_response(s, http_ok, "", http_contenttype_data)) {
921 cerr << "Failed to send CIR headers" << endl;
922 return false;
923 }
924
925 lock_guard<mutex> lock(plotdata_mut);
926 vector<float> cir_db(last_CIR.size());
927 std::transform(last_CIR.begin(), last_CIR.end(), cir_db.begin(),
928 [](float y) { return 10.0f * log10(y); });
929
930 size_t lengthBytes = cir_db.size() * sizeof(float);
931 ssize_t ret = s.send(cir_db.data(), lengthBytes, MSG_NOSIGNAL);
932 if (ret == -1) {
933 cerr << "Failed to send CIR data" << endl;
934 return false;
935 }
936
937 return true;
938 }
939
send_fft_data(Socket & s,DSPCOMPLEX * spectrumBuffer,size_t T_u)940 static bool send_fft_data(Socket& s, DSPCOMPLEX *spectrumBuffer, size_t T_u)
941 {
942 vector<float> spectrum(T_u);
943
944 // Shift FFT samples
945 const size_t half_Tu = T_u / 2;
946 for (size_t i = 0; i < half_Tu; i++) {
947 spectrum[i] = abs(spectrumBuffer[i + half_Tu]);
948 }
949 for (size_t i = half_Tu; i < T_u; i++) {
950 spectrum[i] = abs(spectrumBuffer[i - half_Tu]);
951 }
952
953 if (not send_http_response(s, http_ok, "", http_contenttype_data)) {
954 cerr << "Failed to send spectrum headers" << endl;
955 return false;
956 }
957
958 size_t lengthBytes = spectrum.size() * sizeof(float);
959 ssize_t ret = s.send(spectrum.data(), lengthBytes, MSG_NOSIGNAL);
960 if (ret == -1) {
961 cerr << "Failed to send spectrum data" << endl;
962 return false;
963 }
964
965 return true;
966 }
967
send_spectrum(Socket & s)968 bool WebRadioInterface::send_spectrum(Socket& s)
969 {
970 // Get FFT buffer
971 DSPCOMPLEX* spectrumBuffer = spectrum_fft_handler.getVector();
972 auto samples = input.getSpectrumSamples(dabparams.T_u);
973
974 // Continue only if we got data
975 if (samples.size() != (size_t)dabparams.T_u)
976 return false;
977
978 std::copy(samples.begin(), samples.end(), spectrumBuffer);
979
980 // Do FFT to get the spectrum
981 spectrum_fft_handler.do_FFT();
982
983 return send_fft_data(s, spectrumBuffer, dabparams.T_u);
984 }
985
send_null_spectrum(Socket & s)986 bool WebRadioInterface::send_null_spectrum(Socket& s)
987 {
988 // Get FFT buffer
989 DSPCOMPLEX* spectrumBuffer = spectrum_fft_handler.getVector();
990
991 lock_guard<mutex> lock(plotdata_mut);
992 if (last_NULL.empty()) {
993 return false;
994 }
995 else if (last_NULL.size() != (size_t)dabparams.T_null) {
996 cerr << "Invalid NULL size " << last_NULL.size() << endl;
997 return false;
998 }
999
1000 copy(last_NULL.begin(), last_NULL.begin() + dabparams.T_u, spectrumBuffer);
1001
1002 // Do FFT to get the spectrum
1003 spectrum_fft_handler.do_FFT();
1004
1005 return send_fft_data(s, spectrumBuffer, dabparams.T_u);
1006 }
1007
send_constellation(Socket & s)1008 bool WebRadioInterface::send_constellation(Socket& s)
1009 {
1010 const size_t decim = OfdmDecoder::constellationDecimation;
1011 const size_t num_iqpoints = (dabparams.L-1) * dabparams.K / decim;
1012 std::vector<float> phases(num_iqpoints);
1013
1014 lock_guard<mutex> lock(plotdata_mut);
1015 if (last_constellation.size() == num_iqpoints) {
1016 phases.resize(num_iqpoints);
1017 for (size_t i = 0; i < num_iqpoints; i++) {
1018 const float y = 180.0f / (float)M_PI * std::arg(last_constellation[i]);
1019 phases[i] = y;
1020 }
1021
1022 if (not send_http_response(s, http_ok, "", http_contenttype_data)) {
1023 cerr << "Failed to send constellation headers" << endl;
1024 return false;
1025 }
1026
1027 size_t lengthBytes = phases.size() * sizeof(float);
1028 ssize_t ret = s.send(phases.data(), lengthBytes, MSG_NOSIGNAL);
1029 if (ret == -1) {
1030 cerr << "Failed to send constellation data" << endl;
1031 return false;
1032 }
1033
1034 return true;
1035 }
1036
1037 return false;
1038 }
1039
send_channel(Socket & s)1040 bool WebRadioInterface::send_channel(Socket& s)
1041 {
1042 const auto freq = input.getFrequency();
1043
1044 try {
1045 const auto chan = channels.getChannelForFrequency(freq);
1046
1047 string response = http_ok;
1048 response += http_contenttype_text;
1049 response += http_nocache;
1050 response += "\r\n";
1051 response += chan;
1052 ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1053 if (ret == -1) {
1054 cerr << "Failed to send frequency" << endl;
1055 return false;
1056 }
1057 }
1058 catch (const out_of_range& e) {
1059 string response = http_500;
1060 response += http_contenttype_text;
1061 response += http_nocache;
1062 response += "\r\n";
1063 response += "Error: ";
1064 response += e.what();
1065 ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1066 if (ret == -1) {
1067 cerr << "Failed to send frequency 500" << endl;
1068 return false;
1069 }
1070 }
1071 return true;
1072 }
1073
handle_fft_window_placement_post(Socket & s,const std::string & fft_window_placement)1074 bool WebRadioInterface::handle_fft_window_placement_post(Socket& s, const std::string& fft_window_placement)
1075 {
1076 cerr << "POST fft window: " << fft_window_placement << endl;
1077
1078 if (fft_window_placement == "EarliestPeakWithBinning") {
1079 rro.fftPlacementMethod = FFTPlacementMethod::EarliestPeakWithBinning;
1080 }
1081 else if (fft_window_placement == "StrongestPeak") {
1082 rro.fftPlacementMethod = FFTPlacementMethod::StrongestPeak;
1083 }
1084 else if (fft_window_placement == "ThresholdBeforePeak") {
1085 rro.fftPlacementMethod = FFTPlacementMethod::ThresholdBeforePeak;
1086 }
1087 else {
1088 string response = http_400;
1089 response += http_contenttype_text;
1090 response += http_nocache;
1091 response += "\r\n";
1092 response += "Invalid FFT Window Placement requested.";
1093 ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1094 if (ret == -1) {
1095 cerr << "Failed to send frequency" << endl;
1096 return false;
1097 }
1098 return true;
1099 }
1100
1101 {
1102 lock_guard<mutex> lock(rx_mut);
1103 ASSERT_RX;
1104 rx->setReceiverOptions(rro);
1105 }
1106
1107 string response = http_ok;
1108 response += http_contenttype_text;
1109 response += http_nocache;
1110 response += "\r\n";
1111 response += "Switched FFT Window Placement.";
1112 ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1113 if (ret == -1) {
1114 cerr << "Failed to send frequency" << endl;
1115 return false;
1116 }
1117 return true;
1118 }
1119
handle_coarse_corrector_post(Socket & s,const std::string & coarseCorrector)1120 bool WebRadioInterface::handle_coarse_corrector_post(Socket& s, const std::string& coarseCorrector)
1121 {
1122 cerr << "POST coarse : " << coarseCorrector << endl;
1123
1124 if (coarseCorrector == "0") {
1125 rro.disableCoarseCorrector = true;
1126 }
1127 else if (coarseCorrector == "1") {
1128 rro.disableCoarseCorrector = false;
1129 }
1130 else {
1131 string response = http_400;
1132 response += http_contenttype_text;
1133 response += http_nocache;
1134 response += "\r\n";
1135 response += "Invalid coarse corrector selected";
1136 ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1137 if (ret == -1) {
1138 cerr << "Failed to set response" << endl;
1139 return false;
1140 }
1141 return true;
1142 }
1143
1144 {
1145 lock_guard<mutex> lock(rx_mut);
1146 ASSERT_RX;
1147 rx->setReceiverOptions(rro);
1148 }
1149
1150 string response = http_ok;
1151 response += http_contenttype_text;
1152 response += http_nocache;
1153 response += "\r\n";
1154 response += "Switched Coarse corrector.";
1155 ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1156 if (ret == -1) {
1157 cerr << "Failed to send coarse switch confirmation" << endl;
1158 return false;
1159 }
1160 return true;
1161 }
1162
handle_channel_post(Socket & s,const std::string & channel)1163 bool WebRadioInterface::handle_channel_post(Socket& s, const std::string& channel)
1164 {
1165 cerr << "POST channel: " << channel << endl;
1166
1167 retune(channel);
1168
1169 string response = http_ok;
1170 response += http_contenttype_text;
1171 response += http_nocache;
1172 response += "\r\n";
1173 response += "Retuning...";
1174 ssize_t ret = s.send(response.data(), response.size(), MSG_NOSIGNAL);
1175 if (ret == -1) {
1176 cerr << "Failed to send frequency" << endl;
1177 return false;
1178 }
1179 return true;
1180 }
1181
handle_phs()1182 void WebRadioInterface::handle_phs()
1183 {
1184 while (running) {
1185 this_thread::sleep_for(chrono::seconds(2));
1186
1187 unique_lock<mutex> lock(rx_mut);
1188 ASSERT_RX;
1189
1190 auto serviceList = rx->getServiceList();
1191 for (auto& s : serviceList) {
1192 auto scs = rx->getComponents(s);
1193
1194 if (std::find(
1195 carousel_services_available.cbegin(),
1196 carousel_services_available.cend(),
1197 s.serviceId) == carousel_services_available.cend()) {
1198 for (auto& sc : scs) {
1199 if (sc.transportMode() == TransportMode::Audio) {
1200 carousel_services_available.push_back(s.serviceId);
1201 }
1202 }
1203 }
1204
1205 if (phs.count(s.serviceId) == 0) {
1206 WebProgrammeHandler ph(s.serviceId);
1207 phs.emplace(std::make_pair(s.serviceId, move(ph)));
1208 }
1209 }
1210
1211 using namespace chrono;
1212 size_t max_services_in_carousel = std::min(
1213 carousel_services_available.size(),
1214 (size_t)decode_settings.num_decoders_in_carousel);
1215
1216 if (decode_settings.strategy == DecodeStrategy::Carousel10) {
1217 while (carousel_services_active.size() < max_services_in_carousel) {
1218 carousel_services_active.emplace_back(
1219 carousel_services_available.front());
1220 carousel_services_available.pop_front();
1221 }
1222
1223 for (auto& acs : carousel_services_active) {
1224 if (acs.time_change + chrono::seconds(10) <
1225 chrono::steady_clock::now()) {
1226 acs.sid = 0;
1227
1228 if (not carousel_services_available.empty()) {
1229 carousel_services_active.emplace_back(
1230 carousel_services_available.front());
1231 carousel_services_available.pop_front();
1232 }
1233 }
1234 }
1235 }
1236 else if (decode_settings.strategy == DecodeStrategy::CarouselPAD) {
1237 while (carousel_services_active.size() < max_services_in_carousel) {
1238 if (not serviceList.empty()) {
1239 carousel_services_active.emplace_back(
1240 carousel_services_available.front());
1241 carousel_services_available.pop_front();
1242 }
1243 }
1244
1245 for (auto& acs : carousel_services_active) {
1246 if (acs.time_change + chrono::seconds(5) <
1247 chrono::steady_clock::now()) {
1248 auto current_it = phs.find(acs.sid);
1249 if (current_it == phs.end()) {
1250 cerr << "Reset service decoder carousel!"
1251 "Cannot find service "
1252 << acs.sid << endl;
1253 acs.sid = 0;
1254 }
1255 else {
1256 // Switch to next programme once both DLS and Slideshow
1257 // got decoded, but at most after 80 seconds
1258 const auto now = system_clock::now();
1259 const auto mot = current_it->second.getMOT();
1260 const auto dls = current_it->second.getDLS();
1261 // Slide and DLS received in the last 60 seconds?
1262 const bool switchBecausePAD = (
1263 now - mot.time < seconds(60) and
1264 now - dls.time < seconds(60));
1265
1266 const bool switchBecauseLate =
1267 acs.time_change + seconds(80) < steady_clock::now();
1268
1269 if (switchBecausePAD or switchBecauseLate) {
1270 acs.sid = 0;
1271
1272 if (not carousel_services_available.empty()) {
1273 carousel_services_active.emplace_back(
1274 carousel_services_available.front());
1275 carousel_services_available.pop_front();
1276 }
1277 }
1278 }
1279 }
1280 }
1281 }
1282
1283 carousel_services_active.erase(
1284 remove_if(
1285 carousel_services_active.begin(),
1286 carousel_services_active.end(),
1287 [](const ActiveCarouselService& acs){
1288 return acs.sid == 0;
1289 }), carousel_services_active.end());
1290 lock.unlock();
1291 check_decoders_required();
1292 }
1293
1294 cerr << "TEARDOWN Cancel all PHs and remove services" << endl;
1295 {
1296 lock_guard<mutex> lock(rx_mut);
1297 for (auto& ph : phs) {
1298 ph.second.cancelAll();
1299
1300 const auto srv = rx->getService(ph.first);
1301 if (srv.serviceId != 0) {
1302 (void)rx->removeServiceToDecode(srv);
1303 }
1304 }
1305 }
1306
1307 cerr << "TEARDOWN Stop rx" << endl;
1308 {
1309 unique_lock<mutex> lock(rx_mut);
1310 rx->stop();
1311 }
1312 }
1313
#if HAVE_SIGACTION
// Set to 1 by handler() once the signal it is installed for was received.
static volatile sig_atomic_t sig_caught = 0;

// Signal handler: only records that a signal arrived.
static void handler(int)
{
    sig_caught = 1;
}
#else
// Without sigaction support no signal is ever recorded.
const int sig_caught = 0;
#endif
1323
serve()1324 void WebRadioInterface::serve()
1325 {
1326 deque<future<bool> > running_connections;
1327
1328 #if HAVE_SIGACTION
1329 struct sigaction sa = {};
1330 sa.sa_handler = handler;
1331 sigemptyset(&sa.sa_mask);
1332 if (sigaction(SIGINT, &sa, NULL) == -1) {
1333 cerr << "Failed to set up signal handler" << endl;
1334 }
1335 #endif
1336
1337 while (sig_caught == 0) {
1338 auto client = serverSocket.accept();
1339
1340 running_connections.push_back(async(launch::async,
1341 &WebRadioInterface::dispatch_client, this, move(client)));
1342
1343 deque<future<bool> > still_running_connections;
1344 for (auto& fut : running_connections) {
1345 if (fut.valid()) {
1346 switch (fut.wait_for(chrono::milliseconds(1))) {
1347 case future_status::deferred:
1348 case future_status::timeout:
1349 still_running_connections.push_back(move(fut));
1350 break;
1351 case future_status::ready:
1352 fut.get();
1353 break;
1354 }
1355 }
1356 }
1357 running_connections = move(still_running_connections);
1358 }
1359
1360 cerr << "SERVE No more connections running" << endl;
1361
1362 running = false;
1363 if (programme_handler_thread.joinable()) {
1364 programme_handler_thread.join();
1365 }
1366
1367 cerr << "SERVE Wait for all futures to clear" << endl;
1368 while (running_connections.size() > 0) {
1369 deque<future<bool> > still_running_connections;
1370 for (auto& fut : running_connections) {
1371 if (fut.valid()) {
1372 switch (fut.wait_for(chrono::milliseconds(1))) {
1373 case future_status::deferred:
1374 case future_status::timeout:
1375 still_running_connections.push_back(move(fut));
1376 break;
1377 case future_status::ready:
1378 fut.get();
1379 break;
1380 }
1381 }
1382 }
1383 running_connections = move(still_running_connections);
1384 }
1385
1386 cerr << "SERVE clear remaining data structures" << endl;
1387 phs.clear();
1388 programmes_being_decoded.clear();
1389 carousel_services_available.clear();
1390 carousel_services_active.clear();
1391 }
1392
onSNR(float snr)1393 void WebRadioInterface::onSNR(float snr)
1394 {
1395 lock_guard<mutex> lock(data_mut);
1396 last_snr = snr;
1397 }
1398
onFrequencyCorrectorChange(int fine,int coarse)1399 void WebRadioInterface::onFrequencyCorrectorChange(int fine, int coarse)
1400 {
1401 lock_guard<mutex> lock(data_mut);
1402 last_fine_correction = fine;
1403 last_coarse_correction = coarse;
1404 }
1405
onSyncChange(char isSync)1406 void WebRadioInterface::onSyncChange(char isSync)
1407 {
1408 synced = isSync;
1409 }
1410
onSignalPresence(bool)1411 void WebRadioInterface::onSignalPresence(bool /*isSignal*/) { }
onServiceDetected(uint32_t)1412 void WebRadioInterface::onServiceDetected(uint32_t /*sId*/) { }
onNewEnsemble(uint16_t)1413 void WebRadioInterface::onNewEnsemble(uint16_t /*eId*/) { }
onSetEnsembleLabel(DabLabel &)1414 void WebRadioInterface::onSetEnsembleLabel(DabLabel& /*label*/) { }
1415
onDateTimeUpdate(const dab_date_time_t & dateTime)1416 void WebRadioInterface::onDateTimeUpdate(const dab_date_time_t& dateTime)
1417 {
1418 lock_guard<mutex> lock(data_mut);
1419 last_dateTime = dateTime;
1420 }
1421
onFIBDecodeSuccess(bool crcCheckOk,const uint8_t * fib)1422 void WebRadioInterface::onFIBDecodeSuccess(bool crcCheckOk, const uint8_t* fib)
1423 {
1424 if (not crcCheckOk) {
1425 lock_guard<mutex> lock(fib_mut);
1426 num_fic_crc_errors++;
1427 return;
1428 }
1429
1430 // Convert the fib bitvector to bytes
1431 vector<uint8_t> buf(32);
1432 for (size_t i = 0; i < buf.size(); i++) {
1433 uint8_t v = 0;
1434 for (int j = 0; j < 8; j++) {
1435 if (fib[8*i+j]) {
1436 v |= 1 << (7-j);
1437 }
1438 }
1439 buf[i] = v;
1440 }
1441
1442 {
1443 lock_guard<mutex> lock(fib_mut);
1444 fib_blocks.push_back(move(buf));
1445
1446 if (fib_blocks.size() > 3*250) { // six seconds
1447 fib_blocks.pop_front();
1448 }
1449 }
1450
1451 new_fib_block_available.notify_one();
1452 }
1453
onNewImpulseResponse(std::vector<float> && data)1454 void WebRadioInterface::onNewImpulseResponse(std::vector<float>&& data)
1455 {
1456 lock_guard<mutex> lock(plotdata_mut);
1457 last_CIR = move(data);
1458 }
1459
onNewNullSymbol(std::vector<DSPCOMPLEX> && data)1460 void WebRadioInterface::onNewNullSymbol(std::vector<DSPCOMPLEX>&& data)
1461 {
1462 lock_guard<mutex> lock(plotdata_mut);
1463 last_NULL = move(data);
1464 }
1465
onConstellationPoints(std::vector<DSPCOMPLEX> && data)1466 void WebRadioInterface::onConstellationPoints(std::vector<DSPCOMPLEX>&& data)
1467 {
1468 lock_guard<mutex> lock(plotdata_mut);
1469 last_constellation = move(data);
1470 }
1471
onMessage(message_level_t level,const std::string & text,const std::string & text2)1472 void WebRadioInterface::onMessage(message_level_t level, const std::string& text, const std::string& text2)
1473 {
1474 std::string fullText;
1475 if (text2.empty())
1476 fullText = text;
1477 else
1478 fullText = text + text2;
1479
1480 lock_guard<mutex> lock(data_mut);
1481 const auto now = std::chrono::system_clock::now();
1482 pending_message_t m = { .level = level, .text = fullText, .timestamp = now};
1483 pending_messages.emplace_back(move(m));
1484
1485 if (pending_messages.size() > MAX_PENDING_MESSAGES) {
1486 pending_messages.pop_front();
1487 }
1488 }
1489
onTIIMeasurement(tii_measurement_t && m)1490 void WebRadioInterface::onTIIMeasurement(tii_measurement_t&& m)
1491 {
1492 lock_guard<mutex> lock(data_mut);
1493 auto& l = tiis[make_pair(m.comb, m.pattern)];
1494 l.push_back(move(m));
1495
1496 if (l.size() > 20) {
1497 l.pop_front();
1498 }
1499 }
1500
onInputFailure()1501 void WebRadioInterface::onInputFailure()
1502 {
1503 std::exit(1);
1504 }
1505
getTiiStats()1506 list<tii_measurement_t> WebRadioInterface::getTiiStats()
1507 {
1508 list<tii_measurement_t> l;
1509
1510 for (const auto& cp_list : tiis) {
1511 const auto comb = cp_list.first.first;
1512 const auto pattern = cp_list.first.second;
1513
1514 if (cp_list.second.size() < 5) {
1515 continue;
1516 }
1517
1518 tii_measurement_t avg;
1519 avg.comb = comb;
1520 avg.pattern = pattern;
1521 vector<int> delays;
1522 double error = 0.0;
1523 size_t len = 0;
1524 for (const auto& meas : cp_list.second) {
1525 delays.push_back(meas.delay_samples);
1526 error += meas.error;
1527 len++;
1528 }
1529
1530 if (len > 0) {
1531 avg.error = error / len;
1532
1533 // Calculate the median
1534 std::nth_element(delays.begin(), delays.begin() + len/2, delays.end());
1535 avg.delay_samples = delays[len/2];
1536 }
1537 else {
1538 // To quiet static analysis check
1539 avg.error = 0.0;
1540 avg.delay_samples = 0;
1541 }
1542 l.push_back(move(avg));
1543 }
1544
1545 using namespace std::chrono;
1546 const auto now = steady_clock::now();
1547 // Remove a single entry every second to make the flukes
1548 // disappear
1549 if (time_last_tiis_clean + seconds(1) > now) {
1550 for (auto& cp_list : tiis) {
1551 cp_list.second.pop_front();
1552 }
1553
1554 time_last_tiis_clean = now;
1555 }
1556
1557 return l;
1558 }
1559