1 /*
2 * SRT - Secure, Reliable, Transport
3 * Copyright (c) 2018 Haivision Systems Inc.
4 *
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 *
9 */
10
11 /*****************************************************************************
12 written by
13 Haivision Systems Inc.
14 *****************************************************************************/
15
16 #include "platform_sys.h"
17
18 #include <atomic>
19 #include <iostream>
20 #include <iterator>
21 #include <vector>
22 #include <list>
23 #include <map>
24 #include <stdexcept>
25 #include <string>
26 #include <thread>
27 #include <chrono>
28 #include <deque>
29 #include <mutex>
30 #include <condition_variable>
31 #include <csignal>
32 #include <sys/stat.h>
33 #include <srt.h>
34 #include <udt.h>
35
36 #include "testactivemedia.hpp"
37
38 #include "apputil.hpp"
39 #include "uriparser.hpp"
40 #include "logsupport.hpp"
41 #include "logging.h"
42 #include "socketoptions.hpp"
43 #include "verbose.hpp"
44 #include "testmedia.hpp"
45 #include "threadname.h"
46
47
48
49
50 bool Upload(UriParser& srt, UriParser& file);
51 bool Download(UriParser& srt, UriParser& file);
52
53 srt_logging::Logger applog(SRT_LOGFA_APP, srt_logger_config, "srt-relay");
54
55 std::atomic<bool> g_program_established {false};
56
57 SrtModel* g_pending_model = nullptr;
58
59 thread::id g_root_thread = std::this_thread::get_id();
60
OnINT_SetInterrupted(int)61 static void OnINT_SetInterrupted(int)
62 {
63 Verb() << VerbLock << "SIGINT: Setting interrupt state.";
64 ::transmit_int_state = true;
65
66 // Just for a case, forcefully close all active SRT sockets.
67 SrtModel* pm = ::g_pending_model;
68 if (pm)
69 {
70 // The program is hanged on accepting a new SRT connection.
71 // We need to check which thread we've fallen into.
72 if (this_thread::get_id() == g_root_thread)
73 {
74 // Throw an exception, it will be caught in a predicted place.
75 throw std::runtime_error("Interrupted on request");
76 }
77 else
78 {
79 // This is some other thread, so close the listener socket.
80 // This will cause the accept block to be interrupted.
81 for (SRTSOCKET i: { pm->Socket(), pm->Listener() })
82 if (i != SRT_INVALID_SOCK)
83 srt_close(i);
84 }
85 }
86
87 }
88
89 using namespace std;
90
91 size_t g_chunksize = 0;
92 size_t g_default_live_chunksize = 1316;
93 size_t g_default_file_chunksize = 1456;
94
95 class SrtMainLoop
96 {
97 UriParser m_srtspec;
98
99 // Media used
100 unique_ptr<SrtRelay> m_srt_relay;
101 SourceMedium m_srt_source;
102 SourceMedium m_input_medium;
103 list<unique_ptr<TargetMedium>> m_output_media;
104 thread m_input_thr;
105 std::exception_ptr m_input_xp;
106
107 void InputRunner();
108 volatile bool m_input_running = false;
109
110 public:
111 SrtMainLoop(const string& srt_uri, bool input_echoback, const string& input_spec, const vector<string>& output_spec);
112
113 void run();
114
MakeStop()115 void MakeStop() { m_input_running = false; }
IsRunning()116 bool IsRunning() { return m_input_running; }
117
~SrtMainLoop()118 ~SrtMainLoop()
119 {
120 if (m_input_thr.joinable())
121 m_input_thr.join();
122 }
123 };
124
main(int argc,char ** argv)125 int main( int argc, char** argv )
126 {
127 OptionName
128 o_loglevel = { "ll", "loglevel" },
129 o_logfa = { "lf", "logfa" },
130 o_verbose = {"v", "verbose" },
131 o_input = {"i", "input"},
132 o_output = {"o", "output"},
133 o_echo = {"e", "io", "input-echoback"},
134 o_chunksize = {"c", "chunk"}
135 ;
136
137 // Options that expect no arguments (ARG_NONE) need not be mentioned.
138 vector<OptionScheme> optargs = {
139 { o_loglevel, OptionScheme::ARG_ONE },
140 { o_logfa, OptionScheme::ARG_ONE },
141 { o_input, OptionScheme::ARG_ONE },
142 { o_output, OptionScheme::ARG_VAR },
143 { o_chunksize, OptionScheme::ARG_ONE }
144 };
145 options_t params = ProcessOptions(argv, argc, optargs);
146
147 /*
148 cerr << "OPTIONS (DEBUG)\n";
149 for (auto o: params)
150 {
151 cerr << "[" << o.first << "] ";
152 copy(o.second.begin(), o.second.end(), ostream_iterator<string>(cerr, " "));
153 cerr << endl;
154 }
155 */
156
157 vector<string> args = params[""];
158 if ( args.size() != 1 )
159 {
160 cerr << "Usage: " << argv[0] << " <srt-endpoint> [ -i <input> | -e ] [ -o <output> ]\n";
161 cerr << "Options:\n";
162 cerr << "\t-v . . . . . . . . . . Verbose mode\n";
163 cerr << "\t-ll <level=error> . . . . . Log level for SRT\n";
164 cerr << "\t-lf <logfa=all> . . . . . Log Functional Areas enabled\n";
165 cerr << "\t-c <size=1316[live]|1456[file]> Single reading buffer size\n";
166 cerr << "\t-i <URI> . . . . . . . . Input medium spec\n";
167 cerr << "\t-o <URI> . . . . . . . . Output medium spec\n";
168 cerr << "\t-e . . . (conflicts with -i) Feed SRT output back to SRT input\n";
169 cerr << "\nNote: specify `transtype=file` for using TCP-like stream mode\n";
170 return 1;
171 }
172
173 string loglevel = Option<OutString>(params, "error", o_loglevel);
174 string logfa = Option<OutString>(params, "", o_logfa);
175 srt_logging::LogLevel::type lev = SrtParseLogLevel(loglevel);
176 UDT::setloglevel(lev);
177 if (logfa == "")
178 {
179 UDT::addlogfa(SRT_LOGFA_APP);
180 }
181 else
182 {
183 // Add only selected FAs
184 set<string> unknown_fas;
185 set<srt_logging::LogFA> fas = SrtParseLogFA(logfa, &unknown_fas);
186 UDT::resetlogfa(fas);
187
188 // The general parser doesn't recognize the "app" FA, we check it here.
189 if (unknown_fas.count("app"))
190 UDT::addlogfa(SRT_LOGFA_APP);
191 }
192
193 string verbo = Option<OutString>(params, "no", o_verbose);
194 if ( verbo == "" || !false_names.count(verbo) )
195 {
196 Verbose::on = true;
197 int verboch = atoi(verbo.c_str());
198 if (verboch <= 0)
199 {
200 verboch = 1;
201 }
202 else if (verboch > 2)
203 {
204 cerr << "ERROR: -v option accepts value 1 (stdout, default) or 2 (stderr)\n";
205 return 1;
206 }
207
208 if (verboch == 1)
209 {
210 Verbose::cverb = &std::cout;
211 }
212 else
213 {
214 Verbose::cverb = &std::cerr;
215 }
216 }
217
218 string chunk = Option<OutString>(params, "", o_chunksize);
219 if (chunk != "")
220 {
221 ::g_chunksize = stoi(chunk);
222 }
223
224 string srt_endpoint = args[0];
225
226 UriParser usrt(srt_endpoint);
227
228 if (usrt.scheme() != "srt")
229 {
230 cerr << "ERROR: the only one freestanding parameter should be an SRT uri.\n";
231 cerr << "Usage: " << argv[0] << " <srt-endpoint> [ -i <input> ] [ -o <output> ] [ -e ]\n";
232 return 1;
233 }
234
235 // Allowed are only one input and multiple outputs.
236 // Input-echoback is treated as a single input.
237 bool input_echoback = Option<OutString>(params, "no", o_echo) != "no";
238 string input_spec = Option<OutString>(params, "", o_input);
239
240 if (input_spec != "" && input_echoback)
241 {
242 cerr << "ERROR: input-echoback is treated as input specifcation, -i can't be specified together.\n";
243 return 1;
244 }
245
246 vector<string> output_spec = Option<OutList>(params, vector<string>{}, o_output);
247
248 if (!input_echoback)
249 {
250 if (input_spec == "" || output_spec.empty())
251 {
252 cerr << "ERROR: at least one input and one output must be specified (-io specifies both)\n";
253 return 1;
254 }
255 }
256
257 Verb() << "SETTINGS:";
258 Verb() << "SRT connection: " << srt_endpoint;
259 if (input_echoback)
260 {
261 Verb() << "INPUT: (from SRT connection)";
262 }
263 else
264 {
265 Verb() << "INPUT: " << input_spec;
266 }
267
268 Verb() << "OUTPUT LIST:";
269 if (input_echoback)
270 {
271 Verb() << "\t(back to SRT connection)";
272 }
273 for (auto& s: output_spec)
274 Verb() << "\t" << s;
275
276 #ifdef _MSC_VER
277 // Replacement for sigaction, just use 'signal'
278 // This may make this working kinda impaired and unexpected,
279 // but still better that not compiling at all.
280 signal(SIGINT, OnINT_SetInterrupted);
281 #else
282 struct sigaction sigIntHandler;
283
284 sigIntHandler.sa_handler = OnINT_SetInterrupted;
285 sigemptyset(&sigIntHandler.sa_mask);
286 sigIntHandler.sa_flags = 0;
287
288 sigaction(SIGINT, &sigIntHandler, NULL);
289 #endif
290
291 try
292 {
293 SrtMainLoop loop(srt_endpoint, input_echoback, input_spec, output_spec);
294 loop.run();
295 }
296 catch (std::exception& x)
297 {
298 cerr << "ERROR: " << x.what() << endl;
299 return 1;
300 }
301
302
303 return 0;
304 }
305
SrtMainLoop(const string & srt_uri,bool input_echoback,const string & input_spec,const vector<string> & output_spec)306 SrtMainLoop::SrtMainLoop(const string& srt_uri, bool input_echoback, const string& input_spec, const vector<string>& output_spec)
307 {
308 // Now prepare all media
309 // They use pointers instead of real variables
310 // so that the creation time can be delayed
311 // up to this moment, and the parameters prepared
312 // before passing to the constructors.
313
314 // Start with output media so that they are ready when
315 // the data come in.
316
317 for (string spec: output_spec)
318 {
319 Verb() << "Setting up output: " << spec;
320 unique_ptr<TargetMedium> m { new TargetMedium };
321 m->Setup(Target::Create(spec));
322 m_output_media.push_back(move(m));
323 }
324
325
326 // Start with SRT.
327
328 UriParser srtspec(srt_uri);
329 string transtype = srtspec["transtype"].deflt("live");
330
331 SrtModel m(srtspec.host(), srtspec.portno(), srtspec.parameters());
332
333 // Just to keep it unchanged.
334 string id = m_srtspec["streamid"];
335
336 Verb() << "Establishing SRT connection: " << srt_uri;
337
338 ::g_pending_model = &m;
339 m.Establish((id));
340
341 ::g_program_established = true;
342 ::g_pending_model = nullptr;
343
344 Verb() << "... Established. configuring other pipes:";
345
346 // Once it's ready, use it to initialize the medium.
347 bool file_mode = (transtype == "file");
348 if (g_chunksize == 0)
349 {
350 if (file_mode)
351 g_chunksize = g_default_file_chunksize;
352 else
353 g_chunksize = g_default_live_chunksize;
354
355 Verb() << "DEFAULT CHUNKSIZE used: " << g_chunksize;
356 }
357
358 m_srt_relay.reset(new SrtRelay);
359 m_srt_relay->StealFrom(m);
360
361 m_srt_source.Setup(m_srt_relay.get(), g_chunksize);
362
363 // Now check the input medium
364 if (input_echoback)
365 {
366 Verb() << "SRT set up as input source and the first output target";
367
368 // Add SRT medium to output targets, and keep input medium empty.
369 unique_ptr<TargetMedium> m { new TargetMedium };
370 m->Setup(m_srt_relay.get());
371 m_output_media.push_back(move(m));
372 }
373 else
374 {
375 // Initialize input medium and do not add SRT medium
376 // to the output list, as this will be fed directly
377 // by the data from this input medium in a spearate engine.
378 Verb() << "Setting up input: " << input_spec;
379 m_input_medium.Setup(Source::Create(input_spec), g_chunksize);
380
381 if (!file_mode)
382 {
383 // Also set writing to SRT non-blocking always.
384 bool no = false;
385 srt_setsockflag(m_srt_relay->Socket(), SRTO_SNDSYN, &no, sizeof no);
386 }
387 }
388
389 // We're done here.
390 Verb() << "MEDIA SUCCESSFULLY CREATED.";
391 }
392
InputRunner()393 void SrtMainLoop::InputRunner()
394 {
395 srt::ThreadName::set("InputRN");
396 // An extra thread with a loop that reads from the external input
397 // and writes into the SRT medium. When echoback mode is used,
398 // this thread isn't started at all and instead the SRT reading
399 // serves as both SRT reading and input reading.
400
401 auto on_return_set = OnReturnSet(m_input_running, false);
402
403 Verb() << VerbLock << "RUNNING INPUT LOOP";
404 for (;;)
405 {
406 applog.Debug() << "SrtMainLoop::InputRunner: extracting...";
407 auto data = m_input_medium.Extract();
408
409 if (data.payload.empty())
410 {
411 Verb() << "INPUT READING INTERRUPTED.";
412 break;
413 }
414
415 //Verb() << "INPUT [" << data.size() << "] " << VerbNoEOL;
416 applog.Debug() << "SrtMainLoop::InputRunner: [" << data.payload.size() << "] CLIENT -> SRT-RELAY";
417 m_srt_relay->Write(data);
418 }
419 }
420
run()421 void SrtMainLoop::run()
422 {
423 // Start the media runners.
424
425 Verb() << VerbLock << "STARTING OUTPUT threads:";
426
427 for (auto& o: m_output_media)
428 o->run();
429
430 Verb() << VerbLock << "STARTING SRT INPUT LOOP";
431 m_srt_source.run();
432
433 Verb() << VerbLock << "STARTING INPUT ";
434 if (m_input_medium.med)
435 {
436 m_input_medium.run();
437 m_input_running = true;
438
439 std::ostringstream tns;
440 tns << "Input:" << this;
441 srt::ThreadName tn(tns.str());
442 m_input_thr = thread([this] {
443 try {
444 InputRunner();
445 } catch (...) {
446 m_input_xp = std::current_exception();
447 }
448
449 Verb() << "INPUT: thread exit";
450 });
451 }
452
453 Verb() << VerbLock << "RUNNING SRT MEDIA LOOP";
454 for (;;)
455 {
456 applog.Debug() << "SrtMainLoop::run: SRT-RELAY: extracting...";
457 auto data = m_srt_source.Extract();
458
459 if (data.payload.empty())
460 {
461 Verb() << "SRT READING INTERRUPTED.";
462 break;
463 }
464
465 vector<string> output_report;
466 bool any = false;
467 int no = 1;
468
469 for (auto i = m_output_media.begin(), i_next = i; i != m_output_media.end(); i = i_next)
470 {
471 ++i_next;
472 auto& o = *i;
473 applog.Debug() << "SrtMainLoop::run: [" << data.payload.size() << "] SRT-RELAY: resending to output #" << no << "...";
474 if (!o->Schedule(data))
475 {
476 if (Verbose::on)
477 {
478 ostringstream os;
479 os << " --XXX-> <" << no << ">";
480 output_report.push_back(os.str());
481 }
482 m_output_media.erase(i);
483 continue;
484 }
485
486 if (Verbose::on)
487 {
488 ostringstream os;
489 os << " --> <" << no << ">";
490 output_report.push_back(os.str());
491 }
492 any = true;
493 ++no;
494 }
495 applog.Debug() << "SrtMainLoop::run: [" << data.payload.size() << "] SRT-RELAY -> OUTPUTS: " << Printable(output_report);
496
497 if (Verbose::on)
498 {
499 string outputs;
500 for (auto& r: output_report)
501 outputs += " " + r;
502 if (!any)
503 outputs = " --> * (no output)";
504
505 Verb() << VerbLock << "SRT [" << data.payload.size() << "] " << outputs;
506 }
507 }
508
509 Verb() << "MEDIA LOOP EXIT";
510 for (auto& m : m_output_media)
511 {
512 m->quit();
513 }
514 m_input_medium.quit();
515 m_srt_source.quit();
516
517 if (m_input_xp)
518 {
519 try {
520 std::rethrow_exception(m_input_xp);
521 } catch (std::exception& x) {
522 cerr << "INPUT EXIT BY EXCEPTION: " << x.what() << endl;
523 } catch (...) {
524 cerr << "INPUT EXIT BY UNKNOWN EXCEPTION\n";
525 }
526 }
527 }
528